Включить BinLogs для чтения-реплики с binlog_format="row"

Недавно я включил binlogs в мою реплику для чтения, включив автоматическое резервное копирование (как уже упоминалось здесь). Однако по умолчанию binlog_format был установлен в MIXED.

binlog_format=MIXED

Из-за этого несоответствия происходит сбой коннектора Debezium, так как он находит исходные binlogs в формате MIXED. Есть ли способ включить binlogs с форматом ROW с самого начала?

Добавление журнала ошибок:

Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: Caused by: io.debezium.text.ParsingException: Failed to parse statement 'update user_payments set address='400001', amount_details='{base:100.00, tax_1:0.00, tax_2:0.00, tax_3:0.00
}', bank_reference_number='698774', commercial_pack='HSPremiumMonth', country='in', coupon=null, create_date='2018-10-12 08:35:27', currency='INR', customer_id='acn|9a3722ef-5547-4e0e-ad3f-ef40eabf095e@mandardeodhar.me', discount_amount=0
.0, email='9a3722ef-5547-4e0e-ad3f-ef40eabf095e@mandardeodhar.me', fname='Vibhor', freetrial=1, hs_invoice_number=null, invoice_amount=199.0, invoice_date=null, invoice_number=null, last_update_date='2018-10-12 08:35:43', lname='User', me
ta=null, parent_transaction_id=null, payment_hash='2HL5LM', pg_commercial_pack='hotstar-razor-upi-hsp-month', pg_name='payu', pg_transaction_id='403993715518439176', service_configuration_mci=18, service_end_date='2018-11-11 08:35:43', se
rvice_start_date='2018-10-12 08:35:43', service_type='RECURRING', settlement_group=null, subscription_family_name='HotstarPremium', tax_amount=0.0, transaction_amount=199.0, transaction_status='Completed', transaction_type='Payment' where
 transaction_id=1012005'
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:225)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:200)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:297)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:637)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:436)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011... 7 more
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: Caused by: io.debezium.text.ParsingException: Expecting token type 128 at line 1, column 1 but found 'update':  ===>> update user_payments
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.text.TokenStream.consume(TokenStream.java:750)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.consumeStatement(LegacyDdlParser.java:462)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.parseUnknownStatement(LegacyDdlParser.java:309)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.MySqlDdlParser.parseNextStatement(MySqlDdlParser.java:191)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:219)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011... 11 more
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,348] INFO Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored. (io.debezium.connector.mysql.BinlogReader:457)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,517] INFO WorkerSourceTask{id=um-users-qa-test-7-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,517] INFO WorkerSourceTask{id=um-users-qa-test-7-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,518] INFO WorkerSourceTask{id=um-users-qa-test-7-0} Finished commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:427)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,518] ERROR WorkerSourceTask{id=um-users-qa-test-7-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: org.apache.kafka.connect.errors.ConnectException: Failed to parse statement 'update user_payments set address='400001', amount_details='{base:100.00, tax_1:0.00, tax_2:0.00, tax_3:0.00}', bank_reference_number='698774', commercial_pack='HSPremiumMonth', country='in', coupon=null, create_date='2018-10-12 08:35:27', currency='INR', customer_id='acn|9a3722ef-5547-4e0e-ad3f-ef40eabf095e@mandardeodhar.me', discount_amount=0.0, email='9a3722ef-5547-4e0e-ad3f-ef40eabf095e@mandardeodhar.me', fname='Vibhor', freetrial=1, hs_invoice_number=null, invoice_amount=199.0, invoice_date=null, invoice_number=null, last_update_date='2018-10-12 08:35:43', lname='User', meta=null, parent_transaction_id=null, payment_hash='2HL5LM', pg_commercial_pack='hotstar-razor-upi-hsp-month', pg_name='payu', pg_transaction_id='403993715518439176', service_configuration_mci=18, service_end_date='2018-11-11 08:35:43', service_start_date='2018-10-12 08:35:43', service_type='RECURRING', settlement_group=null, subscription_family_name='HotstarPremium', tax_amount=0.0, transaction_amount=199.0, transaction_status='Completed', transaction_type='Payment' where transaction_id=1012005'
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:200)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:178)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:452)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.EventBuffer.completeTransaction(EventBuffer.java:187)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.EventBuffer.add(EventBuffer.java:101)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1055)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:913)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at java.lang.Thread.run(Thread.java:748)

1 ответ

После включения резервного копирования вы можете изменить группу параметров binlog_format в ROW в РДС.

Выберите продукт RDS -> Выберите свой экземпляр.

Нажмите в группе параметров вашего экземпляра:

введите описание изображения здесь

Поиск параметра binlog_format -> Выберите его -> Изменить параметры -> Выбрать Row ,

введите описание изображения здесь

После этого вам нужно будет перезапустить ваш экземпляр, чтобы применить это новое значение параметра. После того, как ваша база данных снова подключится к сети, вы можете проверить правильность значений, выполнив следующие команды:

show global variables like 'log_bin';
show global variables like 'binlog_format';

Результат должен быть примерно таким: введите описание изображения здесь

Затем вы можете удалить свой соединитель с помощью REST API из Kafka Connect и снова зарегистрировать свой соединитель с помощью "snapshot.mode": "when_needed", Это создаст все строки из ваших таблиц в конфигурации белого списка для соответствующих тем в Kafka.

Кроме того, как сказал Майкл, вы можете увеличить binlog retention hours параметр.

CALL mysql.rds_show_configuration;
CALL mysql.rds_set_configuration('binlog retention hours', 24);

Я надеюсь, что это помогает.

Другие вопросы по тегам