Debezium MySQL Connector 可以根据事件的操作类型将数据更改事件路由到不同的主题吗?

Can Debezium MySQL Connector route data change event to different topic by event's operation type?

Debezium 支持三种类型的数据更改事件:

我知道 Debezium 发布消息的有效负载中有一个 op 字段来标识事件类型,但我想知道我是否可以将这三种类型的数据更改事件路由到不同的操作类型的 Kafka 主题,如 SMT?

单个消息转换

正如您所建议的,单消息转换是一个不错的选择。 Debezium 有一个目前处于测试阶段的转换,称为 ContentBasedRouter,您可以使用它使用包括 Groovy.

在内的语言对路由进行编码

ksqlDB

您可以使用 ksqlDB 执行此操作:

-- Declare source topic from Debezium as ksqlDB stream
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='asgard.demo.ORDERS', VALUE_FORMAT='AVRO');

-- Create three streams (backed by Kafka topics) based on the op-type
CREATE STREAM ORDERS_UPDATES AS SELECT * FROM ORDERS WHERE OP='u';
CREATE STREAM ORDERS_DELETES AS SELECT * FROM ORDERS WHERE OP='d';
CREATE STREAM ORDERS_CREATES AS SELECT * FROM ORDERS WHERE OP='c';

查看数据

ksql> SHOW TOPICS;

 Kafka Topic                           | Partitions | Partition Replicas
-------------------------------------------------------------------------
 ORDERS_CREATES                        | 1          | 1
 ORDERS_DELETES                        | 1          | 1
 ORDERS_UPDATES                        | 1          | 1

检查计数

ksql> SELECT OP,COUNT(*) AS EVENTS FROM ORDERS GROUP BY OP EMIT CHANGES;
+-------+----------+
|OP     |EVENTS    |
+-------+----------+
|u      |3         |
|c      |502       |
|d      |5         |

ksql> SELECT 'ORDERS_UPDATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
        FROM ORDERS_UPDATES GROUP BY 'ORDERS_UPDATES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME      |EVENT_COUNT  |
+----------------+-------------+
|ORDERS_UPDATES  |3            |
Limit Reached
Query terminated

ksql> SELECT 'ORDERS_CREATES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
        FROM ORDERS_CREATES GROUP BY 'ORDERS_CREATES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME      |EVENT_COUNT  |
+----------------+-------------+
|ORDERS_CREATES  |503          |
Limit Reached
Query terminated

ksql> SELECT 'ORDERS_DELETES' AS TOPIC_NAME ,COUNT(*) AS EVENT_COUNT 
        FROM ORDERS_DELETES GROUP BY 'ORDERS_DELETES' EMIT CHANGES LIMIT 1 ;
+----------------+-------------+
|TOPIC_NAME      |EVENT_COUNT  |
+----------------+-------------+
|ORDERS_DELETES  |5            |
Limit Reached
Query terminated