Описание тега zio-streams

1 ответ

Как читать сообщения с ZHub через ZStream?

Я новичок в ZHub и ZStream и хотел ознакомиться с их API. К сожалению, я не смог заставить работать этот простой пример: for hub <- Hub.bounded[String](4) stream = ZStream.fromHub(hub) _ <- hub.publish("Hello") _ <- hub.publish("World") col…
06 май '21 в 13:22
2 ответа

ZIO Streams: В чем разница между ZSink и ZTransducer?

Я изучаю ZIO Streams, использую версию библиотеки 1.0.9. zio-streams. Я не могу найти ссылку, которая показывает мне разницу между ZSink и ZTransducer. В чем разница?
11 июл '21 в 16:33
1 ответ

Интеграция потребителей Zio-Kafka с десериализатором Zio-Json

Я изучаю библиотеку zio-kafka, и я хочу использовать zio-json для десериализации значений сообщений в формате JSON. У меня есть простой класс case вместе с его декодером и кодировщиком: case class Player(name: String, score: Int) object Player { imp…
30 июл '21 в 22:33
0 ответов

Могу ли я использовать ZSink для фиксации смещений в Zio-Kafka?

Я изучаю интеграцию ZIO с Apache Kafka, используя библиотеку zio-kafka. В примере на главной странице проекта Github они используют mapM функция для фиксации смещения чанка: Consumer.subscribeAnd(Subscription.topics("topic150")) .plainStream(Serde.s…
01 авг '21 в 22:41
0 ответов

Как я могу создать пакетный запрос и понять ответ с помощью ZStream (ZIO)?

У меня есть api, который получает такой запрос: case class UsersRequest(ids: List[Long]) и возвращает такой ответ: case class UsersInfoResponse(info: List[Info]) case class Info(userId: Long, info: String) Кроме того, у меня есть методы, которые отп…
02 ноя '21 в 21:20
1 ответ

Прервать обработку ZStream mapMPar

У меня есть следующий код, который из-за ограничений максимального количества строк Excel ограничен ~ 1 миллионом строк: ZStream.unwrap(generateStreamData).mapMPar(32) {m => streamDataToCsvExcel } Все достаточно просто и работает отлично. Я отсле…
28 янв '22 в 14:16
1 ответ

Потоковая передача ByteArrayOutputStream в ответ akka http

Я создаю ByteArrayOutputStream, используя ZIO Streams, т.е.: lazy val byteArrayOutputStream = new ByteArrayOutputStream() val sink = ZSink.fromOutputStream(byteArrayOutputStream).contramapChunks[String](_.flatMap(_.getBytes) val data = ZStream.unwra…
1 ответ

zio-grpc bi-stream НЕ закрывается на стороне сервера после закрытия grpcurl с помощью `Ctrl-C`

Консультация по поводу вызова закрытия двухпотока zio-grpc: когда он будет закрыт? Я использую grpcurl для тестирования bistream, но серверная часть zio-grpc не закрывается немедленно (через некоторое время она закроется). Я наблюдаю за событием зак…
17 окт '22 в 03:45
1 ответ

Как создать ZStream из строки из ZStream из байта

Мне нужно прочитать файл с http. Я использую sttp с ZioBackend следующим образом: val sttpBackend: SttpBackend[Task, ZioStreams] = ??? val request = basicRequest .post(uri"...") .response(asStreamUnsafe(ZioStreams)) .readTimeout(Duration.Inf) val re…
22 ноя '22 в 04:41
0 ответов

Как объединить два потока и отсортировать их в zio-потоках

Как и mergeSorted akka-stream . Это очень полезно при обработке данных истории и сохранении временного порядка. Чем заменить в zio-streams?
23 фев '23 в 15:59
0 ответов

Как использовать zio kafka с google protobuf, когда вам нужно прочитать данные из темы и получить их как прототип класса Java?

Мне нужно получить данные из темы Kafka в виде Zio Stream, данные есть в формате googleprotobuf, также мне нужно проверить схему Я использую следующий пример файла protobuf, который создает для меня класс proto.Data Java: syntax = "proto3"; package …
0 ответов

Как анализировать вложенный json-массив в потоковом режиме с помощью zio-json

Для такого массива json: [ my-json-obj1, my-json-obj2, my-json-obj3, .... my-json-objN ] ИMyJsonObjкласс, который представляет отображение одного объекта в массиве, я могу сказать: val myJson = '''[...]''' ZStream .fromIterable(myJson.toSeq) .via(Js…
27 июн '22 в 23:09
0 ответов

Scala ZIOStream с будущим

я пытаюсь сочинять сZStream.asyncследуя этому примеру , но у меня есть проблема с типом:ZIO.fromFutureдает мнеTask[+A] = ZIO[Any, Throwable, A]ноZstream.emitпотребностиZIO[R, Option[E], Chunk[A]] [ Как я могу превратить Future в ZStream, если это им…
15 фев '23 в 07:31
0 ответов

Эквивалентный код будущего взаимодействия ZIO с разными результатами

Работал с ZIO впервые и написал код, сводившийся к следующему: val x = ZStream.fromIterable(Iterable.empty).runDrain await(zio.Runtime.default.unsafeRun(x.toFuture)) он компилируется, но терпит неудачу во время выполнения с [info] java.util.concurre…
27 апр '23 в 18:22
0 ответов

Манипулирование данными с помощью ZIO Stream — компилируется и запускается, но не завершается.

Я создал 3 разные версии мини-конвейера обработки данных. Один с Scala Views, один с FS2 и третий с ZIO Streams. Реализация View и FS2 запускается и завершается довольно быстро (FS2 намного быстрее). Однако моя реализация ZIO компилируется и запуска…
25 мар '23 в 13:26
0 ответов

Scala, ZIO, ZStream – как передать пользовательский объект данных в конечную точку?

Я хочу передавать данные изZstreamс некоторым временем повторения. У меня есть основная функция, которая возвращаетZIO: def processData(request: MyRequest): Task[Seq[SomePayload]] Я также вызываю этот метод для каждого элемента в моем списке.request…
15 мар '23 в 20:19
1 ответ

Правильный способ преобразования очереди, выдающей Take[_, A] в A?

Я не совсем понимаю дизайн пользовательского уровня ZStream ZIO при записи в очередь: val queue: RIO[Scope, Dequeue[Take[Nothing, Int]]] = ZStream(1, 2).toQueue() ... излучающий (например)Take(Success(Chunk(1))) Я понимаю, что на низком уровне что-т…
20 июл '23 в 17:16