Akka Streams - объедините последние операции
Я хотел бы объединить последние с Akka Streams, как описано здесь.
Я не могу понять, как это сделать - пожалуйста, помогите!
Спасибо, Райан.
1 ответ
Я просто быстро это реализовал. Не уверен, что это без ошибок, но стоит попробовать:) https://gist.github.com/tg44/2e75d45c234ca02d91cfdac35f41a5a2 Комментарии под суть приветствуются!
Как мы говорили на канале Gitter, этого нельзя добиться поэтапно, но вы можете написать функциональность с помощью специального этапа. Вам понадобится два входа и один выход (может быть расширен до N входа), так что это вентилятор в форме.
Я сохраняю входящие элементы в настройках, и всякий раз, когда ввод готов (иначе как отправить элемент), я сохраняю данный элемент в опции. Всякий раз, когда для вывода нужен элемент (а у нас уже есть один элемент из обоих входов), я даю ему значения из опций в виде набора. Это подход с учетом противодавления.
Для подхода с противодавлением (когда вы создаете все пары) вам необходимо обработать ожидающий "другой" выходной элемент, а не последний, и обработать входные извлечения. Я думаю, что моя реализация по-прежнему не обрабатывает слишком быстрых производителей с медленным регистром потребителей (мы можем пропустить один элемент, может обрабатывать выбросы) и может заблокироваться, если оба ввода выдают один и тот же элемент несколько раз (возможно, emits может справиться и с этим).
Если вы хотите расширить функциональность моего кода или написать другие пользовательские этапы, прочитайте это: http://doc.akka.io/docs/akka/2.5/scala/stream/stream-customize.html