Debezium + Schema Registry Avro Schema: почему у меня есть поля "до" и "после" и как использовать их с HudiDeltaStreamer?

У меня есть таблица в PostgreSQL со следующей схемой:

                                                              Table "public.kc_ds"
 Column |         Type          | Collation | Nullable |              Default              | Storage  | Stats target | Description
--------+-----------------------+-----------+----------+-----------------------------------+----------+--------------+-------------
 id     | integer               |           | not null | nextval('kc_ds_id_seq'::regclass) | plain    |              |
 num    | integer               |           | not null |                                   | plain    |              |
 text   | character varying(50) |           | not null |                                   | extended |              |
Indexes:
    "kc_ds_pkey" PRIMARY KEY, btree (id)
Publications:
    "dbz_publication"

Когда я запускаю коннектор источника Debezium для этой таблицы, который использует io.confluent.connect.avro.AvroConverter и Schema Registry, он создает схему Schema Registry, которая выглядит следующим образом (некоторые поля здесь опущены):

       "fields":[
      {
         "name":"before",
         "type":[
            "null",
            {
               "type":"record",
               "name":"Value",
               "fields":[
                  {
                     "name":"id",
                     "type":"int"
                  },
                  {
                     "name":"num",
                     "type":"int"
                  },
                  {
                     "name":"text",
                     "type":"string"
                  }
               ],
               "connect.name":"xxx.public.kc_ds.Value"
            }
         ],
         "default":null
      },
      {
         "name":"after",
         "type":[
            "null",
            "Value"
         ],
         "default":null
      },
]

Сообщения в моей теме Kafka, созданные Debezium, выглядят следующим образом (некоторые поля опущены):

       {
  "before": null,
  "after": {
    "xxx.public.kc_ds.Value": {
      "id": 2,
      "num": 2,
      "text": "text version 1"
    }
}

Когда я ВСТАВЛЯЮ или ОБНОВЛЯЮ, "before" всегда null, и "after"содержит мои данные; когда я УДАЛЯЮ, верно обратное: "after" равно нулю и "before" содержит данные (хотя для всех полей установлены значения по умолчанию).

Вопрос №1: Почему Kafka Connect создает схему с "before" и "after"поля? Почему эти поля ведут себя так странно?

Вопрос № 2: есть ли встроенный способ заставить Kafka Connect отправлять простые сообщения в мои темы, все еще используя реестр схем? Обратите внимание, что преобразование Flatten - это не то, что мне нужно: если оно включено, у меня все равно будет "before" и "after" поля.

Вопрос №3 (на самом деле ни на что не надейся, но, возможно, кто-то знает): необходимость сглаживания моих сообщений возникает из-за того, что мне нужно читать данные из моих тем с помощью HudiDeltaStreamer, и похоже, что этот инструмент ожидает плоские входные данные. В "before" и "after"в конечном итоге поля становятся отдельными объектными столбцами в результирующих файлах .parquet. Кто-нибудь знает, как HudiDeltaStreamer должен интегрироваться с сообщениями, созданными Kafka Connect?

0 ответов