Акка источник отправляет неполное сообщение

Я посылаю байты в виде массива байтовых строк через akka Source и Tcp, длина полного массива, который включает в себя: begin ++ len ++ gzip ++ sign ++ term 997, но только 710 байт достигают сервера. Код здесь:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Tcp}
import akka.util.ByteString
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}


class UpperServiceClient(ip: String, port: Int){
  def run = {
    implicit val system = ActorSystem("ClientSys")
    implicit val materializer = ActorMaterializer()

    ...
    // Initializing of (begin, len, gzip, sign, term) arrays
    ...

    val conn = Tcp().outgoingConnection(ip, port)
    val res: Future[ByteString] = Source(begin ++ len ++ gzip ++ sign ++ term).via(conn).
      runFold(ByteString.empty) { (acc, in) => acc ++ in }

    val resp = Await.result(res, 3.seconds)
  }
}

Сервер принимает и успешно обрабатывает первые 710 байт. На сервере нет проблем, потому что, когда я пытаюсь отправить эти байты от другого клиента, сообщение завершается. Есть идеи, с чем может быть связана проблема? Или может кто-нибудь посоветует, как разделить сообщение на два и отправить через одно соединение?

1 ответ

Возможным решением было бы построить Source от Iterator значения вместо Array ценности:

val sourceIterator : () => Iterator[ByteString] = 
  () => Iterable.apply(begin, len, gzip, sign, term)
                .map(_.iterator)
                .reduceLeftOption(_ ++ _)
                .getOrElse(Iterator.empty)

val byteStringSource : Source[ByteString,_] = Source fromIterator sourceIterator

Если это не сработает, то я предполагаю, что есть параметр конфигурации (в клиентском akka, или в клиентской ОС, или в сетевом интерфейсе клиента,...), который устанавливает ограничение на размер исходящего сообщения...

Другие вопросы по тегам