Переход от socko к веб-сокетам akka-http

У меня есть приложение akka, основанное на soso websockets. Связь с сокетами происходит внутри одного субъекта, а сообщения, выходящие из субъекта и входящие в него (входящие и исходящие сообщения соответственно), помечаются идентификатором сокета, который является свойством первого класса в soso websocket (в socko приходит запрос на подключение помечены идентификатором, и все переходы жизненного цикла, такие как рукопожатие, разъединение, входящие кадры и т. д., помечаются аналогично)

Я хотел бы переопределить этот единственный актер, используя akka-http (в наши дни socko более или менее заброшен по понятным причинам), но это не так просто, потому что две библиотеки концептуально сильно различаются; akka-http скрывает подробности нижнего уровня о рукопожатии, отключении и т. д., просто отправляя любому действующему субъекту, связанному с http-сервером, заголовок запроса UpgradeToWebsocket. Объект заголовка содержит метод, который принимает материализованный поток в качестве обработчика для всех сообщений, которыми обменивается клиент.

Все идет нормально; Я могу получать сообщения через веб-сокет и отвечать на них напрямую. Все официальные примеры предполагают некоторую модель запроса-ответа без сохранения состояния, поэтому я пытаюсь понять, как сделать следующий шаг для назначения метки материализованному потоку, управления его жизненным циклом и состоянием соединения (мне нужно сообщить другим участникам в приложение, когда клиент разрывает соединение, а также помечает сообщения.)

Альтернатива (перемоделирование всего приложения с использованием akka-streams) - слишком сложная работа, поэтому любые советы о том, как отслеживать сокеты, будут высоко оценены.

2 ответа

Решение

Ответ от Rüdiger Klaehn был полезной отправной точкой, спасибо!

В конце концов я пошел с ActorPublisher после прочтения другого вопроса здесь ( отправка сообщений через веб-сокеты с помощью akka http).

Главное, что поток "материализован" где-то под капотом akka-http, поэтому вам нужно перейти в UpgradeToWebSocket.handleMessagesWithSinkSource пара источник / приемник, которая уже знает о существующем актере. Поэтому я создаю актера (который реализует ActorPublisher[TextMessage.Strict]), а затем заверните его в Source.fromPublisher(ActorPublisher(myActor)),

Когда вы хотите вставить сообщение в поток от актера receive метод вы сначала проверьте, если totalDemand > 0 (т. е. поток готов принять ввод) и, если это так, вызвать onNext с содержанием сообщения.

Чтобы взаимодействовать с существующей системой на основе акторов, вы должны посмотреть на Source.actorRef а также Sink.actorRef, Source.actorRef создает ActorRef, на который вы можете отправлять сообщения, и Sink.actorRef позволяет обрабатывать входящие сообщения с помощью актера, а также обнаруживать закрытие веб-сокета.

Чтобы подключить актера, созданного Source.actorRef для существующего долгоживущего актера, используйте Flow#mapMaterializedValue, Это также было бы хорошим местом для назначения уникального идентификатора для сокетного соединения.

Этот ответ на связанный вопрос может помочь вам начать.

Одна вещь, о которой нужно знать. Текущая реализация веб-сокета не закрывает поток от сервера к клиенту, когда поток от клиента к серверу закрывается с помощью сообщения о закрытии веб-сокета. Существует проблема, чтобы реализовать это, но пока она не реализована, вы должны сделать это самостоятельно. Например, имея что-то подобное в вашем стеке протоколов.

Другие вопросы по тегам