在 CompositeQueues 中处理通配符 - ActiveMQ xml 配置文件
Handling wildcards in CompositeQueues - ActiveMQ xml configuration file
我想就 ActiveMQ 的主题征求意见。
我正在使用 ActiveMQ 5.15。我正在尝试修改 xml 配置文件,以使用转发到另一个 queue/topic 的 CompositeQueues 添加虚拟目标。来自 ActiveMQ documentation for this component,架构如下:
<compositeQueue name="IncomingOrders">
<forwardTo>
<topic physicalName="Notifications" />
</forwardTo>
</compositeQueue>
我已经能够转发来自现有队列的消息,这些队列的名称类似于 request.typeA.classC 。但是,我有几个使用相同前缀 request.typeA. 的队列,因此我的目的是使用通配符,而不必为每个具有该前缀的现有队列定义复合队列, 更易于维护。
我需要这样的东西:
<compositeQueue name="request.typeA.>">
<forwardTo>
<topic physicalName="Notifications" />
</forwardTo>
</compositeQueue>
但是那段代码不起作用,我怀疑这是因为它根本不受支持(至少现在还不支持)。我已尝试在 physicalName 属性 中成功使用通配符,但在 name.
中却没有
我有一个先决条件是我必须保留使用相同前缀的不同队列(不能将它们合并为一个)。
我的另一个先决条件是我不能通过代码动态创建新的queues/topics(由于服务器权限)。这就是为什么我有兴趣修改xml配置文件。
所以我想知道你们中是否有人知道是否可以在 name 属性 中使用通配符(我没有在文档中阅读任何证据),如果是这样,我该怎么做。如果您确定无法使用当前的 ActiveMQ 版本执行此操作,我将感谢您确认。
我也将不胜感激 alternatives/advice 您可以提出与我意图相同的建议,并满足我之前提到的先决条件。我也读过 Mirrored Queues,但是这是一个影响所有现有队列的设置(我只对其中的一小部分感兴趣)并且可能会对性能产生相当大的影响。
非常感谢您抽出宝贵时间并致以最诚挚的问候。
我使用的是 5.15.11,我看到通配符适用于复合队列。这是我的复合队列配置。
<compositeQueue name="email.>">
<forwardTo>
<queue physicalName="MY_MAIN_QUEUE" />
</forwardTo>
</compositeQueue>
最后我找到了一个变通方法,允许我在给定前缀的情况下仅为队列的一个子集创建 MirroredQueues。
我所做的是创建自己的 DestinationInterceptor,以便仅为我感兴趣的那些队列创建一个镜像队列,并排除其余的(因为默认 MirroredQueue 实现镜像在系统)。
我是怎么做到的。我将 MirroredQueue.java class 实现从库复制到一个名为 CustomMirroredQueue 的新 class 并向 class 添加了一个名为 mirroring[=28= 的新属性].我从接口 DestinationInterceptor 修改了 intercept(final Destination destination) 实现,考虑到 if 语句中的这个新属性(我创建了一个isPrefixMirrored):
的辅助方法
/*
* This method is responsible for intercepting all the queues/topics that are created in the system.
* In this particular case we are interested only in the queues, in order we can mirror *some* of them and get
* a copy of the messages that are sent to them (With the topics this mirroring is not necessary since we would just
* subscribe to that topic for receiving the same message).
* */
public Destination intercept(final Destination destination) {
if (destination.getActiveMQDestination().isQueue()) {
if (isPrefixMirrored(destination) && (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues())) {
try {
//we create a mirrored queue for that destination
final Destination mirrorDestination = getMirrorDestination(destination);
if (mirrorDestination != null) {
return new DestinationFilter(destination) {
public void send(ProducerBrokerExchange context, Message message) throws Exception {
message.setDestination(mirrorDestination.getActiveMQDestination());
mirrorDestination.send(context, message);
if (isCopyMessage()) {
message = message.copy();
}
message.setDestination(destination.getActiveMQDestination());
message.setMemoryUsage(null); // set this to null so that it will use the queue memoryUsage instance instead of the topic.
super.send(context, message);
}
};
}
} catch (Exception e) {
LOG.error("Failed to lookup the mirror destination for: {}", destination, e);
}
}
}
return destination;
}
/*
* @returns true if the destination passed as parameter will be mirrored. If the value for the attribute "mirroring"
* is an empty string "" then all the queues will be mirrored by default.
**/
private boolean isPrefixMirrored(Destination destination) {
if (mirroring.equals("")) {
return true;
}
List<String> mirroredQueuesPrefixes = Arrays.asList(mirroring.split(","));
final String destinationPhysicalName = destination.getActiveMQDestination().getPhysicalName();
return mirroredQueuesPrefixes.stream().map(String::trim).anyMatch(destinationPhysicalName::contains);
}
我生成了一个仅包含此自定义 class 和依赖项(为此使用 gradle)的 .jar,并添加到 ActimeMQ 代理安装的 lib 文件夹中。然后我能够在 ActiveMQ 配置 XML 文件中将此标记用作 bean:
<destinationInterceptors>
<bean xmlns="http://www.springframework.org/schema/beans" class="package.CustomMirroredQueue" id="CustomMirroredQueue">
<property name="copyMessage" value="true"/>
<property name="postfix" value=""/>
<property name="prefix" value="mirror."/>
<property name="mirroring" value="PREFIX_1, QUEUE2, QUEUE3"/>
</bean>
</destinationInterceptors>
class 必须包含来自 ActiveMQ 库文件夹的 class 的路径。属性 copyMessage、postfix 和 prefix 来自默认的 MirroredQueue 实现。并且镜像 属性 将是一个列表,其中包含所有要镜像的特定 queues/prefixes(并且只有那些)。
我想就 ActiveMQ 的主题征求意见。
我正在使用 ActiveMQ 5.15。我正在尝试修改 xml 配置文件,以使用转发到另一个 queue/topic 的 CompositeQueues 添加虚拟目标。来自 ActiveMQ documentation for this component,架构如下:
<compositeQueue name="IncomingOrders">
<forwardTo>
<topic physicalName="Notifications" />
</forwardTo>
</compositeQueue>
我已经能够转发来自现有队列的消息,这些队列的名称类似于 request.typeA.classC 。但是,我有几个使用相同前缀 request.typeA. 的队列,因此我的目的是使用通配符,而不必为每个具有该前缀的现有队列定义复合队列, 更易于维护。
我需要这样的东西:
<compositeQueue name="request.typeA.>">
<forwardTo>
<topic physicalName="Notifications" />
</forwardTo>
</compositeQueue>
但是那段代码不起作用,我怀疑这是因为它根本不受支持(至少现在还不支持)。我已尝试在 physicalName 属性 中成功使用通配符,但在 name.
中却没有我有一个先决条件是我必须保留使用相同前缀的不同队列(不能将它们合并为一个)。
我的另一个先决条件是我不能通过代码动态创建新的queues/topics(由于服务器权限)。这就是为什么我有兴趣修改xml配置文件。
所以我想知道你们中是否有人知道是否可以在 name 属性 中使用通配符(我没有在文档中阅读任何证据),如果是这样,我该怎么做。如果您确定无法使用当前的 ActiveMQ 版本执行此操作,我将感谢您确认。
我也将不胜感激 alternatives/advice 您可以提出与我意图相同的建议,并满足我之前提到的先决条件。我也读过 Mirrored Queues,但是这是一个影响所有现有队列的设置(我只对其中的一小部分感兴趣)并且可能会对性能产生相当大的影响。
非常感谢您抽出宝贵时间并致以最诚挚的问候。
我使用的是 5.15.11,我看到通配符适用于复合队列。这是我的复合队列配置。
<compositeQueue name="email.>">
<forwardTo>
<queue physicalName="MY_MAIN_QUEUE" />
</forwardTo>
</compositeQueue>
最后我找到了一个变通方法,允许我在给定前缀的情况下仅为队列的一个子集创建 MirroredQueues。
我所做的是创建自己的 DestinationInterceptor,以便仅为我感兴趣的那些队列创建一个镜像队列,并排除其余的(因为默认 MirroredQueue 实现镜像在系统)。
我是怎么做到的。我将 MirroredQueue.java class 实现从库复制到一个名为 CustomMirroredQueue 的新 class 并向 class 添加了一个名为 mirroring[=28= 的新属性].我从接口 DestinationInterceptor 修改了 intercept(final Destination destination) 实现,考虑到 if 语句中的这个新属性(我创建了一个isPrefixMirrored):
的辅助方法/*
* This method is responsible for intercepting all the queues/topics that are created in the system.
* In this particular case we are interested only in the queues, in order we can mirror *some* of them and get
* a copy of the messages that are sent to them (With the topics this mirroring is not necessary since we would just
* subscribe to that topic for receiving the same message).
* */
public Destination intercept(final Destination destination) {
if (destination.getActiveMQDestination().isQueue()) {
if (isPrefixMirrored(destination) && (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues())) {
try {
//we create a mirrored queue for that destination
final Destination mirrorDestination = getMirrorDestination(destination);
if (mirrorDestination != null) {
return new DestinationFilter(destination) {
public void send(ProducerBrokerExchange context, Message message) throws Exception {
message.setDestination(mirrorDestination.getActiveMQDestination());
mirrorDestination.send(context, message);
if (isCopyMessage()) {
message = message.copy();
}
message.setDestination(destination.getActiveMQDestination());
message.setMemoryUsage(null); // set this to null so that it will use the queue memoryUsage instance instead of the topic.
super.send(context, message);
}
};
}
} catch (Exception e) {
LOG.error("Failed to lookup the mirror destination for: {}", destination, e);
}
}
}
return destination;
}
/*
* @returns true if the destination passed as parameter will be mirrored. If the value for the attribute "mirroring"
* is an empty string "" then all the queues will be mirrored by default.
**/
private boolean isPrefixMirrored(Destination destination) {
if (mirroring.equals("")) {
return true;
}
List<String> mirroredQueuesPrefixes = Arrays.asList(mirroring.split(","));
final String destinationPhysicalName = destination.getActiveMQDestination().getPhysicalName();
return mirroredQueuesPrefixes.stream().map(String::trim).anyMatch(destinationPhysicalName::contains);
}
我生成了一个仅包含此自定义 class 和依赖项(为此使用 gradle)的 .jar,并添加到 ActimeMQ 代理安装的 lib 文件夹中。然后我能够在 ActiveMQ 配置 XML 文件中将此标记用作 bean:
<destinationInterceptors>
<bean xmlns="http://www.springframework.org/schema/beans" class="package.CustomMirroredQueue" id="CustomMirroredQueue">
<property name="copyMessage" value="true"/>
<property name="postfix" value=""/>
<property name="prefix" value="mirror."/>
<property name="mirroring" value="PREFIX_1, QUEUE2, QUEUE3"/>
</bean>
</destinationInterceptors>
class 必须包含来自 ActiveMQ 库文件夹的 class 的路径。属性 copyMessage、postfix 和 prefix 来自默认的 MirroredQueue 实现。并且镜像 属性 将是一个列表,其中包含所有要镜像的特定 queues/prefixes(并且只有那些)。