如何让应用程序的一个实例只接收每个 AMQP 主题消息(消费者组行为)
How can I make only one instance of an application receive each AMQP topic message (consumer group behaviour)
我正在使用 Apache Camel's AMQP 组件来收听来自 ActiveMQ Artemis 主题的消息。
此应用程序 运行 在具有两个副本的 Kubernetes 上。
我已经为每个 pod 配置了一个唯一的 clientId 和一个通用订阅名称的持久订阅:
<route autoStartup=true" id="myRoute">
<from id="_amqp_topic" uri="amqp:topic:xxx?connectionFactory=#amqpCF&disableReplyTo=true&transacted=false&subscriptionDurable=true&clientId={{container-id}}&durableSubscriptionName=eventSubscription"/>
<log loggingLevel="INFO" message="Received event: ${body}"/>
...
</route>
问题是 pods 都收到了消息,而实际上只有其中一个应该。我正在尝试实现类似于 Kafka's consumer groups 的效果,其中只有该组的一个成员收到每条消息。
如果您希望只有一个 订阅者接收消息,那么订阅者必须共享 相同的订阅。因此,您需要:
- 在您的
_amqp_topic
uri
中设置 subscriptionShared=true
- 使用相同 客户端 ID(即不使用
{{container-id}}
)
例如:
<route autoStartup=true" id="myRoute">
<from id="_amqp_topic" uri="amqp:topic:xxx?connectionFactory=#amqpCF&disableReplyTo=true&transacted=false&subscriptionDurable=true&clientId=myClientID&durableSubscriptionName=eventSubscription&subscriptionShared=true"/>
<log loggingLevel="INFO" message="Received event: ${body}"/>
...
</route>
另一种选择是简单地使用 queue 而不是主题,例如:
<route autoStartup=true" id="myRoute">
<from id="_amqp_queue" uri="amqp:queue:xxx?connectionFactory=#amqpCF&disableReplyTo=true&transacted=false"/>
<log loggingLevel="INFO" message="Received event: ${body}"/>
...
</route>
我正在使用 Apache Camel's AMQP 组件来收听来自 ActiveMQ Artemis 主题的消息。
此应用程序 运行 在具有两个副本的 Kubernetes 上。
我已经为每个 pod 配置了一个唯一的 clientId 和一个通用订阅名称的持久订阅:
<route autoStartup=true" id="myRoute">
<from id="_amqp_topic" uri="amqp:topic:xxx?connectionFactory=#amqpCF&disableReplyTo=true&transacted=false&subscriptionDurable=true&clientId={{container-id}}&durableSubscriptionName=eventSubscription"/>
<log loggingLevel="INFO" message="Received event: ${body}"/>
...
</route>
问题是 pods 都收到了消息,而实际上只有其中一个应该。我正在尝试实现类似于 Kafka's consumer groups 的效果,其中只有该组的一个成员收到每条消息。
如果您希望只有一个 订阅者接收消息,那么订阅者必须共享 相同的订阅。因此,您需要:
- 在您的
_amqp_topic
uri
中设置 - 使用相同 客户端 ID(即不使用
{{container-id}}
)
subscriptionShared=true
例如:
<route autoStartup=true" id="myRoute">
<from id="_amqp_topic" uri="amqp:topic:xxx?connectionFactory=#amqpCF&disableReplyTo=true&transacted=false&subscriptionDurable=true&clientId=myClientID&durableSubscriptionName=eventSubscription&subscriptionShared=true"/>
<log loggingLevel="INFO" message="Received event: ${body}"/>
...
</route>
另一种选择是简单地使用 queue 而不是主题,例如:
<route autoStartup=true" id="myRoute">
<from id="_amqp_queue" uri="amqp:queue:xxx?connectionFactory=#amqpCF&disableReplyTo=true&transacted=false"/>
<log loggingLevel="INFO" message="Received event: ${body}"/>
...
</route>