Akka Stream TCP + производитель Akka Stream Kafka не прекращает публиковать сообщения и не выводить сообщения об ошибках

У меня есть следующий поток:

Source(IndexedSeq(ByteString.empty))
.via(
  Tcp().outgoingConnection(bsAddress, bsPort)
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
    .map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
  Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
  )
).onComplete {
    case Success(Done) => printAndByeBye("Stream ends successfully")
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
  }

Некоторое время он работает нормально, и я могу использовать сообщения, опубликованные по теме Кафки. Но время от времени, по-видимому, через произвольный интервал, больше не публикуется сообщений, и этот код не регистрирует никаких ошибок (printAndByeBye напечатает переданное сообщение и завершит работу системы актера.) После перезапуска приложения сообщения продолжают течь.

Есть идеи, как узнать, что здесь происходит?

Изменить: я положил Камон на это, и я мог видеть следующее поведение:

Размер почтового ящика на одного актера

Время в почтовом ящике на одного актера

Время обработки на одного актера

Похоже, что-то остановлено, не сообщая, что поток должен остановиться, но я не знаю, как сделать это явным и остановить поток.

3 ответа

Решение

Поток не прерывался, но поток TCP вышел из режима ожидания, так как устройство, публикующее данные, через некоторое время перестает отправлять данные без разрыва соединения. Вместо использования более простого:

TCP().outgoingConnection(bsAddress, bsPort)

Я заканчиваю тем, что использую:

def outgoingConnection(
remoteAddress:  InetSocketAddress,
localAddress:   Option[InetSocketAddress]           = None,
options:        immutable.Traversable[SocketOption] = Nil,
halfClose:      Boolean                             = true,
connectTimeout: Duration                            = Duration.Inf,
idleTimeout:    Duration                            = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = ???

так

Tcp().outgoingConnection(bsAddress, bsPort)

стал

val connectTimeout: Duration = 1 second
val idleTimeout: Duration = 2 second
Tcp().outgoingConnection(
    remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort),
    connectTimeout = connectTimeout,
    idleTimeout = idleTimeout
  )

сообщая idleTimeout, последующее начало сбой и другой поток может быть перезапущен.

Я бы предложил создать поток с атрибутами наблюдения, чтобы обрабатывать возможные исключения в вашем TCP-соединении, например:

val flow = 
    Tcp().outgoingConnection("", 12)
          .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
          .map(_.utf8String).withAttributes(ActorAttributes.supervisionStrategy {
      case ex: Throwable =>
        println("Error ocurred: " + ex)
        Supervision.Resume
     }

а также

Source(IndexedSeq(ByteString.empty))
.via(flow)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
  Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
  )
).onComplete {
    case Success(Done) => printAndByeBye("Stream ends successfully")
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
  }

Если есть какая-либо ошибка с потоком, поток останавливается. С этой конфигурацией вы увидите, не вызвал ли поток каких-либо исключений.

Если все замолкает, это может быть вызвано противодавлением. Попробуйте и выборочно замените ступени, поддерживающие противодавление, ступенями, не воспринимающими противодавление, и проверьте, сохраняется ли проблема. В вашем случае есть 2 возможных источника противодавления:

1) TCP соединение

Вы можете попробовать и прикрепить бесконечный источник ByteString к Кафке, делая что-то вроде:

Source.cycle(() => List(???).iterator)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
  Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
  )
).onComplete {
    case Success(Done) => printAndByeBye("Stream ends successfully")
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
  }

2) мойка Кафка

замените его на некоторые записи

Source(IndexedSeq(ByteString.empty))
.via(
  Tcp().outgoingConnection(bsAddress, bsPort)
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
    .map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runForeach(println)
.onComplete {
    case Success(Done) => printAndByeBye("Stream ends successfully")
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
  }

Вы видите проблему только в одном из 2 случаев? В обоих? Ни в одном?

Другие вопросы по тегам