Apache Pulsar JDBC 接收器:insert/update/delete 之间的区别
Apache Pulsar JDBC sink: differentiation between insert/update/delete
我目前正在检查 Pulsar JDBC 接收器,因为我们计划很快使用 PostgresSQL 接收器。
现在,有人提到 JDBC sinks 支持 insert/update/delete 操作,但我找不到任何关于 sink 连接器如何实际决定执行什么的文档(是插入、更新还是更新)为新活动删除?)
浏览源代码并浏览 JdbcAbstractSink.java 我想我现在可能有了一个想法,但如果我的想法是正确的,我需要一些确认。
请告诉我这是否正确:
1.) 1 个数据库实体类型需要 3 个不同的主题。一个用于将实体类型插入 table 的主题,一个用于更新相同的实体类型,一个用于删除。还需要 3 个不同的接收器连接器,每个都有不同的配置。
2.) 命令决定由配置属性决定:
如果 nonKey 和 key 属性都缺失 --> 执行插入
如果同时提供了 nonKey 和 key 道具 --> 执行更新,如
更新非键列,其中键列 = event.value
如果只提供关键列 -->
删除关键列 = event.value
是这样做的吗?
在提到的源代码中class有一个代码位
for (Record<T> record : swapList) {
String action = record.getProperties().get(ACTION);
if (action == null) {
action = INSERT;
}
switch (action) {
case DELETE: ...
case UPDATE: ...
但没有提到在哪里以及如何设置记录的 ACTION 属性...
如果我只是以某种方式错过了相关文档,那么很高兴为我提供 link。
我知道这个配置文档页面:https://pulsar.apache.org/docs/en/io-jdbc-sink/#configuration
但是很模糊,没有真实的例子
此连接的文档至少可以说是缺乏,所以我会尽力解释它。从代码中可以看出,要采取的“行动”,例如插入、更新或删除在 Pulsar 消息本身中作为 属性 传递。
String action = record.getProperties().get(ACTION);
因此,为了控制 Sink 采取的操作,您需要将 属性 添加到您在 JDBC Sink 连接器的“源”主题中发布的消息中(除非您希望操作为 INSERT,这是默认操作)。
下面是一个示例,说明如何在消息属性中使用不同的操作发布消息:
producer.newMessage().value("1234").property("action", "delete").send();
现在当JDBC Sink连接器读取这条消息时,它将对主键值为“1234”的记录执行DELETE操作。
我目前正在检查 Pulsar JDBC 接收器,因为我们计划很快使用 PostgresSQL 接收器。 现在,有人提到 JDBC sinks 支持 insert/update/delete 操作,但我找不到任何关于 sink 连接器如何实际决定执行什么的文档(是插入、更新还是更新)为新活动删除?)
浏览源代码并浏览 JdbcAbstractSink.java 我想我现在可能有了一个想法,但如果我的想法是正确的,我需要一些确认。
请告诉我这是否正确:
1.) 1 个数据库实体类型需要 3 个不同的主题。一个用于将实体类型插入 table 的主题,一个用于更新相同的实体类型,一个用于删除。还需要 3 个不同的接收器连接器,每个都有不同的配置。
2.) 命令决定由配置属性决定:
如果 nonKey 和 key 属性都缺失 --> 执行插入
如果同时提供了 nonKey 和 key 道具 --> 执行更新,如
更新非键列,其中键列 = event.value
如果只提供关键列 -->
删除关键列 = event.value
是这样做的吗?
在提到的源代码中class有一个代码位
for (Record<T> record : swapList) {
String action = record.getProperties().get(ACTION);
if (action == null) {
action = INSERT;
}
switch (action) {
case DELETE: ...
case UPDATE: ...
但没有提到在哪里以及如何设置记录的 ACTION 属性...
如果我只是以某种方式错过了相关文档,那么很高兴为我提供 link。 我知道这个配置文档页面:https://pulsar.apache.org/docs/en/io-jdbc-sink/#configuration 但是很模糊,没有真实的例子
此连接的文档至少可以说是缺乏,所以我会尽力解释它。从代码中可以看出,要采取的“行动”,例如插入、更新或删除在 Pulsar 消息本身中作为 属性 传递。
String action = record.getProperties().get(ACTION);
因此,为了控制 Sink 采取的操作,您需要将 属性 添加到您在 JDBC Sink 连接器的“源”主题中发布的消息中(除非您希望操作为 INSERT,这是默认操作)。
下面是一个示例,说明如何在消息属性中使用不同的操作发布消息:
producer.newMessage().value("1234").property("action", "delete").send();
现在当JDBC Sink连接器读取这条消息时,它将对主键值为“1234”的记录执行DELETE操作。