Kafka Mirror Maker не может воспроизвести тему __consumer_offset

Я пытаюсь использовать зеркало производителя для репликации __consumre_offsets тема наряду с другими темами.

Это дает ошибку, как указано ниже.

[2018-10-24 16:16: 03,802] ОШИБКА Ошибка при отправке сообщения в тему __consumer_offsets с ключом:16 байтов, значение: 445 байтов с ошибкой: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.InvalidTopicException: запрос попытался выполнить операцию с недопустимой темой. [2018-10-24 16:16:03,828] ОШИБКА Ошибка при отправке сообщения в тему __consumer_offsets с ключом: 29 байтов, значение: 754 байта с ошибкой: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.InvalidTopicException: запрос попытался выполнить операцию с недопустимой темой.

Есть ли способ решить эту проблему?

В одной из презентаций на SlideShare на слайде 21 он упомянул о репликации смещенной темы в нескольких центрах обработки данных. Может кто-нибудь сказать мне возможный способ достичь того же?

Или есть какой-то другой лучший способ для резервного копирования и восстановления политики для Kafka.

0 ответов

Добавьте это в ваш consumer.config:

exclude.internal.topics=false

И добавьте это в ваш файл provider.config:

client.id=__admin_client


Причина в том, что производители не могут писать на внутренние темы, если вы не заявите client.id=__admin_client который используется AdminClient.scala. Найдено здесь: https://issues.apache.org/jira/browse/KAFKA-6524

@amdelamar @Abhisek Verma Это работает для тебя. Как было предложено выше, я попробовал те же настройки в своих файлах конфигурации потребителя и производителя соответственно, и он по-прежнему не работает с той же ошибкой. Он начинает копировать темы и по прошествии определенного времени продолжает давать сбой.

consumer.config

bootstrap.servers=https://kafka:9093
**exclude.internal.topics=false** 
client.id=mirror_maker_consumer1 
group.id=mirror_maker_consumer1
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="****" password="********";
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

производитель.config

bootstrap.servers=https://kafka-*****:9093
acks=1
**client.id=__admin_client**
batch.size=100
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="**************" password="****";
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

ОШИБКА Ошибка при отправке сообщения в тему __consumer_offsets с ключом: 62 байта, значение: 28 байтов с ошибкой: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

Другие вопросы по тегам