Как отправить сообщение в реактивном потоке от подписчика издателю через подключение к веб-сокету

Мое приложение имеет интерфейс Akka-Websocket. Веб-сокет состоит из подписчика актера и издателя актера. Подписчик обрабатывает команды, отправляя их соответствующему субъекту. Издатель прослушивает поток событий и публикует информацию об обновлениях обратно в поток (и, наконец, клиенту). Это хорошо работает.

Мой вопрос: как Абонент может отправить событие обратно в поток? Например, для подтверждения выполнения полученной команды.

public class WebSocketApp extends HttpApp {

  private static final Gson gson = new Gson();

  @Override
  public Route createRoute() {
    return get(
        path("metrics").route(handleWebSocketMessages(metrics()))
        );
  }

  private Flow<Message, Message, ?> metrics() {
    Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props());
    Source<Message, ActorRef> metricsSource = 
        Source.actorPublisher(WebSocketDataPublisherActor.props())
        .map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));
    return Flow.fromSinkAndSource(metricsSink, metricsSource);
  }
}

Хорошее решение может быть, что подписчик актер (WebSocketCommandSubscriber актер в коде выше) может отправить сообщение обратно в поток, как sender().tell(...)...

1 ответ

Решение

Нет, это не возможно, не напрямую, по крайней мере. Потоки всегда однонаправлены - все сообщения текут в одном направлении, а спрос на них - в противоположном. Вам необходимо передать свои подтверждающие сообщения из приемника в источник, чтобы последний отправил его клиенту, например, путем регистрации исходного субъекта в субъекте приемника. Это может выглядеть так:

Flow.fromSinkAndSourceMat(metricsSink, metricsSource, (sinkActor, sourceActor) -> {
    sinkActor.tell(new RegisterSource(sourceActor), null);
})

Затем, после вашей раковины актер получает RegisterSource сообщение, он может отправить подтверждающие сообщения на предоставленный ActorRef, который затем направит их в выходной поток.

Другие вопросы по тегам