Реализация Java в Gcloud PubSub - java.util.concurrent.RejectedExecutionException
Я использую образец фрагмента из документации GCloud, чтобы получить сообщение в качестве подписчика. Моя версия pubsub gcloud jar - 0.19.0-alpha
Проблема в том, что я могу получить сообщение с атрибутом map, но у меня продолжает появляться это исключение:
2017-07-12 16: 52: 25,219 [grpc-default-worker-ELG-1-16] WARN io.netty.util.concurrent.DefaultPromise - Исключение было сгенерировано io.grpc.netty.NettyClientHandler$3.operationComplete() java.util.concurrent.RejectedExecutionException: Задача java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@fbf4a6d отклонено из java.util.concurrent.ScheduledThreadPoolExecutor@25cbe860[ограниченные пулы = размер 0, активные задачи, размер 0, активные задачи завершенные задачи = 2403] в java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) в java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) в запланированном экземпляре.delayedExecute(ScheduledThreadPoolExecutor.java:326) в java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533) в java.util.concurrent.ScheduledThreadPoolExec.ecava.ExjExecececec $DelegatedExecutorService.execute(Executors.java:668) по адресу io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:110) по адресу io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.onReady(ClientCallomp3)..internal.DelayedStream$DelayedStreamListener.onReady(DelayedStream.java:398) в io.grpc.internal.AbstractStream2$TransportState.notifyIfReady(AbstractStream2.java:305) в io.grpc.internal.AbstractStream2 $ TransportState.locStj.:248) по адресу io.grpc.netty.NettyClientStream$TransportState.setHttp2Stream(NettyClientStream.java:227) по адресу io.grpc.netty.NettyClientHandler$3.operationComplete(NettyClientHandler.java:429) по адресу io.gr.operationComplete(NettyClientHandler.java:417) в io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507) в io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:48).util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420) на io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
После этого программа выключается и перестает слушать и получать сообщения. Как устранить это прерывание, и я даже избавляюсь от предложения finally, в котором есть subscriber.stopAsync().
0 ответов
В предоставленном ими фрагменте кода есть ошибка. Вам нужно вызвать get() для сообщения messaegeIdFuture. Следующий код решает проблему:
Publisher publisher = null;
String projectId = ServiceOptions.getDefaultProjectId();
ProjectTopicName topic = ProjectTopicName.of(projectId, "test");
ApiFuture<String> messageIdFuture = null;
try {
publisher = Publisher.newBuilder(topic).build();
ByteString data = ByteString.copyFromUtf8("my-message");
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
messageIdFuture = publisher.publish(pubsubMessage);
} catch (IOException e) {
e.printStackTrace();
} finally {
messageIdFuture.get(); //This resolves this issue.
// Wait on any pending requests
if (publisher != null) {
publisher.shutdown();
//publisher.awaitTermination(1, TimeUnit.SECONDS);
}
}