How to configure Kafka topics so an interconnected entity schema can be consumed in the form of events by databases such as an RDBMS and a graph database
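I have a case where I have Information objects that contain Element objects. When I store an Information object, it tries to find pre-existing Element objects based on a unique value field, and otherwise inserts them. Information objects and Element objects can't be deleted for now. Adding a parent requires two pre-existing Element objects. I was planning to use three topics, CreateElement, CreateInformation and AddParentOfElement, for the events Created Element Event, Created Information Event and Added Parent Event. I realized that, since there are no ordering guarantees between topics or between the partitions of a topic, the events shown in the picture could be consumed in a different order, so the schema could not be persisted to, for example, an RDBMS. I assume that ids are used for the partition assignment of the topics, as usual.
Here is my diagram:
The scenario is:
- Element with (id=1) was created by user
- Information with (id=1) containing Elements (1,2,3) was created by user
- Element with (id=5) was created by user
- Parent of Element with (id=5) was set to be Element with (id=3) by user
- Information with (id=2) containing Elements (1,3 and 5) was created by user
I am curious whether my topic selection makes sense, and I would appreciate any suggestions on how to make the events idempotent when the consuming database services process them, so that they don't put the system in a wrong state.
Thanks!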
After considering an existing solution and not being satisfied with its suggestions, I investigated Confluent Bottled Water (https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/) and later the more active but similar Debezium (http://debezium.io/).
I decided to follow the Debezium way. Debezium is a plugin that reads directly from the MySQL/Postgres binlog and publishes those changes (schema and data) to Kafka.
The example setup I am using involves Docker. Here is how I set it up for Docker Toolbox (Windows) and Docker (Linux).
1a) Linux (Docker)
# WARNING: the next two commands stop and remove ALL containers on this host
sudo docker stop $(sudo docker ps -a -q)
sudo docker rm -f $(sudo docker ps -a -q)
# Each command runs on its own line (a trailing backslash would join them into one command)
sudo docker run -d --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.5
sudo docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
sudo docker run -d --name kafka -e ADVERTISED_HOST_NAME=<YOUR_IP> -e ZOOKEEPER_CONNECT=<YOUR_IP> --link zookeeper:zookeeper -p 9092:9092 debezium/kafka
sudo docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=<YOUR_IP> --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect
sudo docker run -d --net=host -e "PROXY=true" -e ADV_HOST=<YOUR_IP> -e "KAFKA_REST_PROXY_URL=http://<YOUR_IP>:8082" -e "SCHEMAREGISTRY_UI_URL=http://<YOUR_IP>:8081" landoop/kafka-topics-ui
sudo docker run -p 8082:8082 --name kafka-rest --env ZK_CONNECTION_STRING=<YOUR_IP>:2181 frontporch/kafka-rest:latest
1b) Windows (Docker Toolbox)
docker stop $(docker ps -a -q) ;
docker rm -f $(docker ps -a -q) ;
docker run -d --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.5 ;
docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper ;
docker run -d --name kafka -e ADVERTISED_HOST_NAME=192.168.99.100 -e ZOOKEEPER_CONNECT=192.168.99.100 --link zookeeper:zookeeper -p 9092:9092 debezium/kafka ;
docker run -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my-connect-configs -e OFFSET_STORAGE_TOPIC=my-connect-offsets -e ADVERTISED_HOST_NAME=192.168.99.100 --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect ;
docker run -d --net=host -e "PROXY=true" -e ADV_HOST=192.168.99.100 -e "KAFKA_REST_PROXY_URL=http://192.168.99.100:8082" -e "SCHEMAREGISTRY_UI_URL=http://192.168.99.100:8081" landoop/kafka-topics-ui ;
docker run -p 8082:8082 --name kafka-rest --env ZK_CONNECTION_STRING=192.168.99.100:2181 frontporch/kafka-rest:latest ;
2) Connect the database to Debezium Connect
Send a POST request with Content-Type application/json to <YOUR_IP>:8083/connectors (for Linux) or 192.168.99.100:8083/connectors (for Windows Docker Toolbox) with the body
{
    "name": "inventory-connector",
    "config": {
        "name": "inventory-connector",
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}
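Debezium creates a Kafka topic for each table. By navigating to the landoop/kafka-topics-ui server on port 8000 you can see what the schema of the message payloads looks like. The important parts of the payload are before and after, which carry the old and the new values of the corresponding database row. In addition, op is 'c' for create, 'u' for update, etc.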
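As a rough illustration of how a consumer might use these fields, the sketch below applies an already-deserialized change event to an in-memory table keyed by row id. It assumes the payload has been parsed into a Map with the keys op, before and after, and that every row has an id column. The names ChangeEventApplier and apply are hypothetical, and treating 'd' as the delete code is an assumption that goes beyond the 'c' and 'u' codes mentioned above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: applies one Debezium-style change event to an in-memory "table".
class ChangeEventApplier {

    // Rows keyed by primary key; stands in for the downstream database.
    final Map<Object, Map<String, Object>> rows = new HashMap<>();

    // payload is assumed to hold "op", "before" and "after" as described above.
    @SuppressWarnings("unchecked")
    void apply(Map<String, Object> payload) {
        String op = (String) payload.get("op");
        Map<String, Object> before = (Map<String, Object>) payload.get("before");
        Map<String, Object> after = (Map<String, Object>) payload.get("after");
        switch (op) {
            case "c": // create
            case "u": // update
                // Upsert by key, so replaying the same event leaves the same state.
                rows.put(after.get("id"), new HashMap<>(after));
                break;
            case "d": // delete (assumed code): only "before" carries the row
                rows.remove(before.get("id"));
                break;
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);
        row.put("name", "first");

        Map<String, Object> created = new HashMap<>();
        created.put("op", "c");
        created.put("after", row);

        ChangeEventApplier applier = new ChangeEventApplier();
        applier.apply(created);
        applier.apply(created); // simulated duplicate delivery
        System.out.println(applier.rows.size());
    }
}
```

Writing creates and updates as the same keyed upsert is one simple way to keep the handler idempotent: a redelivered event overwrites the row with identical values instead of failing or duplicating it.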
Each consuming microservice uses the Spring Cloud Kafka binders, with these Maven dependencies:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Brixton.SR7</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.2.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
            <version>1.2.0.RELEASE</version>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    [...]
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    [...]
</dependencies>
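Then, in each consuming Spring Cloud microservice, I have a listener that subscribes to all the topics it is interested in at once and delegates each topic's events to a dedicated event handler: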
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.CountDownLatch;

@Component
public class Listener {

    public final CountDownLatch countDownLatch1 = new CountDownLatch(1);

    // One listener subscribes to every Debezium table topic this service cares about.
    @KafkaListener(id = "listener", topics = {
            "dbserver1.inventory.entity",
            "dbserver1.inventory.attribute",
            "dbserver1.inventory.entity_types"
    }, group = "group1")
    public void listen(ConsumerRecord<?, ?> record) {
        String topic = record.topic();
        if (topic.equals("dbserver1.inventory.entity")) {
            // delegate to the appropriate handler, e.g.
            // EntityEventHandler.handle(record);
        } else if (topic.equals("dbserver1.inventory.attribute")) {
            // delegate to the handler for attribute events
        } else if (topic.equals("dbserver1.inventory.entity_types")) {
            // delegate to the handler for entity_types events
        }
    }
}
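The if/else chain in the listener could also be written as a lookup table from topic name to handler. The following is a minimal sketch that uses only the JDK so it runs without Spring or Kafka. TopicDispatcher, register and dispatch are hypothetical names, and the record value is simplified to a String.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical dispatcher: routes a record's value to the handler registered for its topic.
class TopicDispatcher {

    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    void register(String topic, Consumer<String> handler) {
        handlers.put(topic, handler);
    }

    // Returns true if a handler was found and invoked, false for an unknown topic.
    boolean dispatch(String topic, String value) {
        Consumer<String> handler = handlers.get(topic);
        if (handler == null) {
            return false;
        }
        handler.accept(value);
        return true;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        TopicDispatcher dispatcher = new TopicDispatcher();
        dispatcher.register("dbserver1.inventory.entity", v -> seen.add("entity:" + v));
        dispatcher.register("dbserver1.inventory.attribute", v -> seen.add("attribute:" + v));

        dispatcher.dispatch("dbserver1.inventory.entity", "row-1");
        dispatcher.dispatch("dbserver1.inventory.attribute", "row-2");
        System.out.println(seen);
    }
}
```

Inside the listen method, the call would then be something like dispatcher.dispatch(record.topic(), String.valueOf(record.value())), with one handler registered per table topic.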
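In my case I wanted to update a graph based on the changes that happen on the RDBMS side. Of course, the graph database will only be eventually consistent with the RDBMS. My concern was that, since the topics include changes to join tables as well as to the tables they join, I would not be able to create the corresponding edges and vertices without knowing whether each vertex of an edge already exists. So I decided to ask in the Debezium Gitter (https://gitter.im/debezium/dev):
From the discussion below, two approaches exist: either create edges and vertices using placeholders for topics that haven't been consumed yet, or use Kafka Streams to stitch the topics back into their original structures, which seems more involved to me than the first way. So I decided to go with the first way :)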
Michail Michailidis @zifnab87 Apr 17 11:23 Hi I was able to integrate
Mysql with Debezium Connect and using landoop/topics-ui I am able to
see that the topics are picked up properly and messages are sent the
way they have to. I saw that for each of the tables there is a topic.
e.g join tables are separate topics too.. If lets say I have three
tables order, product and order_product and I have a service consuming
all three topics.. I might get first the insertion on order_product
and then the insertion of order.. That may cause a problem if I am
trying to push this information to a graph database.. I will try to
create an edge on vertex that is not there yet.. how can I make
consumers that consume events lets say based on a transactionId or at
least are aware of the boundary context.. is there an easy way to
listen to those events and then deserialize them to a real java object
so I can push that to a graph database or search index? If not how
would you approach this problem? Thanks!
Randall Hauch @rhauch Apr 17 19:19 @zifnab87 Debezium CDC is purely a
row-based approach, so by default all consumers see the row-level
change events in an eventually consistent manner. Of course, the
challenge to eventual consistency of downstream systems is that they
might potentially leak data states that never really existed in the
upstream source. But with that come lots of other really huge
benefits: downstream consumers are much simpler, more resilient to
failure, have lower latency (since there’s no need to wait for the
appearance of the upstream transaction’s completion before
processing), and are less coupled to the upstream system. You gave
the example of an order and product tables with an order_product
intersect table. I agree that when thinking transactionally it does
not make sense for an order_product relationship to be added before
both the order and product instances exist. But are you required to
live with that constraint? Can the order_product consumer create
placeholder nodes in the graph database for any missing order and/or
product values referenced by the relationship? In this case when the
order_product consumer is a bit ahead of the order consumer, it might
create an empty order node with the proper key or identifier, and when
the order consumer finally processes the new order it would find the
existing placeholder node and fill in the details. Of course, when the
order arrives before the order_product relationships, then everything
works as one might expect. This kind of approach might not be allowed
by the downstream graph database system or the business-level
constraints defined in the graph database. But if it is allowed and
the downstream applications and services are designed to handle such
states, then you’ll benefit from the significant simplicity that this
approach affords, as the consumers become almost trivial. You’ll be
managing less intermediate state and your system will be more likely
to continue operating when things go wrong (e.g., consumers crash or
are taken down for maintenance). If your downstream consumers do have
to stick with adhering to the transaction boundaries in the source
database, then you might consider using Kafka Streams to join the
order and order_product topics and produce a single aggregate order
object with all the relationships to the referenced products. If you
can’t assume the product already exists, then you could also join with
the product topic to add more product detail to the aggregate order
object. Of course, there still are lots of challenges, since the only
way for a stream processor consuming these streams to know it’s seen
all of the row-level change events for a given transaction is when a
subsequent transaction is seen on each of the streams. As you might
expect, this is not ideal, since the last transaction prior to any
quiet period will not complete immediately.
Michail Michailidis @zifnab87 Apr 17 23:49 Thanks @rhauch really well
explained! I was investigating Kafka Streams while waiting for your
answers! now I am thinking I will try to code the placeholder
variation e.g when a vertex is not there etc
Randall Hauch @rhauch Apr 17 23:58 @zifnab87 glad it helped, at least
a bit! Be sure you also consider the fact that the consumer might see
a sequence of messages that it already has consumed. That will only
happen when something goes wrong (e.g., with the connector or the
process(es) where the connector is running, or the broker, network
partition, etc.); when everything is operating normally, the consumer
should see no duplicate messages.
Michail Michailidis @zifnab87 Apr 18 01:15 @rhauch Sure it helped!
Yeap I have that in mind - consumer processes need to be idempotent. I
am curious if for example sinks for lets say elastic search, mongodb
and graph databases can be implemented to consolidate events produced
from debezium-mysql no matter what the order by using placeholders for
missing things.. e.g the mountaineer sinks are doing that already if
you know by any chance? I am trying to avoid reimplementing things
that already exist.. Also my solutions might be very fragile if mysql
schema changes and I dont consume the new events.. I feel so many
things are missing around the microservices world
Randall Hauch @rhauch Apr 18 03:30 I'm not sure how those sinks work.
Ideally they should handle create, update, and delete events
correctly. But because Debezium events have an envelope at the top
level of every event, you'll probably have to use SMTs to grab the
contents of the after field (or exclude the before field) so the
"meaningful" parts are put into the sink system. This will probably
get easier as more SMTs get added to KC. If you find that it takes too
many SMTs and would rather Debezium added an SMT that did this, please
log a feature request in JIRA.
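The placeholder approach discussed above can be sketched with a small in-memory graph. This is only an illustration under stated assumptions: PlaceholderGraph and its methods are hypothetical and not the API of any real graph database, vertices are identified by string keys such as "order:1", and an empty property map stands for a placeholder that has not been filled in yet.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory graph illustrating placeholder vertices; not a real graph database client.
class PlaceholderGraph {

    // vertex key -> properties; an empty map marks a placeholder with no details yet
    final Map<String, Map<String, Object>> vertices = new HashMap<>();
    // edges stored as "from->to"; a Set makes repeated inserts harmless
    final Set<String> edges = new HashSet<>();

    // Called by the consumer of an entity topic (e.g. order or product).
    void upsertVertex(String key, Map<String, Object> properties) {
        vertices.computeIfAbsent(key, k -> new HashMap<>()).putAll(properties);
    }

    // Called by the consumer of a join-table topic (e.g. order_product).
    void upsertEdge(String fromKey, String toKey) {
        // Create empty placeholders for endpoints that have not been consumed yet.
        vertices.computeIfAbsent(fromKey, k -> new HashMap<>());
        vertices.computeIfAbsent(toKey, k -> new HashMap<>());
        edges.add(fromKey + "->" + toKey);
    }

    boolean isPlaceholder(String key) {
        Map<String, Object> vertex = vertices.get(key);
        return vertex != null && vertex.isEmpty();
    }

    public static void main(String[] args) {
        PlaceholderGraph graph = new PlaceholderGraph();

        // The order_product event arrives before the order event.
        graph.upsertEdge("order:1", "product:7");
        System.out.println(graph.isPlaceholder("order:1"));

        // The order event arrives later (and is delivered twice).
        Map<String, Object> order = new HashMap<>();
        order.put("status", "NEW");
        graph.upsertVertex("order:1", order);
        graph.upsertVertex("order:1", order);
        System.out.println(graph.isPlaceholder("order:1"));
    }
}
```

Because both operations are upserts keyed by identifier, the final state is the same whichever topic is consumed first, and a replayed message does not create duplicate vertices or edges.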
I hope this answer/guide helps others get started quickly with event sourcing built around a message broker such as Kafka.