Kafka поток присоединяется с определенным ключом в качестве ввода
У меня есть 3 разные темы с 3 файлами Avro в реестре схемы, я хочу транслировать эти темы, объединять их и записывать в одну тему. проблема в том, что ключ, к которому я хочу присоединиться, отличается от ключа, в который я записываю данные в каждую тему.
Допустим, у нас есть эти 3 файла Avro:
Тревога:
{
"type" : "record",
"name" : "Alarm",
"namespace" : "com.kafkastream.schema.avro",
"fields" : [ {
"name" : "alarm_id",
"type" : "string",
"doc" : "Unique identifier of the alarm."
}, {
"name" : "ne_id",
"type" : "string",
"doc" : "Unique identifier of the network element ID that produces the alarm."
}, {
"name" : "start_time",
"type" : "long",
"doc" : "is the timestamp when the alarm was generated."
}, {
"name" : "severity",
"type" : [ "null", "string" ],
"doc" : "The severity field is the default severity associated to the alarm ",
"default" : null
}]
}
Инцидент:
{
"type" : "record",
"name" : "Incident",
"namespace" : "com.kafkastream.schema.avro",
"fields" : [ {
"name" : "incident_id",
"type" : "string",
"doc" : "Unique identifier of the incident."
}, {
"name" : "incident_type",
"type" : [ "null", "string" ],
"doc" : "Categorization of the incident e.g. Network fault, network at risk, customer impact, etc",
"default" : null
}, {
"name" : "alarm_source_id",
"type" : "string",
"doc" : "Respective Alarm"
}, {
"name" : "start_time",
"type" : "long",
"doc" : "is the timestamp when the incident was generated on the node."
}, {
"name" : "ne_id",
"type" : "string",
"doc" : "ID of specific network element."
}]
}
Техническое обслуживание:
{
"type" : "record",
"name" : "Maintenance",
"namespace" : "com.kafkastream.schema.avro",
"fields" : [ {
"name" : "maintenance_id",
"type" : "string",
"doc" : "The message number is the unique ID for every maintenance"
}, {
"name" : "ne_id",
"type" : "string",
"doc" : "The NE ID is the network element ID on which the maintenance is done."
}, {
"name" : "start_time",
"type" : "long",
"doc" : "The timestamp when the maintenance start."
}, {
"name" : "end_time",
"type" : "long",
"doc" : "The timestamp when the maintenance start."
}]
}
У меня есть 3 темы в моей Kafka для каждого из этих Avro (скажем, alarm_raw, инцидент_raw, maintenance_raw), и всякий раз, когда я хотел написать в эти темы, я использую ne_id в качестве ключа (поэтому тема разделена ne_id). теперь я хочу присоединиться к этим 3 темам, получить новую запись и записать ее в новую тему. Проблема в том, что я хочу присоединиться к Alarm и Incident на основе alarm_id и alarm_source_id и присоединиться к Alarm и Maintenance на основе ne_id. Я хочу избежать создания новой темы и переназначить новый ключ. Есть ли способ указать ключ во время присоединения?
2 ответа
Это зависит от того, какой тип соединения вы хотите использовать (см. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics).
Для KStream-KStream присоединиться, в настоящее время (v0.10.2
и ранее) не иначе, как установка нового ключа (например, с помощью selectKey()
) и сделайте перераспределение.
Для KStream-KTable присоединиться, Кафка 0.10.2
(будет выпущен в ближайшие недели) содержит новую функцию под названием GlobalKTables
(см. https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams). Это позволяет вам выполнять неключевое соединение для KTable (т. Е. Соединение KStream-GlobalKTable, и, таким образом, вам не нужно перераспределять данные в вашей GlobalKTable).
Примечание. Соединение KStream-GlobalKTable имеет другую семантику, чем соединение KStream-KTable. Это не синхронизировано по времени в отличие от более поздних, и, таким образом, соединение является недетерминированным по конструкции в отношении обновлений GlobalKTable; т.е. нет никакой гарантии, что запись KStream будет первой, кто "увидит" обновления GlobalKTable и, таким образом, присоединится к обновленной записи GlobalKTable.
Также планируется добавить соединение KTable-GlobalKTable. Это может стать доступным в 0.10.3
, Однако нет никаких планов по добавлению "глобальных" объединений KStream-KStream.
Вы можете сохранить тот же ключ, изменив его.
Ты можешь использовать KeyValueMapper
с помощью которого вы можете изменить свой ключ, а также значение.
Вы должны использовать его следующим образом:
val modifiedStream = kStream.map[String,String](
new KeyValueMapper[String, String,KeyValue[String,String]]{
override def apply(key: String, value: String): KeyValue[String, String] = new KeyValue("modifiedKey", value)
}
)
Вы можете применить вышеупомянутую логику на нескольких Kstream
объекты для поддержания единого ключа для присоединения KStream
s.