Как я могу недетерминированно сгладить бесконечные потоки FS2
Я использую потоковую библиотеку Scala FS2.
у меня есть Stream[F, [Stream[F, A]]
где и внутренние потоки и внешний поток бесконечны (с соответствующими Async
случаи для F
). Я хочу закончить с Stream[F, A]
который одновременно извлекается из внешнего потока, а также из внутреннего потока, где каждый новый элемент из внешнего потока заменяет текущий внутренний поток, из которого я извлекаю. В частности, я хочу "в конце концов" получить хотя бы попытку извлечь (хотя меня может прервать внешний поток, прежде чем я смогу это сделать) из всех внутренних потоков.
Мои попытки сделать это с внешним Async
ссылка, которая содержит текущий внутренний поток, кажется, заканчивается Interrupt
исключения бросают.
Я не хочу простой flatMap
или даже concurrent.join
, Поскольку мои внутренние потоки бесконечны, они никогда не дойдут до конечного числа внутренних потоков.
Есть ли способ добиться этого с помощью FS2?