Web3J: код подписки для прослушивания события выполняется дважды двумя разными потоками
Я разрабатываю приложение с SpringBoot и Web3J, где я использую оболочку контракта для взаимодействия с контрактом Smart. Вот автоматически сгенерированный код метода для прослушивания события, называемого NewId:
public Observable<NewIdEventResponse> newIdEventObservable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
final Event event = new Event("NewId",
Arrays.<TypeReference<?>>asList(),
Arrays.<TypeReference<?>>asList(new TypeReference<Bytes32>() {}, new TypeReference<Address>() {}));
EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
filter.addSingleTopic(EventEncoder.encode(event));
return web3j.ethLogObservable(filter).map(new Func1<Log, NewIdEventResponse>() {
@Override
public NewIdEventResponse call(Log log) {
EventValues eventValues = extractEventParameters(event, log);
NewIdEventResponse typedResponse = new NewIdEventResponse();
typedResponse.key = (Bytes32) eventValues.getNonIndexedValues().get(0);
typedResponse.contractId = (Address) eventValues.getNonIndexedValues().get(1);
return typedResponse;
}
});
}
Я создал подписку s на наблюдаемую, где я печатаю имя исполняющего потока И увеличиваю счетчик, инициализированный до 0. Затем после выполнения подписки "нить mian" (весенняя загрузка нити) спит в течение 5 секунд, затем печатает значение счетчика и вызывает s.unsubscribe . Вот код:
//Invoke transactional contract method...
this.counter = 0;
CountDownLatch latch = new CountDownLatch(1);
log.warn("Counter value before subscription: "+counter);
Subscription s = contractWrapper.newIdEventObservable(DefaultBlockParameterName.LATEST, DefaultBlockParameterName.LATEST)
.subscribe(evento ->{
log.warn("Event Received");
log.warn("Thread Name "+Thread.currentThread().getName());
latch.countDown();
this.testIncCounter();
},
Throwable::printStackTrace);
latch.await();
log.warn("Main Thread going to sleep for 5 seconds");
Thread.sleep(5000);
log.warn("Unsubscribing...");
s.unsubscribe();
log.warn("Counter value "+counter);
Метод testIncCounter синхронизируется:
private synchronized void testIncCounter(){
counter++;
}
Проблема заключается в том, что код подписки выполняется дважды разными потоками, как это видно из журналов,а конечное значение счетчика равно 2 и должно быть равно 1. Вот вывод:
2017-11-07 20:56:17.345 WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl : Counter value before subscription: 0
2017-11-07 20:56:17.433 WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl : Event Received
2017-11-07 20:56:17.433 WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl : Thread Name http-nio-8090-exec-4
2017-11-07 20:56:17.449 WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl : Main Thread going to sleep for 5 seconds
2017-11-07 20:56:17.491 WARN 23533 --- [ool-17-thread-1] com.ckgt.service.impl.CecaServiceImpl : Event Received
2017-11-07 20:56:17.491 WARN 23533 --- [ool-17-thread-1] com.ckgt.service.impl.CecaServiceImpl : Thread Name pool-17-thread-1
2017-11-07 20:56:22.450 WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl : Unsubscribing...
2017-11-07 20:56:22.459 WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl : Counter value 2
Может быть, это ошибка, или я делаю что-то не так? Заранее большое спасибо.