Scala, ZIO, ZStream – как передать пользовательский объект данных в конечную точку?

Я хочу передавать данные изZstreamс некоторым временем повторения. У меня есть основная функция, которая возвращаетZIO:

      def processData(request: MyRequest): Task[Seq[SomePayload]]

Я также вызываю этот метод для каждого элемента в моем списке.requests.

Теперь я хотел бы взять этот вывод и передать в поток json, который у меня есть.Seq[SomePayload]. Я создал поток с конечной точкой:

        val streamingServerEndpoint: ZServerEndpoint[Any, ZioStreams] = streamingEndpoint.zServerLogic { _ =>
    val stream  =
      ZStream.fromZIO(ZIO.collectAll(requests.collect(service.processData(_))).repeat(Schedule.minuteOfHour(30))).map(_.toByte)

    ZIO.succeed((100L, stream))
  }

Конечная точка:

        val streamingEndpoint: PublicEndpoint[Unit, Unit, (Long, Stream[Throwable, Byte]), ZioStreams] =
    endpoint.get
      .in("receive")
      .out(header[Long](HeaderNames.ContentLength))
      .out(streamTextBody(ZioStreams)(CodecFormat.TextPlain(), Some(StandardCharsets.UTF_8)))

Но когда я запускаю этот код, я могу сделать запрос/receiveконечная точка, но на выходе ничего не передается. В журналах я вижу, что метод работает и возвращает данные, но на конечной точке ничего. Как мне изменить этот код для потоковой передачи jsons (или jsons в виде байтов) через API? я тоже хочу бежатьprocessDataдля всех запросов каждый час (минута 30 каждый час).

0 ответов

Другие вопросы по тегам