Проблемы, связанные с использованием таблицы Kafka KSQL AVRO в качестве источника для KDK Kinkka Connect JDBC Sink

Я боролся с этим уже около недели, пытаясь получить простую (3 поля) отформатированную AVRO таблицу KSQL в качестве источника для приемника коннектора JDBC (mysql)

Я получаю следующие ошибки (после строки INFO):

[2018-12-11 18:58:50,678] INFO Setting metadata for table "DSB_ERROR_TABLE_WINDOWED" to Table{name='"DSB_ERROR_TABLE_WINDOWED"', columns=[Column{'MOD_CLASS', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'METHOD', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'COUNT', isPrimaryKey=false, allowsNull=true, sqlType=BIGINT}]} (io.confluent.connect.jdbc.util.TableDefinitions)

[2018-12-11 18:58:50,679] ERROR WorkerSinkTask{id=dev-dsb-errors-mysql-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: DSB_ERROR_TABLE_WINDOWED
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:127)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:64)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:79)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:124)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Я могу сказать, что приемник делает что-то правильно, когда схема извлекается (см. Перед ошибкой выше), и таблица успешно создается в базе данных с правильной схемой:

MariaDB [dsb_errors_ksql]> describe  DSB_ERROR_TABLE_WINDOWED;
+-----------+--------------+------+-----+---------+-------+
| Field     | Type         | Null | Key | Default | Extra |
+-----------+--------------+------+-----+---------+-------+
| MOD_CLASS | varchar(256) | YES  |     | NULL    |       |
| METHOD    | varchar(256) | YES  |     | NULL    |       |
| COUNT     | bigint(20)   | YES  |     | NULL    |       |
+-----------+--------------+------+-----+---------+-------+
3 rows in set (0.01 sec)

И вот определение KTABLE:

ksql> describe extended DSB_ERROR_TABLE_windowed;

Name                 : DSB_ERROR_TABLE_WINDOWED
Type                 : TABLE
Key field            : KSQL_INTERNAL_COL_0|+|KSQL_INTERNAL_COL_1
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : DSB_ERROR_TABLE_WINDOWED (partitions: 4, replication: 1)

 Field     | Type
---------------------------------------
 ROWTIME   | BIGINT           (system)
 ROWKEY    | VARCHAR(STRING)  (system)
 MOD_CLASS | VARCHAR(STRING)
 METHOD    | VARCHAR(STRING)
 COUNT     | BIGINT
---------------------------------------

Queries that write into this TABLE
-----------------------------------
CTAS_DSB_ERROR_TABLE_WINDOWED_37 : create table DSB_ERROR_TABLE_windowed  with (value_format='avro') as  select mod_class, method, count(*) as count  from DSB_ERROR_STREAM window session ( 60 seconds) group by mod_class, method   having count(*) > 0;

Для этой таблицы автоматически сгенерирована запись в реестре схемы (но нет ключевой записи):

{
    "subject": "DSB_ERROR_TABLE_WINDOWED-value",
    "version": 7,
    "id": 143,
    "schema": "{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"MOD_CLASS\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"METHOD\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"COUNT\",\"type\":[\"null\",\"long\"],\"default\":null}]}"
}

и вот определение Connect Worker:

{ "name": "dev-dsb-errors-mysql-sink",
   "config": { 
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "DSB_ERROR_TABLE_WINDOWED", 
        "connection.url": "jdbc:mysql://os-compute-d01.maeagle.corp:32692/dsb_errors_ksql?user=xxxxxx&password=xxxxxx",
        "auto.create": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://kafka-d01.maeagle.corp:8081",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }       
}

Насколько я понимаю (что может быть неверным), KSQL должен создавать соответствующие схемы AVRO в реестре схем, а Kafka Connect должна иметь возможность правильно их читать. Как я отмечал выше, что-то работает, так как соответствующая таблица генерируется в Mysql, хотя я удивлен, что не создано ключевое поле...

Большинство постов и примеров используют JSON, а не AVRO, поэтому они не особенно полезны.

Похоже, что в части десериализации чтения и вставки записи темы...

Я в растерянности на данный момент и мог бы использовать некоторые рекомендации.

Я также открыл похожий билет через github:

https://github.com/confluentinc/ksql/issues/2250

С Уважением,

--John

0 ответов

Как сказал выше Джон, ключ в записи темы - это не строка, а строка, постфиксированная с одним сериализованным 64-битным целым числом Java, представляющим время начала окна.

Connect не имеет SMT, который может обрабатывать оконный формат ключей. Однако можно было бы написать единицу, чтобы убрать целое число и просто вернуть естественный ключ. Затем вы можете включить это в путь к классу и обновить конфигурацию подключения.

Если вам требуется время начала окна в базе данных, вы можете обновить запрос ksqlDB, чтобы включить время начала окна в качестве поля в значении.

Другие вопросы по тегам