如何重命名 debezium mongodb 连接器发件箱消息的 ID header
How to rename the id header of a debezium mongodb connector outbox message
我正在尝试将 debezium 的发件箱事件路由器用于 mongodb。消费者是 spring 云流应用程序。我无法反序列化该消息,因为 spring 云希望消息 ID header 为 UUID,但它接收到 byte[]。我尝试了不同的反序列化器无济于事。我正在考虑重命名 id header 以跳过此 spring 云检查,或将其完全删除。我试过 ReplaceField SMT 但它似乎没有修改 header 字段。
在 spring 中还有解决这个问题的方法吗?
最初问题的解决方案是使用 DroHeaders SMT(https://docs.confluent.io/platform/current/connect/transforms/dropheaders.html)。
这将删除由 debezium 填充的 ID header。
但是正如 Oleg Zhurakousky 提到的,在没有 @StreamListener 的情况下迁移到更新版本的 spring-cloud-stream 可以解决根本问题。
显然 @StreamListener 检查消息是否有一个 id header 并且它要求是 Uuid 类型。通过使用使用 spring-cloud-stream 的新功能方式,id header 实际上被新生成的值覆盖。这意味着 debezium 填充的值(发件箱 table 中的 id 列)将被忽略。我想如果您需要检查是否重复交付,也许最好创建自己的 header 而不是使用 id。我不知道spring-cloud-stream是否会为同一条消息重新投递生成相同的id。
还要记住,即使在 spring-cloud-stream 的较新版本中,如果您使用已弃用的 @StreamListener,也会遇到同样的问题。
我正在尝试将 debezium 的发件箱事件路由器用于 mongodb。消费者是 spring 云流应用程序。我无法反序列化该消息,因为 spring 云希望消息 ID header 为 UUID,但它接收到 byte[]。我尝试了不同的反序列化器无济于事。我正在考虑重命名 id header 以跳过此 spring 云检查,或将其完全删除。我试过 ReplaceField SMT 但它似乎没有修改 header 字段。
在 spring 中还有解决这个问题的方法吗?
最初问题的解决方案是使用 DroHeaders SMT(https://docs.confluent.io/platform/current/connect/transforms/dropheaders.html)。 这将删除由 debezium 填充的 ID header。
但是正如 Oleg Zhurakousky 提到的,在没有 @StreamListener 的情况下迁移到更新版本的 spring-cloud-stream 可以解决根本问题。 显然 @StreamListener 检查消息是否有一个 id header 并且它要求是 Uuid 类型。通过使用使用 spring-cloud-stream 的新功能方式,id header 实际上被新生成的值覆盖。这意味着 debezium 填充的值(发件箱 table 中的 id 列)将被忽略。我想如果您需要检查是否重复交付,也许最好创建自己的 header 而不是使用 id。我不知道spring-cloud-stream是否会为同一条消息重新投递生成相同的id。
还要记住,即使在 spring-cloud-stream 的较新版本中,如果您使用已弃用的 @StreamListener,也会遇到同样的问题。