Kafka Подключиться, чтобы сохранить тему в индексе Elasticsearch, используя поле из сообщения (json)
Я пытаюсь индексировать сообщения в Elasticsearch, используя SMT только из API Connect Kafka.
До сих пор мне повезло с простым использованием функциональности роутера темы и метки времени. Однако теперь я хотел бы создать отдельные индексы на основе определенного поля в сообщении.
Предположим, что сообщения отформатированы так:
{"productId": 1, "category": "boat", "price": 135000}
{"productId": 1, "category": "helicopter", "price": 300000}
{"productId": 1, "category": "car", "price": 25000}
Можно ли каким-то образом индексировать их по следующим показателям в зависимости от категории продукта?
- продукт лодка
- продукт-вертолет
- продукт-автомобиль
или мне придется создавать отдельные темы для каждой категории (зная, что их может стать сотни или тысячи)?
Наблюдаю ли я за преобразованием, которое могло бы это сделать, или это просто невозможно, и нужно ли создавать собственный компонент?
2 ответа
Если вы используете Confluent Platform
Вы можете сделать некоторую маршрутизацию в зависимости от значения поля в сообщении.
Для этого вы должны использовать ExtractTopic
SMT из Confluent. Более подробную информацию об этом SMT можно найти по адресу https://docs.confluent.io/current/connect/transforms/extracttopic.html
Kafka Sink Connector обрабатывает сообщения, которые представлены SinkRecord
, каждый SinkRecord
содержит несколько полей: topic
, partition
, value
, key
и т. д. Эти поля устанавливаются Kafka Connect, и с помощью преобразования вы можете изменить это значение. ExtractTopic
SMT меняет значение topic
основанный на value
или же key
сообщения.
Конфигурация трансформации будет примерно такой:
{
...
"transforms": "ExtractTopic",
"transforms.ExtractTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
"transforms.ExtractTopic.field": "name", <-- name of field, that value will be used as index name
...
}
Одним из ограничений является то, что вы должны создавать индексы заранее.
Как я предполагаю, что вы используете Elasticsearch Sink Connector. Elasticsearch-коннектор имеет возможность создавать индексы, но он делает это при открытии - метод для создания писателей для определенного раздела (ElasticsearchSinkTask::open
). В вашем случае в данный момент все индексы не могут быть созданы, потому что значения всех сообщений недоступны.
Может быть, это не самый чистый подход, потому что ExtractTopic
скорее следует использовать для разъемов источника, но в вашем случае это может работать.
С Kafka Connect нет ничего такого, что могло бы сделать это. У вас есть несколько вариантов:
- Соединитель приемника Elasticsearch будет направлять сообщения в целевой индекс на основе его темы, поэтому вы можете написать собственный SMT, который будет проверять сообщение и соответственно направлять его в другую тему.
- Используйте потоковый процессор для предварительной обработки сообщений, чтобы они уже были на разные темы к тому времени, когда они потребляются соединителем приемника Elasticsearch. Например, Kafka Streams или KSQL.
- KSQL вам нужно было бы жестко кодировать каждую категорию (
CREATE STREAM product-boat AS SELECT * FROM messages WHERE category='boat'
так далее) - У Kafka Streams теперь есть динамическая маршрутизация ( KIP-303), которая была бы более гибким способом сделать это
- KSQL вам нужно было бы жестко кодировать каждую категорию (
- Ручной код специального соединителя приемника Elasticsearch с кодированной логикой для маршрутизации сообщений в индексы на основе содержимого сообщения. Это похоже на худший из трех подходов ИМО.