Как я могу недетерминированно сгладить бесконечные потоки FS2

Я использую потоковую библиотеку Scala FS2.

у меня есть Stream[F, [Stream[F, A]] где и внутренние потоки и внешний поток бесконечны (с соответствующими Async случаи для F). Я хочу закончить с Stream[F, A] который одновременно извлекается из внешнего потока, а также из внутреннего потока, где каждый новый элемент из внешнего потока заменяет текущий внутренний поток, из которого я извлекаю. В частности, я хочу "в конце концов" получить хотя бы попытку извлечь (хотя меня может прервать внешний поток, прежде чем я смогу это сделать) из всех внутренних потоков.

Мои попытки сделать это с внешним Async ссылка, которая содержит текущий внутренний поток, кажется, заканчивается Interrupt исключения бросают.

Я не хочу простой flatMap или даже concurrent.join, Поскольку мои внутренние потоки бесконечны, они никогда не дойдут до конечного числа внутренних потоков.

Есть ли способ добиться этого с помощью FS2?

0 ответов

Другие вопросы по тегам