Как я могу отправлять большие сообщения с Kafka (более 15 МБ)?

Я отправляю String-сообщения в Kafka V. 0.8 с API Java Producer. Если размер сообщения составляет около 15 МБ, я получаю MessageSizeTooLargeException, Я пытался установить message.max.bytesдо 40 МБ, но я все равно получаю исключение. Небольшие сообщения работали без проблем.

(Исключение появляется у производителя, у меня нет приложения в этом приложении.)

Что я могу сделать, чтобы избавиться от этого исключения?

Мой пример конфиг производителя

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Журнал ошибок:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)

9 ответов

Решение

Вам нужно настроить три (или четыре) свойства:

  • Потребительская сторона:fetch.message.max.bytes - это определит наибольший размер сообщения, которое может быть получено потребителем.
  • Брокерская сторона: replica.fetch.max.bytes - это позволит репликам в посредниках отправлять сообщения в кластере и обеспечивать правильную репликацию сообщений. Если это слишком мало, то сообщение никогда не будет реплицировано, и, следовательно, потребитель никогда не увидит сообщение, потому что сообщение никогда не будет зафиксировано (полностью реплицировано).
  • Брокерская сторона: message.max.bytes - это самый большой размер сообщения, которое может получить брокер от производителя.
  • Сторона брокера (по теме): max.message.bytes - это самый большой размер сообщения, которое брокер разрешает добавить в тему. Этот размер проверяется предварительным сжатием. (По умолчанию для брокера message.max.bytes.)

Я выяснил сложный способ, связанный с номером 2: вы не получаете НИКАКИХ исключений, сообщений или предупреждений от Kafka, поэтому обязательно учитывайте это при отправке больших сообщений.

Незначительные изменения, необходимые для Kafka 0.10 и нового потребителя, по сравнению с ответом смеха:

  • Брокер: Без изменений, вам все еще нужно увеличить свойства message.max.bytes а также replica.fetch.max.bytes, message.max.bytes должен быть равен или меньше (*) чем replica.fetch.max.bytes,
  • Производитель: Увеличение max.request.size отправить сообщение большего размера.
  • Потребитель: увеличение max.partition.fetch.bytes получать большие сообщения.

(*) Прочитайте комментарии, чтобы узнать больше о message.max.bytes<= replica.fetch.max.bytes

Ответ @laughing_man довольно точный. Но все же я хотел дать рекомендацию.

Кафка не предназначена для обработки больших сообщений.

Я рекомендую, чтобы ваш API использовал облачное хранилище Ex S3 и просто отправил Kafka или любому брокеру сообщений ссылку на S3. Вы должны найти где-то для сохранения ваших данных, может быть, это сетевой диск, может быть, что угодно, но это не должно быть брокер сообщений.

Теперь, если вы не хотите идти с вышеупомянутым решением

Максимальный размер сообщения составляет 1 МБ (настройка в ваших брокерах называется message.max.bytes) Apache Kafka. Если вам это действительно нужно, вы можете увеличить это, а также увеличить сетевые буферы для своих производителей и потребителей.

И если вы действительно хотите разделить свое сообщение, убедитесь, что каждое разделение сообщения имеет один и тот же ключ, чтобы оно передавалось в один и тот же раздел, а содержимое вашего сообщения должно содержать "идентификатор части", чтобы ваш потребитель мог полностью восстановить сообщение.,

Вы также можете изучить сжатие, если ваше сообщение основано на тексте (gzip, snappy, lz4 сжатие), что может уменьшить размер данных, но не волшебным образом.

Опять же, вы должны использовать внешнюю систему для хранения этих данных и просто отправить внешнюю ссылку на Кафку. Это очень распространенная архитектура, и она должна быть принята и широко принята.

Имейте в виду, что Кафка работает лучше всего, только если сообщения огромны по объему, но не по размеру.

Идея состоит в том, чтобы иметь одинаковый размер сообщения, отправляемого от Kafka Producer в Kafka Broker, а затем полученного Kafka Consumer, т.е.

Кафка производитель -> Кафка Брокер -> Кафка Потребитель

Предположим, что если требуется отправить 15 МБ сообщения, то все три источника должны быть синхронизированы между производителем, брокером и потребителем.

Kafka Producer отправляет 15 МБ -> Kafka Broker разрешает / сохраняет 15 МБ -> Kafka Consumer получает 15 МБ

Поэтому значение должно быть A.) На брокере: message.max.bytes = 15728640 replica.fetch.max.bytes = 15728640

B.) на потребителя: fetch.message.max.bytes=15728640

Вам необходимо переопределить следующие свойства:

Конфигурации брокера ($ KAFKA_HOME / config / server.properties)

  • replica.fetch.max.bytes
  • message.max.bytes

Конфигурации потребителя ($KAFKA_HOME/config/consumer.properties)
Этот шаг не работает для меня. Я добавил его в приложение для потребителей, и оно работало нормально

  • fetch.message.max.bytes

Перезагрузите сервер.

посмотрите на эту документацию для получения дополнительной информации: http://kafka.apache.org/08/configuration.html

Я думаю, что большинство ответов здесь устарели или не совсем полны.

Чтобы сослаться на ответ Саши Веттер (с обновлением для Kafka 0.10), я хотел бы предоставить некоторую дополнительную информацию и ссылки на официальную документацию.


Конфигурация производителя:

  • (Ссылка) должна быть увеличена для файлов размером более 1 МБ, иначе они будут отклонены

Конфигурация брокера / темы:

  • (Ссылка) может быть установлена, если вы хотите увеличить размер сообщения на уровне брокера. Но из документации: «Это может быть установлено для каждой темы с конфигурацией max.message.bytes на уровне темы».
  • (Ссылка) может быть увеличена, если только одна тема может принимать лагерные файлы. Конфигурацию брокера изменять нельзя.

Я всегда предпочитаю конфигурацию с ограничением темы, потому что я могу настроить тему самостоятельно как клиент для кластера Kafka (например, с клиентом администратора ). Я не могу иметь никакого влияния на саму конфигурацию брокера.


В ответах выше при необходимости упоминаются еще несколько конфигураций:

Из документации: «Это не абсолютный максимум, если первый пакет записей в первом непустом разделе выборки больше, чем это значение, пакет записей все равно будет возвращен, чтобы гарантировать, что прогресс может быть достигнут».

Из документации: «Записи выбираются в пакетах потребителем. Если первый пакет записей в первом непустом разделе выборки превышает этот предел, пакет все равно будет возвращен, чтобы гарантировать, что потребитель может продолжить. "

  • (Ссылка) (Конфигурация потребителя; выше не упоминается, но та же категория)

Из документации: «Записи выбираются в пакетах потребителем, и если первый пакет записей в первом непустом разделе выборки больше, чем это значение, пакет записей все равно будет возвращен, чтобы гарантировать, что потребитель может сделать прогресс."


Вывод: конфигурации, касающиеся выборки сообщений, не нужно изменять для обработки сообщений, больше, чем значения по умолчанию для этой конфигурации (если бы это было протестировано в небольшой установке). Вероятно, потребитель всегда может получить пакеты размером 1. Однако необходимо установить две конфигурации из первого блока, как упоминалось в ответах ранее.

Это пояснение не должно ничего рассказывать о производительности и не должно быть рекомендацией устанавливать или не устанавливать эту конфигурацию. Лучшие значения следует оценивать индивидуально в зависимости от конкретной запланированной пропускной способности и структуры данных.

Одна ключевая вещь, чтобы помнить это message.max.bytes атрибут должен быть синхронизирован с потребителем fetch.message.max.bytes имущество. размер выборки должен быть не меньше максимального размера сообщения, в противном случае могут возникнуть ситуации, когда производители могут отправлять сообщения больше, чем потребитель может использовать / извлечь. Возможно, стоит взглянуть на это.
Какую версию Кафки вы используете? Также предоставьте более подробную информацию о том, что вы получаете. есть что-то вроде... payload size of xxxx larger than 1000000 подойдя в журнале?

Для людей, использующих landoop kafka: вы можете передавать значения конфигурации в переменных среды, например:

docker run -d --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083  -p 9581-9585:9581-9585 -p 9092:9092
 -e KAFKA_TOPIC_MAX_MESSAGE_BYTES=15728640 -e KAFKA_REPLICA_FETCH_MAX_BYTES=15728640  landoop/fast-data-dev:latest `

И если вы используете rdkafka, то передайте message.max.bytes в конфигурации производителя, например:

  const producer = new Kafka.Producer({
        'metadata.broker.list': 'localhost:9092',
        'message.max.bytes': '15728640',
        'dr_cb': true
    });

Аналогичным образом для потребителя

  const kafkaConf = {
   "group.id": "librd-test",
   "fetch.message.max.bytes":"15728640",
   ... .. }                                                                                                                                                                                                                                                      

Вот как я успешно отправил данные размером до 100 МБ, используя kafka-python==2.0.2:

Маклер:

      consumer = KafkaConsumer(
    ...
    max_partition_fetch_bytes=max_bytes,
    fetch_max_bytes=max_bytes,         
)

Производитель (см. Окончательное решение в конце):

      producer = KafkaProducer(
    ...
    max_request_size=KafkaSettings.MAX_BYTES,
)

Потом:

      producer.send(topic, value=data).get()

После отправки таких данных появилось следующее исключение:

MessageSizeTooLargeError: The message is n bytes when serialized which is larger than the total memory buffer you have configured with the buffer_memory configuration.

Наконец я увеличил buffer_memory (по умолчанию 32 МБ), чтобы получить сообщение на другом конце.

      producer = KafkaProducer(
    ...
    max_request_size=KafkaSettings.MAX_BYTES,
    buffer_memory=KafkaSettings.MAX_BYTES * 3,
)
Другие вопросы по тегам