Потоковая передача данных из Play Enumerator через Spray с использованием Chunked Responses

У меня есть данные, извлекаемые из Reactive Mongo, которые мне нужно протолкнуть через Spray Rest API. Я надеялся сделать это с помощью Chunked Response. Однако я обнаружил, что Enumerator, который возвращается из Reactive Mongo, способен проталкивать Spray быстрее, чем может обработать сетевое соединение. В результате соединение разрывается.

Мне удалось решить эту проблему с помощью функции Spray Ack в актере среднего уровня. Это, наряду с блокирующим ожиданием, позволило мне создать противодавление на счетчике. Однако я не хочу Await. Я хотел бы найти способ для потоковой передачи данных через Spray неблокирующим способом.

Это возможно? У меня есть несколько идей, которые могут сработать, если я смогу заполнить недостающие фрагменты.

1) Создайте обратное давление на Перечислитель неблокирующим способом (не знаю, как это сделать. Предложения?)

2) Разбить счетчик на меньшие счетчики. Начните использовать каждый перечислитель только после завершения предыдущего. Я могу сделать это с помощью актера. Здесь мне не хватает способа разбить больший перечислитель на меньшие перечислители.

3) Используйте что-то вроде метода "Enumeratee.take". Где бы я взял некоторое количество записей у Перечислителя, тогда, когда я буду готов, возьму еще немного. Это действительно то же самое решение, что и 2), но с несколько иной точки зрения. Однако это потребует, чтобы перечислитель поддерживал состояние. Есть ли способ использовать Enumeratee.take несколько раз против одного и того же перечислителя, не перезапускаясь с самого начала каждый раз?

Может ли кто-нибудь предложить альтернативные предложения, которые могут работать? Или, если это невозможно, пожалуйста, дайте мне знать.

Я использую Play Enumerators 2.3.5

2 ответа

Решение

После значительных экспериментов (и помощи от stackru) я смог найти решение, которое, кажется, работает. Он использует Spray Chunked Responses и строит вокруг него итерацию.

Соответствующие фрагменты кода включены здесь:

ChunkedResponder.scala

package chunkedresponses

import akka.actor.{Actor, ActorRef}
import spray.http.HttpHeaders.RawHeader
import spray.http._

object ChunkedResponder {
  case class Chunk(data: HttpData)
  case object Shutdown
  case object Ack
}

class ChunkedResponder(contentType: ContentType, responder: ActorRef) extends Actor {
  import ChunkedResponder._
  def receive:Receive = {
    case chunk: Chunk =>
      responder.forward(ChunkedResponseStart(HttpResponse(entity = HttpEntity(contentType, chunk.data))).withAck(Ack))
      context.become(chunking)
    case Shutdown =>
      responder.forward(HttpResponse(headers = List(RawHeader("Content-Type", contentType.value))).withAck(Ack))
      context.stop(self)
  }

  def chunking:Receive = {
    case chunk: Chunk =>
      responder.forward(MessageChunk(chunk.data).withAck(Ack))
    case Shutdown =>
      responder.forward(ChunkedMessageEnd().withAck(Ack))
      context.stop(self)
  }
}

ChunkIteratee.scala

package chunkedresponses

import akka.actor.ActorRef
import akka.util.Timeout
import akka.pattern.ask
import play.api.libs.iteratee.{Done, Step, Input, Iteratee}
import spray.http.HttpData
import scala.concurrent.duration._

import scala.concurrent.{ExecutionContext, Future}

class ChunkIteratee(chunkedResponder: ActorRef) extends Iteratee[HttpData, Unit] {
  import ChunkedResponder._
  private implicit val timeout = Timeout(30.seconds)

  def fold[B](folder: (Step[HttpData, Unit]) => Future[B])(implicit ec: ExecutionContext): Future[B] = {
    def waitForAck(future: Future[Any]):Iteratee[HttpData, Unit] = Iteratee.flatten(future.map(_ => this))

    def step(input: Input[HttpData]):Iteratee[HttpData, Unit] = input match {
      case Input.El(e) => waitForAck(chunkedResponder ? Chunk(e))
      case Input.Empty => waitForAck(Future.successful(Unit))
      case Input.EOF =>
        chunkedResponder ! Shutdown
        Done(Unit, Input.EOF)
    }

    folder(Step.Cont(step))
  }
}

package.scala

import akka.actor.{ActorContext, ActorRefFactory, Props}
import play.api.libs.iteratee.Enumerator
import spray.http.{HttpData, ContentType}
import spray.routing.RequestContext

import scala.concurrent.ExecutionContext

package object chunkedresponses {
  implicit class ChunkedRequestContext(requestContext: RequestContext) {
    def completeChunked(contentType: ContentType, enumerator: Enumerator[HttpData])
                       (implicit executionContext: ExecutionContext, actorRefFactory: ActorRefFactory) {
      val chunkedResponder = actorRefFactory.actorOf(Props(new ChunkedResponder(contentType, requestContext.responder)))
      val iteratee = new ChunkIteratee(chunkedResponder)
      enumerator.run(iteratee)
    }
  }
}

Я думаю, что идея заключается в том, что вы реализуете Iteratee чья fold Метод вызывает только предоставленный обратный вызов после получения подтверждения Spray. Что-то вроде:

def handleData(input: Input[String]) = new Iteratee[String] {
  def fold[B](folder: Step[Error, String] => Future[B]): Future[B] = {
    (sprayActor ? input).flatMap {
      case success => folder(Cont(handleData))
      case error => folder(Error(...))
      case done => ...
    }
  }
}

val initialIteratee = new Iteratee[String] {
  def fold[B](folder: Step[Error, String] => Future[B]) = folder(Cont(handleData))
}

enumerator.run(initialIteratee)

Это должно быть неблокирующим, но гарантирует, что следующий блок будет отправлен только после того, как предыдущий блок успешно завершен.

Другие вопросы по тегам