FS2: Функциональные потоки для Scala
0 ответов

Как использовать Free с парами в fs2?

Допустим, я хочу прочитать данные из jms, отобразить их через toUpperCase и отправить в другое место. Поэтому я абстрагируюсь от реальной системы обмена сообщениями с Jms[A], право? trait Jms[A] case object Pull extends Jms[String] case class Push(s…
11 ноя '16 в 10:00
2 ответа

FS2: возможно ли завершить очередь изящно?

Предположим, что я хочу преобразовать некоторые устаревшие асинхронные API в потоки FS2. API предоставляет интерфейс с 3 обратными вызовами: следующий элемент, успех, ошибка. Я бы хотел, чтобы Stream испускал все элементы, а затем выполнял их при по…
07 май '18 в 11:13
1 ответ

Как реализовать рекурсивную последовательность Фибоначчи в Scala с использованием FS2?

Пытаясь ознакомиться с FS2, я наткнулся на изящную рекурсивную реализацию, использующую поток коллекций Scala, и подумал, что мне стоит попробовать ее в FS2: import fs2.{Pure, Stream} val fibs: Stream[Pure, Int] = Stream[Pure, Int](0) ++ fibs.fold[I…
09 сен '16 в 17:27
3 ответа

Как разделить поток Fs2 по ключу, чтобы преобразовать каждый раздел отдельно?

Чего я хочу добиться, например, с помощью данных: time, part, data 0, a, 3 1, a, 4 2, b, 10 3, b, 20 3, a, 5 и преобразование: stream.keyBy(_.part).scan(0)((s, d) => s + d) получить: 0, a, 3 1, a, 7 2, b, 10 3, b, 30 3, a, 12 Я попытался разделит…
21 дек '18 в 14:18
0 ответов

Потребитель fs2-kafka обнаружил ошибку несоответствия типов fs2.stream требуется cats.effect.IO

Я следовал за примерами для fs2-kafka. Тем не менее, я довольно застрял на примере для потребителя. Проблема, которую я получаю, заключается в несоответствии типов между fs2.stream и cats.effect.IO (ошибка ниже) Код: NB: теперь обновлено с рекоменда…
01 фев '19 в 20:53
0 ответов

Поток fs2 в сжатый zip fs2 stream

У меня есть поток потоков FS2, и я хотел бы создать сжатый поток, готовый для записи в файл с *.zip расширение или для скачивания. Проблема в том, что поток никогда не заканчивается. Вот код: package backup import java.io.OutputStream import cats.ef…
09 фев '19 в 01:25
0 ответов

Моделирование нескольких вызовов функций с помощью потока (безопасным способом FP)

Учитывая функцию A => IO[B] (ака Kleisli[IO, A, B]) который должен вызываться несколько раз и имеет побочные эффекты, такие как обновление БД, как делегировать такие множественные вызовы в поток (я полагаю, Pipe[IO, A, B]) (fs2, monix наблюдаемый…
25 фев '19 в 10:10
1 ответ

Одновременно работает Scala FS2 Streams

Я пытаюсь смоделировать систему, которая является генератором нагрузки для веб-сайта. На сайте есть страницы, которые на данный момент являются строками. type Page = String val pages: Vector[Page] = Stream.eval(Task.delay { new Random().alphanumeric…
16 окт '16 в 20:05
0 ответов

Как я могу недетерминированно сгладить бесконечные потоки FS2

Я использую потоковую библиотеку Scala FS2. у меня есть Stream[F, [Stream[F, A]] где и внутренние потоки и внешний поток бесконечны (с соответствующими Async случаи для F). Я хочу закончить с Stream[F, A] который одновременно извлекается из внешнего…
04 окт '17 в 18:41
1 ответ

Несоответствие типов при следовании примеру из документации в fs2

Я пытаюсь создать приложение, используя FS2 (0.10.0). Я взял этот пример из документации: import fs2._ // import fs2._ import fs2.async // import fs2.async import scala.concurrent.ExecutionContext // import scala.concurrent.ExecutionContext import c…
04 фев '18 в 22:42
1 ответ

Настройка времени соединения Cassandra в Phantom DSL

Я использую фантом для подключения к Apache Cassandra и хочу настроить соединитель во время выполнения, то есть я хочу проанализировать некоторый файл конфигурации, извлечь список баз данных Cassandra и передать его каким-то образом Database объект.…
27 фев '18 в 12:08
1 ответ

Идиоматический способ обработки нескольких одновременных потоков в Scala

У меня есть список потоков, которые при вызове их next() будет спать случайное количество времени, а затем читать один символ из другого источника. Я пытаюсь написать потребителя (ей), которые будут продолжать вызывать эти потоки до EOF и создать об…
28 май '18 в 12:36
1 ответ

Как программно закрыть fs2.StreamApp?

Простирающийся StreamApp просит вас предоставить stream отсроченный Оно имеет requestShutdown параметр. def stream(args: List[String], requestShutdown: F[Unit]): Stream[F, ExitCode] Я предоставляю реализацию для этого и понимаю, что args передается …
06 мар '18 в 21:59
1 ответ

Как преобразовать поток Scala FS2 в строку?

Я хочу знать, как преобразовать Scala fs2 Stream в строку, из fs2 github readme пример: def converter[F[_]](implicit F: Sync[F]): F[Unit] = { val path = "/Users/lorancechen/version_control_project/_unlimited-works/git-server/src/test/resources" io.f…
25 янв '18 в 10:19
0 ответов

Как сгруппировать объекты с помощью функции классификатора в FS2?

У меня есть поток неупорядоченных measurements, что я хотел бы сгруппировать в партии фиксированного размера, чтобы я мог эффективно сохранить их позже: val measurements = for { id <- Seq("foo", "bar", "baz") value <- 1 to 5 } yield (id, value…
23 июн '18 в 16:53
1 ответ

FS2 (Функциональные потоки для Scala), как сделать группу (n)

Как сгруппировать элементы? Возможные решения: chunkLimit segmentLimit segmentN groupAdjecentBy Которые не делают именно это. я ищу что-то вроде сгруппированных в Akka Streams
07 апр '18 в 13:52
1 ответ

Scala fs2 Потоки с чанками и задачами?

// Simulated external API that synchronously returns elements one at a time indefinitely. def externalApiGet[A](): A = ??? // This wraps with the proper fs2 stream that will indefinitely return values. def wrapGetWithFS2[A](): Stream[Task, A] = Str…
21 дек '16 в 19:30
1 ответ

Поток FS2 работает до конца InputStream

Я очень новичок в FS2 и мне нужна помощь в разработке. Я пытаюсь создать поток, который будет тянуть куски из нижележащего InputStream пока все не закончится. Вот что я попробовал: import java.io.{File, FileInputStream, InputStream} import cats.effe…
24 май '18 в 18:07
0 ответов

Нужны советы о том, как создать потокобезопасную очередь в fs2 (Scala)

Мне нужно реализовать микросервис, который загружает тонну данных в память при запуске и делает эти данные доступными через HTTP GET. Я смотрел на fs2 как вариант, чтобы сделать данные доступными для веб-слоя через fs2.Queue, Меня беспокоит то, что …
29 сен '18 в 08:46
0 ответов

FS2 поток для непрочитанного InputStream

Я хотел бы конвертировать fs2.Stream в java.io.InputStream поэтому я могу передать этот входной поток в http-фреймворк (Finch и Akka Http). Я нашел fs2.io.toInputStream, но это не работает (ничего не печатает): import java.io.{ByteArrayInputStream, …
21 авг '18 в 01:37