Как настроить кафку так, чтобы у нас была возможность читать с самого раннего, самого последнего, а также с любого заданного смещения?

Я знаю о настройке кафки для чтения из самого раннего или последнего сообщения. Как включить дополнительную опцию на случай, если мне нужно прочитать предыдущее смещение? Причина, по которой мне нужно это сделать, заключается в том, что более ранние сообщения, которые были прочитаны, необходимо снова обработать из-за некоторой ошибки в логике обработки ранее.

2 ответа

Я пытаюсь ответить на похожий, но не совсем тот же вопрос, поэтому давайте посмотрим, может ли моя информация помочь вам.

Во-первых, я работал с этим другим SO вопрос / ответ

Короче говоря, вы хотите зафиксировать свои смещения, и наиболее распространенным решением для этого является ZooKeeper. Поэтому, если ваш потребитель сталкивается с ошибкой или ему необходимо выключиться, он может возобновить работу с того места, где остановился.

Я сам работаю с потоком большого объема, который чрезвычайно велик, и мой потребитель (для теста) должен начинать каждый раз с самого начала. Документация указывает, что я должен использовать KafkaConsumer, чтобы объявить мою отправную точку.

Я постараюсь обновить свои выводы здесь, когда они будут успешными и надежными. Наверняка это решенная проблема.

В клиенте java kafka есть несколько методов для потребителя kafka, которые можно использовать для указания следующей позиции потребления.

public void seek(раздел TopicPartition, длинное смещение)

Переопределяет смещения выборки, которые потребитель будет использовать в следующем опросе (тайм-аут). Если этот API вызывается для одного и того же раздела более одного раза, последнее смещение будет использовано при следующем опросе (). Обратите внимание, что вы можете потерять данные, если этот API произвольно используется в середине потребления, чтобы сбросить смещения выборки

Этого достаточно, и есть также seekToBeginning и seekToEnd.

Другие вопросы по тегам