Как читать ответ как Observable[String] с помощью STTP
Я использую STTP-клиент. Я хочу интерпретировать ответ как строки, разделенные на строки, например Observable[String]
Вот STP потокового API:
import java.nio.ByteBuffer
import com.softwaremill.sttp._
import com.softwaremill.sttp.okhttp.monix.OkHttpMonixBackend
import monix.eval.Task
import monix.reactive.Observable
implicit val sttpBackend = OkHttpMonixBackend()
val res: Task[Response[Observable[ByteBuffer]]] = sttp
.post(uri"someUri")
.response(asStream[Observable[ByteBuffer]])
.send()
Так как я могу получить Observable[String]
?
Вот несколько идей:
1. Есть ли простой способ split
Наблюдаемый линиями?
2. Или, может быть, я могу получить сырой InputStream
из ответа, так что я могу легко разделить его, но я не могу найти способ использовать что-то вроде asStream[InputStream]
3. Или, может быть, просто используйте http backend witout sttp
слой?
1 ответ
Ваша основная проблема в том, как конвертировать Observable[ByteBuffer]
в Observable[String]
где каждый String
это линия, правильно?
Вы можете использовать метод bufferWithSelector(selector: Observable[S]): Observable[Seq[A]]
, Этот метод буферизует Observable, пока селектор Observable не выпустит элемент.
Я сделал небольшой пример, используя Int
s:
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val source = Observable.range(0, 1000, 1)
.delayOnNext(100.milliseconds)
val selector = source.filter(_ % 10 == 0)
val buffered = source.bufferWithSelector(selector)
.map(_.foldLeft("")((s, i) => s + i.toString)) // This folds the Seq[Int] into a String for display purposes
buffered.foreach(println)
Конечно, у этого есть главный недостаток: основная наблюдаемая source
будет оцениваться дважды. Вы можете увидеть это, изменив приведенный выше пример:
// Start writing your ScalaFiddle code here
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val source = Observable.range(0, 1000, 1)
.delayOnNext(100.milliseconds)
.map {x => println(x); x} // <------------------
val selector = source.filter(_ % 10 == 0)
val buffered = source.bufferWithSelector(selector)
.map(_.foldLeft("")((s, i) => s + i.toString))
buffered.foreach(println)
Это напечатает каждый номер дважды.
Чтобы это исправить, вы должны преобразовать source
Наблюдаемый в горячий Наблюдаемый:
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._
val source = Observable.range(0, 1000, 1)
.delayOnNext(100.milliseconds)
.map {x => println(x); x}
.publish // <-----------------------------
// source is now a ConnectableObservable and will start emitting elements
// once you call source.connect()
val selector = source.filter(_ % 10 == 0)
val buffered = source.bufferWithSelector(selector)
.map(_.foldLeft("")((s, i) => s + i.toString))
buffered.foreach(println)
source.connect() // <---------------------------
Единственное, что вам нужно сделать, это изменить селектор так, чтобы он генерировал элементы только тогда, когда встречается перевод строки.
Я бы предложил разделить Observable[ByteBuffer]
в Observable[Byte]
первый (используя flatMap
) чтобы избежать головной боли.