Как читать ответ как Observable[String] с помощью STTP

Я использую STTP-клиент. Я хочу интерпретировать ответ как строки, разделенные на строки, например Observable[String]

Вот STP потокового API:

import java.nio.ByteBuffer

import com.softwaremill.sttp._
import com.softwaremill.sttp.okhttp.monix.OkHttpMonixBackend
import monix.eval.Task
import monix.reactive.Observable

implicit val sttpBackend = OkHttpMonixBackend()

val res: Task[Response[Observable[ByteBuffer]]] = sttp
  .post(uri"someUri")
  .response(asStream[Observable[ByteBuffer]])
  .send()

Так как я могу получить Observable[String]?

Вот несколько идей:

1. Есть ли простой способ split Наблюдаемый линиями?
2. Или, может быть, я могу получить сырой InputStream из ответа, так что я могу легко разделить его, но я не могу найти способ использовать что-то вроде asStream[InputStream]
3. Или, может быть, просто используйте http backend witout sttp слой?

1 ответ

Ваша основная проблема в том, как конвертировать Observable[ByteBuffer] в Observable[String]где каждый String это линия, правильно?

Вы можете использовать метод bufferWithSelector(selector: Observable[S]): Observable[Seq[A]], Этот метод буферизует Observable, пока селектор Observable не выпустит элемент.

Я сделал небольшой пример, используя Ints:

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString)) // This folds the Seq[Int] into a String for display purposes

buffered.foreach(println)

Попробуйте!


Конечно, у этого есть главный недостаток: основная наблюдаемая source будет оцениваться дважды. Вы можете увидеть это, изменив приведенный выше пример:

// Start writing your ScalaFiddle code here

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)
  .map {x => println(x); x}  // <------------------

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString))

buffered.foreach(println)

Это напечатает каждый номер дважды.


Чтобы это исправить, вы должны преобразовать source Наблюдаемый в горячий Наблюдаемый:

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

val source = Observable.range(0, 1000, 1)
  .delayOnNext(100.milliseconds)
  .map {x => println(x); x}
  .publish // <-----------------------------

// source is now a ConnectableObservable and will start emitting elements
// once you call source.connect()

val selector = source.filter(_ % 10 == 0)

val buffered = source.bufferWithSelector(selector)
  .map(_.foldLeft("")((s, i) => s + i.toString))

buffered.foreach(println)

source.connect() // <---------------------------

Попробуйте!

Единственное, что вам нужно сделать, это изменить селектор так, чтобы он генерировал элементы только тогда, когда встречается перевод строки.

Я бы предложил разделить Observable[ByteBuffer] в Observable[Byte] первый (используя flatMap) чтобы избежать головной боли.

Другие вопросы по тегам