根据自定义属性从 ActiveMQ 队列中过滤/删除重复消息
Filter / drop duplicate messages from ActiveMQ queue based on custom properties
问题
当我的 Web 应用程序更新数据库中的项目时,它通过 Camel 将包含项目 ID 的消息发送到 ActiveMQ 队列,其消费者将更新外部服务 (Solr)。外部服务独立读取数据库。
我想要的是,如果 Web 应用程序发送另一条具有相同项目 ID 的消息,而旧消息仍在队列中,则丢弃新消息以避免 运行 Solr 更新两次。
在处理更新请求并且具有该项目 ID 的消息离开队列后,应再次接受具有相同 ID 的新请求。
有没有办法让这项工作开箱即用?我真的很想放弃 ActiveMQ 并简单地将更新请求队列实现为具有唯一约束的数据库 table,按时间戳或 运行 插入 ID 排序。
到目前为止我尝试了什么
我已阅读 Whosebug 上的 this and this 页面。这些是那里提到的解决方案:
Camel 中的幂等消费者:在这里我可以指定一个表达式来定义什么构成重复项,但这也会阻止所有未来发送相同消息的尝试,即更新相同的项目。我只希望新的更新请求在它们仍在队列中时被丢弃。
"ActiveMQ already does duplicate checks, look at auditDepth
!": 好吧,这看起来是一个好的开始,绝对最接近我想要的,但是它根据我无法设置的消息 ID 来确定相等性。所以 或者 我找到了一种方法让 ActiveMQ 以某种方式为这个队列生成消息 ID 或者 我找到了一种方法来制作审计内容查看我的项目 ID 字段而不是消息 ID。 (一个 comment in my second link 甚至建议使用 "a well defined property you set on the header",但未能解释 如何 。)
编写自定义插件,如果传入消息与队列中已有消息匹配,则将传入消息重定向到死信队列。这似乎是迄今为止提供的最完整的解决方案,但对于我认为相当平凡的日常任务来说,它感觉太过分了。
PS: 我发现 another SO page 问了同样的事情却没有答案。
你想要的不是消息代理功能,跟着我重复,"A message broker is not a database, A message broker is not a database",根据需要重复。
经纪人的工作是可靠地从 A 点到 B 点获取消息。客户端通过消息选择器提供一些过滤功能,但这是最小的,主要用于只阻止单个客户端感兴趣的特定消息流向那里而不是其他客户可能负责处理的其他人。
如您所述,您的用例需要一个更有状态的以数据库为中心的解决方案。创建一个代理插件来遍历队列以检查消息是在重新发明轮子,并且如果队列深度很大则容易出错,因为 ActiveMQ 甚至可能不会根据内存限制为您分页所有消息。
问题
当我的 Web 应用程序更新数据库中的项目时,它通过 Camel 将包含项目 ID 的消息发送到 ActiveMQ 队列,其消费者将更新外部服务 (Solr)。外部服务独立读取数据库。
我想要的是,如果 Web 应用程序发送另一条具有相同项目 ID 的消息,而旧消息仍在队列中,则丢弃新消息以避免 运行 Solr 更新两次。
在处理更新请求并且具有该项目 ID 的消息离开队列后,应再次接受具有相同 ID 的新请求。
有没有办法让这项工作开箱即用?我真的很想放弃 ActiveMQ 并简单地将更新请求队列实现为具有唯一约束的数据库 table,按时间戳或 运行 插入 ID 排序。
到目前为止我尝试了什么
我已阅读 Whosebug 上的 this and this 页面。这些是那里提到的解决方案:
Camel 中的幂等消费者:在这里我可以指定一个表达式来定义什么构成重复项,但这也会阻止所有未来发送相同消息的尝试,即更新相同的项目。我只希望新的更新请求在它们仍在队列中时被丢弃。
"ActiveMQ already does duplicate checks, look at
auditDepth
!": 好吧,这看起来是一个好的开始,绝对最接近我想要的,但是它根据我无法设置的消息 ID 来确定相等性。所以 或者 我找到了一种方法让 ActiveMQ 以某种方式为这个队列生成消息 ID 或者 我找到了一种方法来制作审计内容查看我的项目 ID 字段而不是消息 ID。 (一个 comment in my second link 甚至建议使用 "a well defined property you set on the header",但未能解释 如何 。)编写自定义插件,如果传入消息与队列中已有消息匹配,则将传入消息重定向到死信队列。这似乎是迄今为止提供的最完整的解决方案,但对于我认为相当平凡的日常任务来说,它感觉太过分了。
PS: 我发现 another SO page 问了同样的事情却没有答案。
你想要的不是消息代理功能,跟着我重复,"A message broker is not a database, A message broker is not a database",根据需要重复。
经纪人的工作是可靠地从 A 点到 B 点获取消息。客户端通过消息选择器提供一些过滤功能,但这是最小的,主要用于只阻止单个客户端感兴趣的特定消息流向那里而不是其他客户可能负责处理的其他人。
如您所述,您的用例需要一个更有状态的以数据库为中心的解决方案。创建一个代理插件来遍历队列以检查消息是在重新发明轮子,并且如果队列深度很大则容易出错,因为 ActiveMQ 甚至可能不会根据内存限制为您分页所有消息。