Kafka поток присоединяется с определенным ключом в качестве ввода

У меня есть 3 разные темы с 3 файлами Avro в реестре схемы, я хочу транслировать эти темы, объединять их и записывать в одну тему. проблема в том, что ключ, к которому я хочу присоединиться, отличается от ключа, в который я записываю данные в каждую тему.

Допустим, у нас есть эти 3 файла Avro:
Тревога:

{
  "type" : "record",
  "name" : "Alarm",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "alarm_id",
    "type" : "string",
    "doc" : "Unique identifier of the alarm."
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "Unique identifier of the  network element ID that produces the alarm."
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "is the timestamp when the alarm was generated."
  }, {
    "name" : "severity",
    "type" : [ "null", "string" ],
    "doc" : "The severity field is the default severity associated to the alarm ",
    "default" : null
  }]
}

Инцидент:

{
  "type" : "record",
  "name" : "Incident",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "incident_id",
    "type" : "string",
    "doc" : "Unique identifier of the incident."
  }, {
    "name" : "incident_type",
    "type" : [ "null", "string" ],
    "doc" : "Categorization of the incident e.g. Network fault, network at risk, customer impact, etc",
    "default" : null
  }, {
    "name" : "alarm_source_id",
    "type" : "string",
    "doc" : "Respective Alarm"
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "is the timestamp when the incident was generated on the node."
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "ID of specific network element."
  }]
}

Техническое обслуживание:

{
  "type" : "record",
  "name" : "Maintenance",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "maintenance_id",
    "type" : "string",
    "doc" : "The message number is the unique ID for every maintenance"
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "The NE ID is the network element ID on which the maintenance is done."
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "The timestamp when the maintenance start."
  }, {
    "name" : "end_time",
    "type" : "long",
    "doc" : "The timestamp when the maintenance start."
  }]
}

У меня есть 3 темы в моей Kafka для каждого из этих Avro (скажем, alarm_raw, инцидент_raw, maintenance_raw), и всякий раз, когда я хотел написать в эти темы, я использую ne_id в качестве ключа (поэтому тема разделена ne_id). теперь я хочу присоединиться к этим 3 темам, получить новую запись и записать ее в новую тему. Проблема в том, что я хочу присоединиться к Alarm и Incident на основе alarm_id и alarm_source_id и присоединиться к Alarm и Maintenance на основе ne_id. Я хочу избежать создания новой темы и переназначить новый ключ. Есть ли способ указать ключ во время присоединения?

2 ответа

Решение

Это зависит от того, какой тип соединения вы хотите использовать (см. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics).

Для KStream-KStream присоединиться, в настоящее время (v0.10.2 и ранее) не иначе, как установка нового ключа (например, с помощью selectKey()) и сделайте перераспределение.

Для KStream-KTable присоединиться, Кафка 0.10.2 (будет выпущен в ближайшие недели) содержит новую функцию под названием GlobalKTables (см. https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams). Это позволяет вам выполнять неключевое соединение для KTable (т. Е. Соединение KStream-GlobalKTable, и, таким образом, вам не нужно перераспределять данные в вашей GlobalKTable).

Примечание. Соединение KStream-GlobalKTable имеет другую семантику, чем соединение KStream-KTable. Это не синхронизировано по времени в отличие от более поздних, и, таким образом, соединение является недетерминированным по конструкции в отношении обновлений GlobalKTable; т.е. нет никакой гарантии, что запись KStream будет первой, кто "увидит" обновления GlobalKTable и, таким образом, присоединится к обновленной записи GlobalKTable.

Также планируется добавить соединение KTable-GlobalKTable. Это может стать доступным в 0.10.3, Однако нет никаких планов по добавлению "глобальных" объединений KStream-KStream.

Вы можете сохранить тот же ключ, изменив его.
Ты можешь использовать KeyValueMapper с помощью которого вы можете изменить свой ключ, а также значение.
Вы должны использовать его следующим образом:

val modifiedStream = kStream.map[String,String](
    new KeyValueMapper[String, String,KeyValue[String,String]]{
        override def apply(key: String, value: String): KeyValue[String, String] = new KeyValue("modifiedKey", value)
    }
)

Вы можете применить вышеупомянутую логику на нескольких Kstream объекты для поддержания единого ключа для присоединения KStreams.

Другие вопросы по тегам