Обновление коннектора Debezium MySQL с опцией белого списка таблиц

Я использую MySQL-коннектор Debezium (0.7.5) и пытаюсь понять, каков наилучший подход, если я хочу обновить эту конфигурацию с помощью опции table.whitelist,

Допустим, я создаю соединитель, что-то вроде этого:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://debezium-host/connectors/ -d '
{
  "name": "MyConnector",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "connect.timeout.ms": "60000",
      "tasks.max": "1",
      "database.hostname": "myhost",
      "database.port": "3306",
      "database.user": "***",
      "database.password": "***",
      "database.server.id": "3227197",
      "database.server.name": "MyServer",
      "database.whitelist": "myDb",
      "table.whitelist": "myDb.table1,myDb.table2",
      "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
      "database.history.kafka.topic": "MyConnectorHistoryTopic",
      "max.batch.size": "1024",
      "snapshot.mode": "initial",
      "decimal.handling.mode": "double"
    }
}'

Через некоторое время (2 недели) мне нужно добавить новую таблицу (myDb.table3) к этому table.whitelist опция (и эта таблица старая, она была создана до коннектора)

То, что я попробовал, было:

  • Приостановить разъем.
  • Удалена тема истории (может, в этом проблема?).
  • Обновил конфигурацию через конечную точку конфигурации обновления API.
  • Возобновите разъем.

Обновить команду через API:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" https://kafka-connect-host/connectors/MyConnector/config/ -d '
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "connect.timeout.ms": "60000",
  "tasks.max": "1",
  "database.hostname": "myhost",
  "database.port": "3306",
  "database.user": "***",
  "database.password": "***",
  "database.server.id": "3227197",
  "database.server.name": "MyServer",
  "database.whitelist": "myDb",
  "table.whitelist": "myDb.table1,myDb.table2,myDb.table3",
  "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
  "database.history.kafka.topic": "MyConnectorHistoryTopic",
  "max.batch.size": "1024",
  "snapshot.mode": "schema_only",
  "decimal.handling.mode": "double"
}'

Но это не сработало, и, возможно, это не самый лучший подход. В других разъемах я не пользуюсь опцией table.whitelistПоэтому, когда мне нужно было прослушать новый стол, у меня не было этой проблемы.

Моим последним вариантом, я думаю, будет удалить этот соединитель и создать еще один с этой новой конфигурацией, также прослушивая новую таблицу (myDb.table3). Проблема в том, если я хочу исходные данные из myDb.table3 Мне бы создать со снимком initial но я не хочу генерировать все сообщения из снимка из других таблиц myDb.table1,myDb.table2,

3 ответа

Последняя версия Debezium Server, вы можете добавить следующий конфиг

      debezium.snapshot.new.tables=parallel

В случае, если вы используете Debezium, вы можете попробовать это значение конфигурации

      snapshot.new.tables=parallel

Примечание. Сервер Debeziyum поддерживает Kinesis, подписку Google Pub и Apache Pulsar. Я использую это, и его конфигурация немного отличается. Мне пришлось добавлять «дебезиум» перед каждым элементом.

После добавления этой конфигурации любое дополнение к таблицам.whitelist. Для этих дополнительных таблиц Debezium создаст моментальные снимки.

Я не могу указать вам на документацию, но я просмотрел их код на GitHub, а также попробовал его на практике, что сработало для меня. Вот ссылка на код MySqlConnector

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java

Там найдите Field.create("snapshot.new.tables")

Лично мне кажется, что в Debezium много всего, но документация разбросана.

Изменения в конфигурации белого / черного списка пока не поддерживаются. Это в настоящее время работает (см. DBZ-175), и мы надеемся получить поддержку предварительного просмотра для этого в одном из следующих выпусков. Для этого есть пиар, который требует немного больше работы.

До тех пор, пока это не будет реализовано, лучше всего настроить новый экземпляр коннектора, который будет захватывать только те дополнительные таблицы, которые вас интересуют. Это достигается ценой запуска двух коннекторов (которые оба будут поддерживать сеанс чтения журнала binlog)., но это помогает, если вам не нужно менять конфигурацию фильтра слишком часто.

У меня такая же проблема, и я решаю ее с помощью таблицы сигналов для debezium. Это работает таким образом, вам нужно создать таблицу для отправки команд debezium в вашу таблицу данных.

      CREATE TABLE public.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32)  NULL, data VARCHAR(2048)  NULL);

и установите в вашей конфигурации do debzium тег "signal.data.collection": "public.debezium_signal"

после этого вы можете отправлять команды с вставкой в ​​эту таблицу:

      INSERT INTO debezium_signal (id, type, data)
VALUES(gen_random_uuid(),'execute-snapshot','{"data-collections": "myDb.table3"]}');

в моем случае я должен добавить сигнал таблицы в table.include.list, а также столбцы в column.include.list.

https://debezium.io/documentation/reference/stable/configuration/signalling.html

Другие вопросы по тегам