Debezium + Schema Registry Avro Schema: почему у меня есть поля "до" и "после" и как использовать их с HudiDeltaStreamer?
У меня есть таблица в PostgreSQL со следующей схемой:
Table "public.kc_ds"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+-----------------------+-----------+----------+-----------------------------------+----------+--------------+-------------
id | integer | | not null | nextval('kc_ds_id_seq'::regclass) | plain | |
num | integer | | not null | | plain | |
text | character varying(50) | | not null | | extended | |
Indexes:
"kc_ds_pkey" PRIMARY KEY, btree (id)
Publications:
"dbz_publication"
Когда я запускаю коннектор источника Debezium для этой таблицы, который использует
io.confluent.connect.avro.AvroConverter
и Schema Registry, он создает схему Schema Registry, которая выглядит следующим образом (некоторые поля здесь опущены):
"fields":[
{
"name":"before",
"type":[
"null",
{
"type":"record",
"name":"Value",
"fields":[
{
"name":"id",
"type":"int"
},
{
"name":"num",
"type":"int"
},
{
"name":"text",
"type":"string"
}
],
"connect.name":"xxx.public.kc_ds.Value"
}
],
"default":null
},
{
"name":"after",
"type":[
"null",
"Value"
],
"default":null
},
]
Сообщения в моей теме Kafka, созданные Debezium, выглядят следующим образом (некоторые поля опущены):
{
"before": null,
"after": {
"xxx.public.kc_ds.Value": {
"id": 2,
"num": 2,
"text": "text version 1"
}
}
Когда я ВСТАВЛЯЮ или ОБНОВЛЯЮ,
"before"
всегда
null
, и
"after"
содержит мои данные; когда я УДАЛЯЮ, верно обратное:
"after"
равно нулю и
"before"
содержит данные (хотя для всех полей установлены значения по умолчанию).
Вопрос №1: Почему Kafka Connect создает схему с
"before"
и
"after"
поля? Почему эти поля ведут себя так странно?
Вопрос № 2: есть ли встроенный способ заставить Kafka Connect отправлять простые сообщения в мои темы, все еще используя реестр схем? Обратите внимание, что преобразование Flatten - это не то, что мне нужно: если оно включено, у меня все равно будет
"before"
и
"after"
поля.
Вопрос №3 (на самом деле ни на что не надейся, но, возможно, кто-то знает): необходимость сглаживания моих сообщений возникает из-за того, что мне нужно читать данные из моих тем с помощью HudiDeltaStreamer, и похоже, что этот инструмент ожидает плоские входные данные. В
"before"
и
"after"
в конечном итоге поля становятся отдельными объектными столбцами в результирующих файлах .parquet. Кто-нибудь знает, как HudiDeltaStreamer должен интегрироваться с сообщениями, созданными Kafka Connect?