Реализация Java в Gcloud PubSub - java.util.concurrent.RejectedExecutionException

Я использую образец фрагмента из документации GCloud, чтобы получить сообщение в качестве подписчика. Моя версия pubsub gcloud jar - 0.19.0-alpha

Проблема в том, что я могу получить сообщение с атрибутом map, но у меня продолжает появляться это исключение:

2017-07-12 16: 52: 25,219 [grpc-default-worker-ELG-1-16] WARN io.netty.util.concurrent.DefaultPromise - Исключение было сгенерировано io.grpc.netty.NettyClientHandler$3.operationComplete() java.util.concurrent.RejectedExecutionException: Задача java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@fbf4a6d отклонено из java.util.concurrent.ScheduledThreadPoolExecutor@25cbe860[ограниченные пулы = размер 0, активные задачи, размер 0, активные задачи завершенные задачи = 2403] в java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) в java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) в запланированном экземпляре.delayedExecute(ScheduledThreadPoolExecutor.java:326) в java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533) в java.util.concurrent.ScheduledThreadPoolExec.ecava.ExjExecececec $DelegatedExecutorService.execute(Executors.java:668) по адресу io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:110) по адресу io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.onReady(ClientCallomp3)..internal.DelayedStream$DelayedStreamListener.onReady(DelayedStream.java:398) в io.grpc.internal.AbstractStream2$TransportState.notifyIfReady(AbstractStream2.java:305) в io.grpc.internal.AbstractStream2 $ TransportState.locStj.:248) по адресу io.grpc.netty.NettyClientStream$TransportState.setHttp2Stream(NettyClientStream.java:227) по адресу io.grpc.netty.NettyClientHandler$3.operationComplete(NettyClientHandler.java:429) по адресу io.gr.operationComplete(NettyClientHandler.java:417) в io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507) в io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:48).util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420) на io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)

После этого программа выключается и перестает слушать и получать сообщения. Как устранить это прерывание, и я даже избавляюсь от предложения finally, в котором есть subscriber.stopAsync().

0 ответов

В предоставленном ими фрагменте кода есть ошибка. Вам нужно вызвать get() для сообщения messaegeIdFuture. Следующий код решает проблему:

Publisher publisher = null;
String projectId = ServiceOptions.getDefaultProjectId();
ProjectTopicName topic = ProjectTopicName.of(projectId, "test");
ApiFuture<String> messageIdFuture = null;
try {
       publisher = Publisher.newBuilder(topic).build();
       ByteString data = ByteString.copyFromUtf8("my-message");
       PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
       messageIdFuture = publisher.publish(pubsubMessage);
    } catch (IOException e) {
       e.printStackTrace();
    } finally {
        messageIdFuture.get();    //This resolves this issue.
        // Wait on any pending requests
        if (publisher != null) {
            publisher.shutdown();
            //publisher.awaitTermination(1, TimeUnit.SECONDS);
        }
    }
Другие вопросы по тегам