Artemis 消息路由
Artemis message routing
我正在使用 ActiveMQ Artemis 2.17.0,但我遇到了路由问题。
我已经实现了一个记录之前消息路由的插件,我看到一些消息从 topic.private.abc.task.V1
路由到 topic.abc.rawmessage.V1
。
没有转移设置,主题和队列由生产者和消费者动态创建。有一个将目标 clustered.*.>
映射到虚拟主题
的设置
private TransportConfiguration getServerTransportConfiguration() {
Map<String, Object> extraProps = new HashMap<>();
extraProps.put("virtualTopicConsumerWildcards", "clustered.*.>;2");
Map<String, Object> params = new HashMap<>();
params.put("scheme", "tcp");
params.put("port", port);
params.put("host", hostname);
return new TransportConfiguration("org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory", params, "netty-acceptor", extraProps);
}
topic.private.abc.task.V1
和 topic.abc.rawmessage.V1
都是有效的主题,但不应将它们链接起来。
什么可以解释这种行为?
插件代码如下:
@Override
public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
Map<String, Object> map = new HashMap<>();
map.put("RoutingContext", new RoutingContextLogView(context));
logger.info(mapper.writeValueAsString(map));
ActiveMQServerPlugin.super.beforeMessageRoute(message, context, direct, rejectDuplicates);
}
public class RoutingContextLogView {
private RoutingContext routingContext;
public RoutingContextLogView(RoutingContext routingContext) {
this.routingContext = routingContext;
}
public String getAddress() {
return routingContext.getAddress() != null ? routingContext.getAddress().toString() : null;
}
public String getPreviousAddress() {
return routingContext.getPreviousAddress() != null ? routingContext.getPreviousAddress().toString() : null;
}
public String getRoutingType() {
return routingContext.getRoutingType() != null ? routingContext.getRoutingType().name() : null;
}
public String getPreviousRoutingType() {
return routingContext.getPreviousRoutingType() != null ? routingContext.getPreviousRoutingType().name() : null;
}
}
尽管日志记录很奇怪,但消息之后的流程似乎没问题(即消息生成到 topic.abc.rawmessage.V1
并从 topic.abc.rawmessage.V1
消耗)。我只是在徘徊为什么会有消息路由以及为什么RoutingContext
中的previousAddress
是错误的。
代理内部使用的RoutingContext
对象可重用。这样做是出于性能原因,以避免无论如何都必须为每个路由操作重新创建 RoutingContext
。可能有人会猜到,路由消息是代理中非常常见的操作,因此尽可能对其进行优化是值得的。重用 RoutingContext
意味着创建和丢弃的对象更少,这意味着需要清理的垃圾更少,这意味着代理的暂停更少,整体性能更好。
这里的 previousAddress
不同于当前邮件将被路由到的地址这一事实 不是 问题。这只是意味着上下文不会再用于此路由操作,因此将被清除。顾名思义,beforeMessageRoute
方法在 执行任何路由逻辑之前 被调用(例如清除 RoutingContext
)。如果您使用 afterMessageRoute
检查 RoutingContext
,那么您应该会看到它已被清除并填充了正确的详细信息。
消息“发送”和消息“路由”(两者都有插件挂钩)是相关但不同的操作。 “发送”消息以响应客户端操作。发送总是导致路由。但是,并非所有路由都是发送的结果。由于不涉及发送的内部代理操作可以路由消息(例如,在集群中移动消息、使消息过期、取消无法传递到死信地址的消息、使用 divert 等) .
我会提醒您不要检查内部代理状态(这可能是微妙的和细微差别的)并在其他一切都表明代理正常运行时假设存在问题。在这种情况下,您说您“面临路由问题”并且“一些消息从 topic.private.abc.task.V1
路由到 topic.abc.rawmessage.V1
”,而实际上没有路由问题,消息是 不是 实际上是从 topic.private.abc.task.V1
路由到 topic.abc.rawmessage.V1
。据我所知,实际上一切都在正常运行。
我正在使用 ActiveMQ Artemis 2.17.0,但我遇到了路由问题。
我已经实现了一个记录之前消息路由的插件,我看到一些消息从 topic.private.abc.task.V1
路由到 topic.abc.rawmessage.V1
。
没有转移设置,主题和队列由生产者和消费者动态创建。有一个将目标 clustered.*.>
映射到虚拟主题
private TransportConfiguration getServerTransportConfiguration() {
Map<String, Object> extraProps = new HashMap<>();
extraProps.put("virtualTopicConsumerWildcards", "clustered.*.>;2");
Map<String, Object> params = new HashMap<>();
params.put("scheme", "tcp");
params.put("port", port);
params.put("host", hostname);
return new TransportConfiguration("org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory", params, "netty-acceptor", extraProps);
}
topic.private.abc.task.V1
和 topic.abc.rawmessage.V1
都是有效的主题,但不应将它们链接起来。
什么可以解释这种行为?
插件代码如下:
@Override
public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
Map<String, Object> map = new HashMap<>();
map.put("RoutingContext", new RoutingContextLogView(context));
logger.info(mapper.writeValueAsString(map));
ActiveMQServerPlugin.super.beforeMessageRoute(message, context, direct, rejectDuplicates);
}
public class RoutingContextLogView {
private RoutingContext routingContext;
public RoutingContextLogView(RoutingContext routingContext) {
this.routingContext = routingContext;
}
public String getAddress() {
return routingContext.getAddress() != null ? routingContext.getAddress().toString() : null;
}
public String getPreviousAddress() {
return routingContext.getPreviousAddress() != null ? routingContext.getPreviousAddress().toString() : null;
}
public String getRoutingType() {
return routingContext.getRoutingType() != null ? routingContext.getRoutingType().name() : null;
}
public String getPreviousRoutingType() {
return routingContext.getPreviousRoutingType() != null ? routingContext.getPreviousRoutingType().name() : null;
}
}
尽管日志记录很奇怪,但消息之后的流程似乎没问题(即消息生成到 topic.abc.rawmessage.V1
并从 topic.abc.rawmessage.V1
消耗)。我只是在徘徊为什么会有消息路由以及为什么RoutingContext
中的previousAddress
是错误的。
代理内部使用的RoutingContext
对象可重用。这样做是出于性能原因,以避免无论如何都必须为每个路由操作重新创建 RoutingContext
。可能有人会猜到,路由消息是代理中非常常见的操作,因此尽可能对其进行优化是值得的。重用 RoutingContext
意味着创建和丢弃的对象更少,这意味着需要清理的垃圾更少,这意味着代理的暂停更少,整体性能更好。
这里的 previousAddress
不同于当前邮件将被路由到的地址这一事实 不是 问题。这只是意味着上下文不会再用于此路由操作,因此将被清除。顾名思义,beforeMessageRoute
方法在 执行任何路由逻辑之前 被调用(例如清除 RoutingContext
)。如果您使用 afterMessageRoute
检查 RoutingContext
,那么您应该会看到它已被清除并填充了正确的详细信息。
消息“发送”和消息“路由”(两者都有插件挂钩)是相关但不同的操作。 “发送”消息以响应客户端操作。发送总是导致路由。但是,并非所有路由都是发送的结果。由于不涉及发送的内部代理操作可以路由消息(例如,在集群中移动消息、使消息过期、取消无法传递到死信地址的消息、使用 divert 等) .
我会提醒您不要检查内部代理状态(这可能是微妙的和细微差别的)并在其他一切都表明代理正常运行时假设存在问题。在这种情况下,您说您“面临路由问题”并且“一些消息从 topic.private.abc.task.V1
路由到 topic.abc.rawmessage.V1
”,而实际上没有路由问题,消息是 不是 实际上是从 topic.private.abc.task.V1
路由到 topic.abc.rawmessage.V1
。据我所知,实际上一切都在正常运行。