Enable BinLogs on read-replica with binlog_format="row"

I recently enabled binary logs on my read replica by enabling automated backups (as described here). However, the default binlog_format is set to MIXED.

binlog_format=MIXED

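Because of this inconsistency, the Debezium connector fails, since the initial binary logs it finds are in MIXED format. Is there a way to enable binlogs in ROW format from the very beginning?

Adding the error log: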

Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: Caused by: io.debezium.text.ParsingException: Failed to parse statement 'update user_payments set address='400001', amount_details='{base:100.00, tax_1:0.00, tax_2:0.00, tax_3:0.00}', bank_reference_number='698774', commercial_pack='HSPremiumMonth', country='in', coupon=null, create_date='2018-10-12 08:35:27', currency='INR', customer_id='acn|9a3722ef-5547-4e0e-ad3f-ef40eabf095e@mandardeodhar.me', discount_amount=0.0, email='9a3722ef-5547-4e0e-ad3f-ef40eabf095e@mandardeodhar.me', fname='Vibhor', freetrial=1, hs_invoice_number=null, invoice_amount=199.0, invoice_date=null, invoice_number=null, last_update_date='2018-10-12 08:35:43', lname='User', meta=null, parent_transaction_id=null, payment_hash='2HL5LM', pg_commercial_pack='hotstar-razor-upi-hsp-month', pg_name='payu', pg_transaction_id='403993715518439176', service_configuration_mci=18, service_end_date='2018-11-11 08:35:43', service_start_date='2018-10-12 08:35:43', service_type='RECURRING', settlement_group=null, subscription_family_name='HotstarPremium', tax_amount=0.0, transaction_amount=199.0, transaction_status='Completed', transaction_type='Payment' where transaction_id=1012005'
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:225)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:200)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:297)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:637)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:436)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011... 7 more
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: Caused by: io.debezium.text.ParsingException: Expecting token type 128 at line 1, column 1 but found 'update':  ===>> update user_payments
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.text.TokenStream.consume(TokenStream.java:750)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.consumeStatement(LegacyDdlParser.java:462)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.parseUnknownStatement(LegacyDdlParser.java:309)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.MySqlDdlParser.parseNextStatement(MySqlDdlParser.java:191)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:219)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011... 11 more
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,348] INFO Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored. (io.debezium.connector.mysql.BinlogReader:457)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,517] INFO WorkerSourceTask{id=um-users-qa-test-7-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,517] INFO WorkerSourceTask{id=um-users-qa-test-7-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,518] INFO WorkerSourceTask{id=um-users-qa-test-7-0} Finished commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:427)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: [2018-10-12 08:35:43,518] ERROR WorkerSourceTask{id=um-users-qa-test-7-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: org.apache.kafka.connect.errors.ConnectException: Failed to parse statement 'update user_payments set address='400001', amount_details='{base:100.00, tax_1:0.00, tax_2:0.00, tax_3:0.00}', bank_reference_number='698774', commercial_pack='HSPremiumMonth', country='in', coupon=null, create_date='2018-10-12 08:35:27', currency='INR', customer_id='acn|9a3722ef-5547-4e0e-ad3f-ef40eabf095e@mandardeodhar.me', discount_amount=0.0, email='9a3722ef-5547-4e0e-ad3f-ef40eabf095e@mandardeodhar.me', fname='Vibhor', freetrial=1, hs_invoice_number=null, invoice_amount=199.0, invoice_date=null, invoice_number=null, last_update_date='2018-10-12 08:35:43', lname='User', meta=null, parent_transaction_id=null, payment_hash='2HL5LM', pg_commercial_pack='hotstar-razor-upi-hsp-month', pg_name='payu', pg_transaction_id='403993715518439176', service_configuration_mci=18, service_end_date='2018-11-11 08:35:43', service_start_date='2018-10-12 08:35:43', service_type='RECURRING', settlement_group=null, subscription_family_name='HotstarPremium', tax_amount=0.0, transaction_amount=199.0, transaction_status='Completed', transaction_type='Payment' where transaction_id=1012005'
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:200)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:178)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:452)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.EventBuffer.completeTransaction(EventBuffer.java:187)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at io.debezium.connector.mysql.EventBuffer.add(EventBuffer.java:101)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1055)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:913)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at com.github.shyiko.mysql.binlog.BinaryLogClient.run(BinaryLogClient.java:793)
Oct 12 08:35:43 kafkaConnect1 connect-distributed[13342]: #011at java.lang.Thread.run(Thread.java:748)

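After enabling backups, you can change binlog_format to ROW in the instance's RDS parameter group.

Select the RDS product -> select your instance.

Click the instance's parameter group:

Search for the parameter binlog_format -> select it -> Edit parameters -> select ROW.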
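The same console steps can also be scripted. The following is only a minimal sketch, assuming the boto3 SDK is installed and AWS credentials are configured; the parameter group name and instance identifier are hypothetical placeholders, and the group must be a custom one, since default parameter groups cannot be modified.

```python
def build_binlog_format_change(parameter_group_name):
    """Build the arguments for RDS modify_db_parameter_group.

    parameter_group_name is a hypothetical placeholder for your own
    custom parameter group.
    """
    return {
        "DBParameterGroupName": parameter_group_name,
        "Parameters": [
            {
                "ParameterName": "binlog_format",
                "ParameterValue": "ROW",
                # Applied at the next reboot, matching the restart step
                # described in this answer.
                "ApplyMethod": "pending-reboot",
            }
        ],
    }


def apply_binlog_format_change(parameter_group_name):
    """Send the change to RDS (assumes boto3 and AWS credentials)."""
    import boto3  # third-party; imported lazily so the builder works without it

    rds = boto3.client("rds")
    rds.modify_db_parameter_group(
        **build_binlog_format_change(parameter_group_name)
    )


def reboot_instance(instance_id):
    """Reboot the instance so the pending parameter value takes effect."""
    import boto3

    boto3.client("rds").reboot_db_instance(DBInstanceIdentifier=instance_id)
```

For example, `apply_binlog_format_change("my-replica-params")` followed later by `reboot_instance("my-read-replica")`, where both names stand in for your own resources.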

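After doing this, you will need to restart the instance to apply the new parameter value. Once your database is back online, you can check that the values are correct by running these commands: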

show global variables like 'log_bin';
show global variables like 'binlog_format';

The result should show log_bin as ON and binlog_format as ROW.
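If you want to automate this check, a small helper like the one below could validate the rows returned by the two SHOW GLOBAL VARIABLES statements. It is only a sketch: the function name is made up for illustration, and it assumes the rows arrive as (Variable_name, Value) pairs, which is how typical Python MySQL drivers return them from fetchall().

```python
def check_binlog_settings(rows):
    """Return a list of problems found in SHOW GLOBAL VARIABLES output.

    rows: iterable of (Variable_name, Value) pairs.
    An empty list means binary logging is on and uses ROW format.
    """
    values = {name.lower(): str(value).upper() for name, value in rows}
    problems = []
    if values.get("log_bin") != "ON":
        problems.append("log_bin is %s, expected ON" % values.get("log_bin"))
    if values.get("binlog_format") != "ROW":
        problems.append(
            "binlog_format is %s, expected ROW" % values.get("binlog_format")
        )
    return problems
```

An instance still on the default format would be reported as `["binlog_format is MIXED, expected ROW"]`.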

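You can then delete your connector from Kafka Connect using the REST API and re-register it with "snapshot.mode": "when_needed". This will produce all rows from the tables in your whitelist configuration to the corresponding topics in Kafka.

Also, as Michael said, you may want to increase the binlog retention hours parameter.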
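As a rough illustration of the delete and re-register step, the sketch below builds the two Kafka Connect REST requests (DELETE /connectors/{name} and POST /connectors) with the Python standard library. The Connect URL, Kafka bootstrap address, database host, credentials, server id, and database name are all assumptions to be replaced with your own values; only the connector name and the user_payments table come from the log above.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: Kafka Connect REST endpoint


def build_connector_payload(name, db_host, db_user, db_password, tables):
    """Build a Debezium MySQL connector registration body."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": db_host,
            "database.port": "3306",
            "database.user": db_user,
            "database.password": db_password,
            "database.server.id": "184054",  # assumption: any unique id
            "database.server.name": name,
            "database.history.kafka.bootstrap.servers": "localhost:9092",
            "database.history.kafka.topic": "dbhistory." + name,
            "table.whitelist": ",".join(tables),
            # Snapshot again if offsets are missing or the binlog
            # position is no longer available.
            "snapshot.mode": "when_needed",
        },
    }


def delete_request(name):
    """Request that removes an existing connector."""
    return urllib.request.Request(
        CONNECT_URL + "/connectors/" + name, method="DELETE"
    )


def register_request(payload):
    """Request that registers the connector described by payload."""
    return urllib.request.Request(
        CONNECT_URL + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def recreate_connector(payload):
    """Delete the connector, then register it again (performs network calls)."""
    urllib.request.urlopen(delete_request(payload["name"])).close()
    urllib.request.urlopen(register_request(payload)).close()
```

For example, `recreate_connector(build_connector_payload("um-users-qa-test-7", "replica.example.com", "debezium", "secret", ["users.user_payments"]))`, where the host, credentials, and the `users` database name are placeholders.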

CALL mysql.rds_show_configuration;
CALL mysql.rds_set_configuration('binlog retention hours', 24);

Hope this helps.