Kafka Mirror Maker не может воспроизвести тему __consumer_offset
Я пытаюсь использовать зеркало производителя для репликации __consumre_offsets
тема наряду с другими темами.
Это дает ошибку, как указано ниже.
[2018-10-24 16:16: 03,802] ОШИБКА Ошибка при отправке сообщения в тему __consumer_offsets с ключом:16 байтов, значение: 445 байтов с ошибкой: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.InvalidTopicException: запрос попытался выполнить операцию с недопустимой темой. [2018-10-24 16:16:03,828] ОШИБКА Ошибка при отправке сообщения в тему __consumer_offsets с ключом: 29 байтов, значение: 754 байта с ошибкой: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) org.apache.kafka.common.errors.InvalidTopicException: запрос попытался выполнить операцию с недопустимой темой.
Есть ли способ решить эту проблему?
В одной из презентаций на SlideShare на слайде 21 он упомянул о репликации смещенной темы в нескольких центрах обработки данных. Может кто-нибудь сказать мне возможный способ достичь того же?
Или есть какой-то другой лучший способ для резервного копирования и восстановления политики для Kafka.
0 ответов
Добавьте это в ваш consumer.config:
exclude.internal.topics=false
И добавьте это в ваш файл provider.config:
client.id=__admin_client
Причина в том, что производители не могут писать на внутренние темы, если вы не заявите client.id=__admin_client
который используется AdminClient.scala. Найдено здесь: https://issues.apache.org/jira/browse/KAFKA-6524
@amdelamar @Abhisek Verma Это работает для тебя. Как было предложено выше, я попробовал те же настройки в своих файлах конфигурации потребителя и производителя соответственно, и он по-прежнему не работает с той же ошибкой. Он начинает копировать темы и по прошествии определенного времени продолжает давать сбой.
consumer.config
bootstrap.servers=https://kafka:9093
**exclude.internal.topics=false**
client.id=mirror_maker_consumer1
group.id=mirror_maker_consumer1
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="****" password="********";
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
производитель.config
bootstrap.servers=https://kafka-*****:9093
acks=1
**client.id=__admin_client**
batch.size=100
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="**************" password="****";
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
ОШИБКА Ошибка при отправке сообщения в тему __consumer_offsets с ключом: 62 байта, значение: 28 байтов с ошибкой: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)