Описание тега zio-streams
1
ответ
Как читать сообщения с ZHub через ZStream?
Я новичок в ZHub и ZStream и хотел ознакомиться с их API. К сожалению, я не смог заставить работать этот простой пример: for hub <- Hub.bounded[String](4) stream = ZStream.fromHub(hub) _ <- hub.publish("Hello") _ <- hub.publish("World") col…
06 май '21 в 13:22
2
ответа
ZIO Streams: В чем разница между ZSink и ZTransducer?
Я изучаю ZIO Streams, использую версию библиотеки 1.0.9. zio-streams. Я не могу найти ссылку, которая показывает мне разницу между ZSink и ZTransducer. В чем разница?
11 июл '21 в 16:33
1
ответ
Интеграция потребителей Zio-Kafka с десериализатором Zio-Json
Я изучаю библиотеку zio-kafka, и я хочу использовать zio-json для десериализации значений сообщений в формате JSON. У меня есть простой класс case вместе с его декодером и кодировщиком: case class Player(name: String, score: Int) object Player { imp…
30 июл '21 в 22:33
0
ответов
Могу ли я использовать ZSink для фиксации смещений в Zio-Kafka?
Я изучаю интеграцию ZIO с Apache Kafka, используя библиотеку zio-kafka. В примере на главной странице проекта Github они используют mapM функция для фиксации смещения чанка: Consumer.subscribeAnd(Subscription.topics("topic150")) .plainStream(Serde.s…
01 авг '21 в 22:41
0
ответов
Как я могу создать пакетный запрос и понять ответ с помощью ZStream (ZIO)?
У меня есть api, который получает такой запрос: case class UsersRequest(ids: List[Long]) и возвращает такой ответ: case class UsersInfoResponse(info: List[Info]) case class Info(userId: Long, info: String) Кроме того, у меня есть методы, которые отп…
02 ноя '21 в 21:20
1
ответ
Прервать обработку ZStream mapMPar
У меня есть следующий код, который из-за ограничений максимального количества строк Excel ограничен ~ 1 миллионом строк: ZStream.unwrap(generateStreamData).mapMPar(32) {m => streamDataToCsvExcel } Все достаточно просто и работает отлично. Я отсле…
28 янв '22 в 14:16
1
ответ
Потоковая передача ByteArrayOutputStream в ответ akka http
Я создаю ByteArrayOutputStream, используя ZIO Streams, т.е.: lazy val byteArrayOutputStream = new ByteArrayOutputStream() val sink = ZSink.fromOutputStream(byteArrayOutputStream).contramapChunks[String](_.flatMap(_.getBytes) val data = ZStream.unwra…
29 ноя '21 в 12:06
1
ответ
zio-grpc bi-stream НЕ закрывается на стороне сервера после закрытия grpcurl с помощью `Ctrl-C`
Консультация по поводу вызова закрытия двухпотока zio-grpc: когда он будет закрыт? Я использую grpcurl для тестирования bistream, но серверная часть zio-grpc не закрывается немедленно (через некоторое время она закроется). Я наблюдаю за событием зак…
17 окт '22 в 03:45
1
ответ
Как создать ZStream из строки из ZStream из байта
Мне нужно прочитать файл с http. Я использую sttp с ZioBackend следующим образом: val sttpBackend: SttpBackend[Task, ZioStreams] = ??? val request = basicRequest .post(uri"...") .response(asStreamUnsafe(ZioStreams)) .readTimeout(Duration.Inf) val re…
22 ноя '22 в 04:41
0
ответов
Как объединить два потока и отсортировать их в zio-потоках
Как и mergeSorted akka-stream . Это очень полезно при обработке данных истории и сохранении временного порядка. Чем заменить в zio-streams?
23 фев '23 в 15:59
0
ответов
Как использовать zio kafka с google protobuf, когда вам нужно прочитать данные из темы и получить их как прототип класса Java?
Мне нужно получить данные из темы Kafka в виде Zio Stream, данные есть в формате googleprotobuf, также мне нужно проверить схему Я использую следующий пример файла protobuf, который создает для меня класс proto.Data Java: syntax = "proto3"; package …
15 июл '22 в 16:34
0
ответов
Как анализировать вложенный json-массив в потоковом режиме с помощью zio-json
Для такого массива json: [ my-json-obj1, my-json-obj2, my-json-obj3, .... my-json-objN ] ИMyJsonObjкласс, который представляет отображение одного объекта в массиве, я могу сказать: val myJson = '''[...]''' ZStream .fromIterable(myJson.toSeq) .via(Js…
27 июн '22 в 23:09
0
ответов
Scala ZIOStream с будущим
я пытаюсь сочинять сZStream.asyncследуя этому примеру , но у меня есть проблема с типом:ZIO.fromFutureдает мнеTask[+A] = ZIO[Any, Throwable, A]ноZstream.emitпотребностиZIO[R, Option[E], Chunk[A]] [ Как я могу превратить Future в ZStream, если это им…
15 фев '23 в 07:31
0
ответов
Эквивалентный код будущего взаимодействия ZIO с разными результатами
Работал с ZIO впервые и написал код, сводившийся к следующему: val x = ZStream.fromIterable(Iterable.empty).runDrain await(zio.Runtime.default.unsafeRun(x.toFuture)) он компилируется, но терпит неудачу во время выполнения с [info] java.util.concurre…
27 апр '23 в 18:22
0
ответов
Манипулирование данными с помощью ZIO Stream — компилируется и запускается, но не завершается.
Я создал 3 разные версии мини-конвейера обработки данных. Один с Scala Views, один с FS2 и третий с ZIO Streams. Реализация View и FS2 запускается и завершается довольно быстро (FS2 намного быстрее). Однако моя реализация ZIO компилируется и запуска…
25 мар '23 в 13:26
0
ответов
Scala, ZIO, ZStream – как передать пользовательский объект данных в конечную точку?
Я хочу передавать данные изZstreamс некоторым временем повторения. У меня есть основная функция, которая возвращаетZIO: def processData(request: MyRequest): Task[Seq[SomePayload]] Я также вызываю этот метод для каждого элемента в моем списке.request…
15 мар '23 в 20:19
1
ответ
Правильный способ преобразования очереди, выдающей Take[_, A] в A?
Я не совсем понимаю дизайн пользовательского уровня ZStream ZIO при записи в очередь: val queue: RIO[Scope, Dequeue[Take[Nothing, Int]]] = ZStream(1, 2).toQueue() ... излучающий (например)Take(Success(Chunk(1))) Я понимаю, что на низком уровне что-т…
20 июл '23 в 17:16