HERE.OLP: Как я могу "Подписаться на уведомления" на каталог?
Я пытаюсь подписаться на каталог для уведомления. Но не могу найти символ "thenAppy". Пожалуйста помоги.
// subscription to notifications
CompletionStage<NotificationSubscriptionControl> controlStage =
queryApi.subscribeToNotifications(consumerSettings)
.thenApply(subscription -> {
subscription
.notifications()
.runWith(Sink.foreach(notification ->
// this callback is called each time a new batch publication happens in catalog
System.out.printf("catalog %s has a new version %d\n", catalogHrn, notification.getCatalogVersion())
), myMaterializer);
return subscription.subscriptionControl();
});
[ОШИБКА] ОШИБКА КОМПИЛЯЦИИ: [ИНФО] ----------------------------------------- -------------------- [ОШИБКА] Main.java:[41,25] не может найти символ символа: метод thenApply((subscript...; }) местоположение: Интерфейс org.apache.flink.streaming.api.functions.source.SourceFunction [ERROR] Main.java:[44,65] пакет akka.stream.javadsl не существует [ERROR] Main.java:[47,40] не может найти символ символа: переменная myMaterializer
2 ответа
Судя по вашей ошибке компиляции, вы пытаетесь подписаться на уведомление в приложении Flink. Клиент запроса данных Flink queryApi возвращает функцию SourceFunction, а не CompletionStage. Вы можете использовать это так:
StreamExecutionEnvironment
.getExecutionEnvironment()
.addSource(
query.subscribeToNotifications(
new NotificationConsumerSettings(
"my-notification-consumer-group-1"
)
)
)
.addSink(
notification - > System.out.printf(
"catalog %s has a new version %d\n",
STREAMING_INPUT_CATALOG_HRN,
notification
.getCatalogVersion()
)
);
Если subscribeToNotifications является блокирующей функцией, вы можете заключить ее в завершенный метод метода CompletableFuture.
CompletionStage<NotificationSubscriptionControl> controlStage =
CompletableFuture.completedFuture(queryApi.subscribeToNotifications(consumerSettings))
.thenApply(
subscription -> {
subscription
.notifications()
.runWith(
Sink.foreach(
notification ->
// this callback is called each time a new batch publication
// happens in catalog
System.out.printf(
"catalog %s has a new version %d\n",
catalogHrn, notification.getCatalogVersion())),
myMaterializer);
return subscription.subscriptionControl();
});