Akka Stream TCP + производитель Akka Stream Kafka не прекращает публиковать сообщения и не выводить сообщения об ошибках
У меня есть следующий поток:
Source(IndexedSeq(ByteString.empty))
.via(
Tcp().outgoingConnection(bsAddress, bsPort)
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
.map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
).onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
Некоторое время он работает нормально, и я могу использовать сообщения, опубликованные по теме Кафки. Но время от времени, по-видимому, через произвольный интервал, больше не публикуется сообщений, и этот код не регистрирует никаких ошибок (printAndByeBye напечатает переданное сообщение и завершит работу системы актера.) После перезапуска приложения сообщения продолжают течь.
Есть идеи, как узнать, что здесь происходит?
Изменить: я положил Камон на это, и я мог видеть следующее поведение:
Похоже, что-то остановлено, не сообщая, что поток должен остановиться, но я не знаю, как сделать это явным и остановить поток.
3 ответа
Поток не прерывался, но поток TCP вышел из режима ожидания, так как устройство, публикующее данные, через некоторое время перестает отправлять данные без разрыва соединения. Вместо использования более простого:
TCP().outgoingConnection(bsAddress, bsPort)
Я заканчиваю тем, что использую:
def outgoingConnection(
remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil,
halfClose: Boolean = true,
connectTimeout: Duration = Duration.Inf,
idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = ???
так
Tcp().outgoingConnection(bsAddress, bsPort)
стал
val connectTimeout: Duration = 1 second
val idleTimeout: Duration = 2 second
Tcp().outgoingConnection(
remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort),
connectTimeout = connectTimeout,
idleTimeout = idleTimeout
)
сообщая idleTimeout, последующее начало сбой и другой поток может быть перезапущен.
Я бы предложил создать поток с атрибутами наблюдения, чтобы обрабатывать возможные исключения в вашем TCP-соединении, например:
val flow =
Tcp().outgoingConnection("", 12)
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
.map(_.utf8String).withAttributes(ActorAttributes.supervisionStrategy {
case ex: Throwable =>
println("Error ocurred: " + ex)
Supervision.Resume
}
а также
Source(IndexedSeq(ByteString.empty))
.via(flow)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
).onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
Если есть какая-либо ошибка с потоком, поток останавливается. С этой конфигурацией вы увидите, не вызвал ли поток каких-либо исключений.
Если все замолкает, это может быть вызвано противодавлением. Попробуйте и выборочно замените ступени, поддерживающие противодавление, ступенями, не воспринимающими противодавление, и проверьте, сохраняется ли проблема. В вашем случае есть 2 возможных источника противодавления:
1) TCP соединение
Вы можете попробовать и прикрепить бесконечный источник ByteString
к Кафке, делая что-то вроде:
Source.cycle(() => List(???).iterator)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
Producer.plainSink(
ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
)
).onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
2) мойка Кафка
замените его на некоторые записи
Source(IndexedSeq(ByteString.empty))
.via(
Tcp().outgoingConnection(bsAddress, bsPort)
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
.map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runForeach(println)
.onComplete {
case Success(Done) => printAndByeBye("Stream ends successfully")
case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
}
Вы видите проблему только в одном из 2 случаев? В обоих? Ни в одном?