Переход от socko к веб-сокетам akka-http
У меня есть приложение akka, основанное на soso websockets. Связь с сокетами происходит внутри одного субъекта, а сообщения, выходящие из субъекта и входящие в него (входящие и исходящие сообщения соответственно), помечаются идентификатором сокета, который является свойством первого класса в soso websocket (в socko приходит запрос на подключение помечены идентификатором, и все переходы жизненного цикла, такие как рукопожатие, разъединение, входящие кадры и т. д., помечаются аналогично)
Я хотел бы переопределить этот единственный актер, используя akka-http (в наши дни socko более или менее заброшен по понятным причинам), но это не так просто, потому что две библиотеки концептуально сильно различаются; akka-http скрывает подробности нижнего уровня о рукопожатии, отключении и т. д., просто отправляя любому действующему субъекту, связанному с http-сервером, заголовок запроса UpgradeToWebsocket. Объект заголовка содержит метод, который принимает материализованный поток в качестве обработчика для всех сообщений, которыми обменивается клиент.
Все идет нормально; Я могу получать сообщения через веб-сокет и отвечать на них напрямую. Все официальные примеры предполагают некоторую модель запроса-ответа без сохранения состояния, поэтому я пытаюсь понять, как сделать следующий шаг для назначения метки материализованному потоку, управления его жизненным циклом и состоянием соединения (мне нужно сообщить другим участникам в приложение, когда клиент разрывает соединение, а также помечает сообщения.)
Альтернатива (перемоделирование всего приложения с использованием akka-streams) - слишком сложная работа, поэтому любые советы о том, как отслеживать сокеты, будут высоко оценены.
2 ответа
Ответ от Rüdiger Klaehn был полезной отправной точкой, спасибо!
В конце концов я пошел с ActorPublisher
после прочтения другого вопроса здесь ( отправка сообщений через веб-сокеты с помощью akka http).
Главное, что поток "материализован" где-то под капотом akka-http, поэтому вам нужно перейти в UpgradeToWebSocket.handleMessagesWithSinkSource
пара источник / приемник, которая уже знает о существующем актере. Поэтому я создаю актера (который реализует ActorPublisher[TextMessage.Strict]
), а затем заверните его в Source.fromPublisher(ActorPublisher(myActor))
,
Когда вы хотите вставить сообщение в поток от актера receive
метод вы сначала проверьте, если totalDemand > 0
(т. е. поток готов принять ввод) и, если это так, вызвать onNext
с содержанием сообщения.
Чтобы взаимодействовать с существующей системой на основе акторов, вы должны посмотреть на Source.actorRef
а также Sink.actorRef
, Source.actorRef
создает ActorRef, на который вы можете отправлять сообщения, и Sink.actorRef
позволяет обрабатывать входящие сообщения с помощью актера, а также обнаруживать закрытие веб-сокета.
Чтобы подключить актера, созданного Source.actorRef
для существующего долгоживущего актера, используйте Flow#mapMaterializedValue
, Это также было бы хорошим местом для назначения уникального идентификатора для сокетного соединения.
Этот ответ на связанный вопрос может помочь вам начать.
Одна вещь, о которой нужно знать. Текущая реализация веб-сокета не закрывает поток от сервера к клиенту, когда поток от клиента к серверу закрывается с помощью сообщения о закрытии веб-сокета. Существует проблема, чтобы реализовать это, но пока она не реализована, вы должны сделать это самостоятельно. Например, имея что-то подобное в вашем стеке протоколов.