Apache Pulsar JDBC 接收器:insert/update/delete 之间的区别

Apache Pulsar JDBC sink: differentiation between insert/update/delete

我目前正在检查 Pulsar JDBC 接收器,因为我们计划很快使用 PostgresSQL 接收器。 现在,有人提到 JDBC sinks 支持 insert/update/delete 操作,但我找不到任何关于 sink 连接器如何实际决定执行什么的文档(是插入、更新还是更新)为新活动删除?)

浏览源代码并浏览 JdbcAbstractSink.java 我想我现在可能有了一个想法,但如果我的想法是正确的,我需要一些确认。

请告诉我这是否正确:

1.) 1 个数据库实体类型需要 3 个不同的主题。一个用于将实体类型插入 table 的主题,一个用于更新相同的实体类型,一个用于删除。还需要 3 个不同的接收器连接器,每个都有不同的配置。

2.) 命令决定由配置属性决定:

是这样做的吗?

在提到的源代码中class有一个代码位

 for (Record<T> record : swapList) {
                String action = record.getProperties().get(ACTION);
                if (action == null) {
                    action = INSERT;
                }
                switch (action) {
                    case DELETE: ...
                    case UPDATE: ...

但没有提到在哪里以及如何设置记录的 ACTION 属性...

如果我只是以某种方式错过了相关文档,那么很高兴为我提供 link。 我知道这个配置文档页面:https://pulsar.apache.org/docs/en/io-jdbc-sink/#configuration 但是很模糊,没有真实的例子

此连接的文档至少可以说是缺乏,所以我会尽力解释它。从代码中可以看出,要采取的“行动”,例如插入、更新或删除在 Pulsar 消息本身中作为 属性 传递。

String action = record.getProperties().get(ACTION);

因此,为了控制 Sink 采取的操作,您需要将 属性 添加到您在 JDBC Sink 连接器的“源”主题中发布的消息中(除非您希望操作为 INSERT,这是默认操作)。

下面是一个示例,说明如何在消息属性中使用不同的操作发布消息:

producer.newMessage().value("1234").property("action", "delete").send();

现在当JDBC Sink连接器读取这条消息时,它将对主键值为“1234”的记录执行DELETE操作。