Apache Kafka: воспроизведение сообщений в теме
Я рассматриваю возможность использования Apache Kafka в качестве хранилища событий для хранения событий в микросервисе.
Одна вещь, которую я читаю в различных блогах, состоит в том, что Кафку можно рассматривать как единственный источник правды, где журнал Кафки будет хранить все события для данной темы.
Мне было интересно, есть ли у Кафки возможность воспроизводить сообщения с начала времен (например, в случае сбоя жесткого диска / сети, например)?
(обратите внимание, что я вижу, что в папке /tmp в каталоге темы хранятся некоторые журналы). Кто-нибудь знает какие-либо команды (если таковые имеются), которые могут быть вызваны для воспроизведения сообщений в теме?
1 ответ
Да, вы можете искать конкретное смещение, но
начало времени
зависит от темы или конфигурации брокера. IIRC, срок хранения по умолчанию составляет 7 дней.
Обратитесь к документации Kafka.
Да, вы можете воспроизвести сообщение. Поскольку у потребителя есть контроль над сбросом смещения. Вы можете начать чтение сообщений с самого начала или, если вы знаете какое-либо существующее значение смещения, вы также можете прочитать его оттуда. Как только сообщение будет зафиксировано, оно будет в теме, пока не истечет срок его хранения. Срок хранения по умолчанию составляет 7 дней, но вы можете изменить его в любой момент.
В зависимости от вашего варианта использования вы можете настроить темы событий так, чтобы они никогда не истекли. Здесь есть несколько соображений:
1) Вы можете повторно обработать исходный поток из заданной темы Kafka только в течение срока хранения исходной темы. Например, если событие происходит и создается в исходной теме в момент времени t0, а срок хранения этой темы равен t1, то это событие может быть обработано до t0 + t1.
2) если сообщения входной темы не являются независимыми, то есть это не поток записей только для добавления, но сообщения логически зависят друг от друга (об этом вы можете прочитать в https://kafka.apache.org/0110/documentation/streams/developer-guide), тогда вы не сможете логически обработать его в ЛЮБОЙ данный момент истории. Например, если ваш входной исходный поток на самом деле представляет собой поток, похожий на журнал изменений, такой, что событие "user-create" происходит в t0, а событие "user-update" происходит в t2, тогда, если вы выполняете повторную обработку во время t1, где t0
Как говорится, если ваши события независимы, то обработка данных довольно проста. Вы можете даже истечь старые сегменты. Но если ваши сообщения взаимозависимы, возможно, у вас не будет возможности истечь старые записи. Для получения дополнительной информации о сохранении см. Этот список рассылки.
То, как вы пишете свою логику обработки и механизм, который вы используете для обработки состояния, также важно учитывать. Предположим, вы используете Kafka Streams для обработки событий в состоянии, которое вы запрашиваете. Если вы хотите иметь возможность повторно обрабатывать события, чтобы перестроить это состояние, в дополнение к возможности обрабатывать события в реальном времени во время события, ваш механизм обработки потоков должен быть способен координировать воспроизведение исторических событий во время обработки таким образом, чтобы они синхронизируются так же, как и во время оперативной обработки, так что логика обработки обрабатывает эти события одинаково. В настоящее время возможности Kafka Streams в этом отделе ограничены, поэтому вы должны написать топологию, которая выполняет эту координацию. Похоже, что Apache Beam не предлагает много и в этом отделе. В блоге упоминается:
Общей характеристикой архивных данных является то, что они могут поступать радикально не в порядке. Разделение архивных файлов часто приводит к совершенно другому порядку обработки, чем события, приходящие почти в реальном времени. Все данные также будут доступны и, следовательно, мгновенно доставлены с точки зрения вашего конвейера. Независимо от того, проводите ли вы эксперименты с предыдущими данными или повторно обрабатываете прошлые результаты, чтобы исправить ошибку обработки данных, крайне важно, чтобы ваша логика обработки была применима к архивным событиям так же легко, как и поступающие данные почти в реальном времени.
... но в документации по Beam вообще не упоминается время или историческая обработка. Поэтому я считаю, что акцент в этом параграфе должен быть сделан на том, что "крайне важно, чтобы ваша логика обработки была применима к архивированным событиям так же легко, как и поступающие данные почти в реальном времени".
Поэтому имеет смысл рассматривать историческую повторную обработку как общий случай, а временную обработку событий как частный случай. Чтобы написать логику, которая применима к обоим режимам обработки:
- Вы должны знать, какие операции в вашем потоковом процессоре дают детерминированные результаты как в историческом, так и в реальном режимах обработки, а какие нет. Например, в Kafka Streams не все операции дают одинаковые результаты как в реальном времени, так и в хронологической обработке.
- Вы можете захотеть, чтобы ваша логика обработки не зависела от таких факторов времени выполнения, как
System.currentTimeMillis()
или генерировать случайные идентификаторы, которые не будут одинаковыми для каждого прогона (если, конечно, вы не хотите, чтобы они отличались каждый раз).