Кафка-потребитель 0.9 обратно совместим?

Будет ли будущий потребитель kafka 0.9.x совместимым с брокером 0.8?

Другими словами - можно только переключиться на новую потребительскую реализацию, не касаясь ничего другого?

5 ответов

Решение

Согласно документации Kafka 0.9.0, вы не можете использовать нового потребителя для чтения данных от брокеров 0.8.x. Причина в следующем:

0.9.0.0 имеет изменение протокола межброкерского взаимодействия по сравнению с предыдущими версиями.

Нет. Как правило, рекомендуется обновлять брокеров до того, как клиенты будут работать, поскольку они ориентированы на обратную совместимость. Брокер 0,9 будет работать как с API-интерфейсами 0,8, так и 0,9, но не наоборот.

Недавно я столкнулся с подобной проблемой, когда в моем приложении мне приходилось читать из Кафки 0,9, а затем записывать обратно в Кафку 0,8. Я использовал kafka client 0.9 следующим образом.

Consumer Config

    props.put("bootstrap.servers", "brokers_ip as comma seperated values");
    props.put("group.id", "your group id");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", 1000);
    props.put("session.timeout.ms", 30000);
    consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe("List of topics to subscribe too");

Производитель Конфиг

        Properties props = new Properties();
        props.put("bootstrap.servers","list of broker ips");
        props.put("metadata.broker.list", "list of broker ips");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        String message = "hello world";
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic_name, message);
        producer.send(data);
        producer.close();

Надеюсь это поможет.

Основанный на этой вики-странице Consumer Client Re-design, которая цитирует,

Это повлечет за собой некоторые существенные изменения в пользовательских API *, поэтому мы хотели бы получить отзывы о предложении от нашего сообщества. Поскольку список изменений не мал, нам хотелось бы понять, являются ли некоторые функции предпочтительными по сравнению с другими, и что более важно, если некоторые функции не требуются вообще.

* Акцент мой.

Я не нашел нигде, конкретно заявляя о несовместимости. Но, используя эту цитату и тот факт, что производитель в 0.8 не был совместим с производителем в 0.7, я предполагаю, что они не совместимы.

Похоже, что в Кафке 0.9.0 встроена обратная совместимость. Проверьте http://kafka.apache.org/documentation.html

Цитирование из документации

В версии 0.9.0.0 есть потенциальные критические изменения (пожалуйста, ознакомьтесь с ними перед обновлением) и изменение протокола посредника между предыдущими версиями. Для непрерывного обновления:

  • Обновите файл server.properties на всех посредниках и добавьте следующее свойство: inter.broker.protocol.version = 0.8.2.X
  • Обновите брокеров. Это можно сделать брокеру за раз, просто выключив его, обновив код и перезапустив его.
  • После обновления всего кластера измените версию протокола, отредактировав файл inter.broker.protocol.version и установив для него значение 0.9.0.0.
  • Перезапустите посредников по одному, чтобы новая версия протокола вступила в силу
Другие вопросы по тегам