Web3J: код подписки для прослушивания события выполняется дважды двумя разными потоками

Я разрабатываю приложение с SpringBoot и Web3J, где я использую оболочку контракта для взаимодействия с контрактом Smart. Вот автоматически сгенерированный код метода для прослушивания события, называемого NewId:

public Observable<NewIdEventResponse> newIdEventObservable(DefaultBlockParameter startBlock, DefaultBlockParameter endBlock) {
    final Event event = new Event("NewId", 
            Arrays.<TypeReference<?>>asList(),
            Arrays.<TypeReference<?>>asList(new TypeReference<Bytes32>() {}, new TypeReference<Address>() {}));
    EthFilter filter = new EthFilter(startBlock, endBlock, getContractAddress());
    filter.addSingleTopic(EventEncoder.encode(event));
    return web3j.ethLogObservable(filter).map(new Func1<Log, NewIdEventResponse>() {
        @Override
        public NewIdEventResponse call(Log log) {
            EventValues eventValues = extractEventParameters(event, log);
            NewIdEventResponse typedResponse = new NewIdEventResponse();
            typedResponse.key = (Bytes32) eventValues.getNonIndexedValues().get(0);
            typedResponse.contractId = (Address) eventValues.getNonIndexedValues().get(1);
            return typedResponse;
        }
    });
}

Я создал подписку s на наблюдаемую, где я печатаю имя исполняющего потока И увеличиваю счетчик, инициализированный до 0. Затем после выполнения подписки "нить mian" (весенняя загрузка нити) спит в течение 5 секунд, затем печатает значение счетчика и вызывает s.unsubscribe . Вот код:

//Invoke transactional contract method...
                this.counter = 0;
                CountDownLatch latch = new CountDownLatch(1);
                log.warn("Counter value before subscription: "+counter);
                Subscription s = contractWrapper.newIdEventObservable(DefaultBlockParameterName.LATEST, DefaultBlockParameterName.LATEST)
                        .subscribe(evento ->{
                            log.warn("Event Received");
                            log.warn("Thread Name "+Thread.currentThread().getName());
                            latch.countDown();
                            this.testIncCounter();
                            },                              
                        Throwable::printStackTrace);

                latch.await();
                log.warn("Main Thread going to sleep for 5 seconds");
                Thread.sleep(5000);
                log.warn("Unsubscribing...");
                s.unsubscribe();
                log.warn("Counter value "+counter);

Метод testIncCounter синхронизируется:

private synchronized void testIncCounter(){
        counter++;
    }

Проблема заключается в том, что код подписки выполняется дважды разными потоками, как это видно из журналов,а конечное значение счетчика равно 2 и должно быть равно 1. Вот вывод:

2017-11-07 20:56:17.345  WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl    : Counter value before subscription: 0
2017-11-07 20:56:17.433  WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl    : Event Received
2017-11-07 20:56:17.433  WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl    : Thread Name http-nio-8090-exec-4
2017-11-07 20:56:17.449  WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl    : Main Thread going to sleep for 5 seconds
2017-11-07 20:56:17.491  WARN 23533 --- [ool-17-thread-1] com.ckgt.service.impl.CecaServiceImpl    : Event Received
2017-11-07 20:56:17.491  WARN 23533 --- [ool-17-thread-1] com.ckgt.service.impl.CecaServiceImpl    : Thread Name pool-17-thread-1
2017-11-07 20:56:22.450  WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl    : Unsubscribing...
2017-11-07 20:56:22.459  WARN 23533 --- [nio-8090-exec-4] com.ckgt.service.impl.CecaServiceImpl    : Counter value 2

Может быть, это ошибка, или я делаю что-то не так? Заранее большое спасибо.

0 ответов

Другие вопросы по тегам