Как обрабатывать и сортировать большие сообщения в Reactive Kafka с помощью Akka-Stream

Можно ли при отправке большого файла с помощью Kafka распределить его по разделам, а затем повторно собрать с помощью Akka-Stream? как описано в этой презентации:

http://www.slideshare.net/JiangjieQin/handle-large-messages-in-apache-kafka-58692297

1 ответ

Решение

"Чанкинг", то есть продюсер, достаточно легко написать, используя что-то вроде реактивной кафки:

case class LargeMessage(bytes : Seq[Byte], topic : String)

def messageToKafka(message : LargeMessage, maxMessageSize : Int) = 
  Source.fromIterator(() => message.bytes.toIterator)
        .via(Flow[Byte].grouped(maxMessageSize))
        .via(Flow[Seq[Byte]].map(seq => new ProducerRecord(message.topic, seq)))
        .runWith(Producer.plainSink(producerSettings)

"Сборка", т. Е. Потребителя, может быть реализована способом, аналогичным документации:

   val messageFut : Future[LargeMessage] = 
     for {
       bytes <- Consumer.map(_._1).runWith(Sink.seq[Byte])
     } yield LargeMessage(bytes, topic)
Другие вопросы по тегам