Кодек сообщений Kafka - сжатие и распаковка

При использовании kafka я могу установить кодек, установив свойство kafka.compression.codec моего производителя kafka.

Предположим, я использую сжатие snappy в моем производителе, когда я получаю сообщения от kafka с использованием некоторого kafka-потребителя, должен ли я что-то делать для декодирования данных из snappy или это какая-то встроенная функция для потребителя kafka?

В соответствующей документации я не смог найти ни одного свойства, касающегося кодирования у кафки потребителя (это касается только производителя).

Может кто-нибудь это очистить?

3 ответа

Решение

Согласно моему пониманию, декомпрессия сама заботится о Потребителе. Как уже упоминалось на их официальной вики-страницеThe consumer iterator transparently decompresses compressed data and only returns an uncompressed message

Как показано в этой статье, потребитель работает следующим образом.

У потребителя есть фоновые потоки "извлечения", которые непрерывно извлекают данные от брокеров партиями по 1 МБ и добавляют их во внутреннюю очередь блокировки. Поток потребителя удаляет данные из этой очереди блокировки, распаковывает и перебирает сообщения

А также на странице документа под сквозное пакетное сжатие написано, что

Пакет сообщений может быть объединен в сжатом виде и отправлен на сервер в этой форме. Этот пакет сообщений будет записан в сжатом виде и останется сжатым в журнале и будет распакован только потребителем.

Таким образом, кажется, что декомпрессионная часть обрабатывается самим потребителем, все что вам нужно сделать, это предоставить действительный / поддерживаемый тип сжатия, используя compression.codec Атрибут ProducerConfig при создании производителя. Я не смог найти ни одного примера или объяснения, где говорится о каком-либо подходе к декомпрессии со стороны потребителя. Пожалуйста, поправьте меня, если я ошибаюсь.

У меня есть небольшие сомнения относительно распаковки на стороне потребителя kafka, если производитель kafka отправляет сжатый поток (или GZIP или SNAPPY). Похоже, что потребитель kafka прозрачно сделал декомпрессию сжатого потока на стороне потребителя. Пожалуйста, поправьте меня в эту погоду, я не уверен, что здесь.

Или есть какой-нибудь распакованный пример на стороне потребителя кафки, если мои рассуждения неверны?

У меня та же проблема с v0.8.1, и это декомпрессия сжатия в Kafka плохо документирована, за исключением того, что Потребитель должен "прозрачно" распаковывать сжатые данные, которые он НИКОГДА не делал.

Пример клиентского клиента высокого уровня, использующего ConsumerIterator на веб-сайте Kafka, работает только с несжатыми данными. Как только я включаю сжатие в клиенте Producer, сообщение никогда не попадает в следующий цикл while. Надеемся, что они должны решить эту проблему как можно скорее, или они не должны требовать эту функцию, так как некоторые пользователи могут использовать Kafka для транспортировки сообщений большого размера, которые требуют возможности пакетирования и сжатия.

ConsumerIterator <byte[], byte[]> it = stream.iterator();
while(it.hasNext())
{
   String message = new String(it.next().message());
}
Другие вопросы по тегам