使用通配符语法的消费者
Consumer with wildcard syntax
我正在使用 ActiveMQ Artemis 2.17.0。我想创建一个使用通配符语法的消费者,让它消费来自多个地址的消息。我编写了下面的消费者,但它只从地址 news.europe.# 消费,而不是从与通配符语法匹配的地址(news.europe.sport、news.europe.politics 等)消费。我做错了什么?
场景:
- 启动 Artemis 代理
- 使用生产者向 news.europe.sport 和 news.europe.politics 发送 2 条消息
- 启动消费者
预期行为:
- 消费者收到 2 条消息
观察到的行为
- 消息留在队列中
- 地址 news.europe.# 上有一个活跃的消费者
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Minimal JMS consumer: connects to the broker at {@code tcp://localhost:61716},
 * attaches a {@code ConsumerMessageListener} to the {@code news.europe.#} destination,
 * listens for 60 seconds and then commits the transacted session.
 */
public class ArtemisConsumer {

    /**
     * Runs the consumer once.
     *
     * @param args unused
     * @throws JMSException if connecting, creating the consumer, or committing fails
     * @throws InterruptedException if the thread is interrupted during the 60-second wait
     */
    public static void main(String[] args) throws JMSException, InterruptedException {
        String brokerURL = "tcp://localhost:61716";
        String queueName = "news.europe.#";

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
        connectionFactory.setUser("user");
        // NOTE(review): ArtemisProducer authenticates with "password"; confirm which value is correct.
        connectionFactory.setPassword("pass");

        // try-with-resources guarantees the session and connection are closed even when
        // sleep() is interrupted or commit() throws (previously both leaked in that case).
        try (Connection connection = connectionFactory.createConnection();
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {
            Destination destination = session.createQueue(queueName);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new ConsumerMessageListener("Consumer"));

            // Start delivery only after the listener is registered, so setup is complete
            // before the first message can arrive.
            connection.start();

            Thread.sleep(60000);
            session.commit();
        }
    }
}
broker.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Broker configuration for the "QMA" instance used by the consumer/producer samples. -->
<configuration xmlns="urn:activemq">
   <core xmlns="urn:activemq:core">
      <name>QMA</name>
      <max-disk-usage>100</max-disk-usage>
      <!-- Largest signed 64-bit value; presumably meant to effectively disable periodic config reload (confirm). -->
      <configuration-file-refresh-period>9223372036854775807</configuration-file-refresh-period>

      <!-- All persistence directories live under ${ARTEMISMQ_DATA}. -->
      <bindings-directory>${ARTEMISMQ_DATA}/bindings</bindings-directory>
      <journal-directory>${ARTEMISMQ_DATA}/journal</journal-directory>
      <large-messages-directory>${ARTEMISMQ_DATA}/largemessages</large-messages-directory>
      <!-- Fixed: removed a stray leading "." so this path matches the sibling directories. -->
      <paging-directory>${ARTEMISMQ_DATA}/paging</paging-directory>

      <cluster-user>user</cluster-user>
      <cluster-password>password</cluster-password>

      <!-- Acceptors -->
      <acceptors>
         <acceptor name="netty-acceptor">tcp://0.0.0.0:61716</acceptor>
         <acceptor name="in-vm">vm://0</acceptor>
      </acceptors>

      <security-settings>
         <!-- match="#" applies these permissions for role "user-group" to every address. -->
         <security-setting match="#">
            <permission roles="user-group" type="createNonDurableQueue"/>
            <permission roles="user-group" type="deleteNonDurableQueue"/>
            <permission roles="user-group" type="createDurableQueue"/>
            <permission roles="user-group" type="deleteDurableQueue"/>
            <permission roles="user-group" type="createAddress"/>
            <permission roles="user-group" type="deleteAddress"/>
            <permission roles="user-group" type="consume"/>
            <permission roles="user-group" type="browse"/>
            <permission roles="user-group" type="send"/>
            <permission roles="user-group" type="manage"/>
         </security-setting>
      </security-settings>
   </core>
</configuration>
生产者
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Minimal JMS producer: connects to the broker at {@code tcp://localhost:61716} and sends
 * one text message to {@code news.europe.politics} inside a transacted session.
 */
public class ArtemisProducer {

    /**
     * Sends a single "Sample message" and commits.
     *
     * @param args unused
     * @throws Exception if connecting, sending, or committing fails
     */
    public static void main(final String[] args) throws Exception {
        String brokerURL = "tcp://localhost:61716";

        ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(brokerURL);
        connFactory.setUser("user");
        connFactory.setPassword("password");

        // try-with-resources closes the session and connection even if send() or commit()
        // throws (previously the connection leaked on any failure).
        try (Connection conn = connFactory.createConnection();
             Session sess = conn.createSession(true, Session.SESSION_TRANSACTED)) {
            conn.start();

            final Destination dest = sess.createQueue("news.europe.politics");
            final MessageProducer prod = sess.createProducer(dest);
            final Message msg = sess.createTextMessage("Sample message");

            prod.send(msg);
            // The session is transacted: the message only becomes visible after commit.
            sess.commit();
        }
    }
}
您看到了预期的行为。这是因为您使用的功能是 wildcard address。简而言之,发送到匹配地址的任何消息也将被路由到通配符地址(以及根据其语义(即任播或多播)绑定到该地址的任何队列)。
在您的情况下,发送消息时通配符地址尚未创建,因此这些消息无法被路由到它。
顺便说一下,您可以在随代理一起提供的 topic-hierarchies 示例(位于 examples/features/standard 目录中)中看到此功能的实际效果。
我正在使用 ActiveMQ Artemis 2.17.0。我想创建一个使用通配符语法的消费者,让它消费来自多个地址的消息。我编写了下面的消费者,但它只从地址 news.europe.# 消费,而不是从与通配符语法匹配的地址(news.europe.sport、news.europe.politics 等)消费。我做错了什么?
场景:
- 启动 Artemis 代理
- 使用生产者向 news.europe.sport 和 news.europe.politics 发送 2 条消息
- 启动消费者
预期行为:
- 消费者收到 2 条消息
观察到的行为
- 消息留在队列中
- 地址 news.europe.# 上有一个活跃的消费者
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Minimal JMS consumer: connects to the broker at {@code tcp://localhost:61716},
 * attaches a {@code ConsumerMessageListener} to the {@code news.europe.#} destination,
 * listens for 60 seconds and then commits the transacted session.
 */
public class ArtemisConsumer {

    /**
     * Runs the consumer once.
     *
     * @param args unused
     * @throws JMSException if connecting, creating the consumer, or committing fails
     * @throws InterruptedException if the thread is interrupted during the 60-second wait
     */
    public static void main(String[] args) throws JMSException, InterruptedException {
        String brokerURL = "tcp://localhost:61716";
        String queueName = "news.europe.#";

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
        connectionFactory.setUser("user");
        // NOTE(review): ArtemisProducer authenticates with "password"; confirm which value is correct.
        connectionFactory.setPassword("pass");

        // try-with-resources guarantees the session and connection are closed even when
        // sleep() is interrupted or commit() throws (previously both leaked in that case).
        try (Connection connection = connectionFactory.createConnection();
             Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {
            Destination destination = session.createQueue(queueName);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new ConsumerMessageListener("Consumer"));

            // Start delivery only after the listener is registered, so setup is complete
            // before the first message can arrive.
            connection.start();

            Thread.sleep(60000);
            session.commit();
        }
    }
}
broker.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Broker configuration for the "QMA" instance used by the consumer/producer samples. -->
<configuration xmlns="urn:activemq">
   <core xmlns="urn:activemq:core">
      <name>QMA</name>
      <max-disk-usage>100</max-disk-usage>
      <!-- Largest signed 64-bit value; presumably meant to effectively disable periodic config reload (confirm). -->
      <configuration-file-refresh-period>9223372036854775807</configuration-file-refresh-period>

      <!-- All persistence directories live under ${ARTEMISMQ_DATA}. -->
      <bindings-directory>${ARTEMISMQ_DATA}/bindings</bindings-directory>
      <journal-directory>${ARTEMISMQ_DATA}/journal</journal-directory>
      <large-messages-directory>${ARTEMISMQ_DATA}/largemessages</large-messages-directory>
      <!-- Fixed: removed a stray leading "." so this path matches the sibling directories. -->
      <paging-directory>${ARTEMISMQ_DATA}/paging</paging-directory>

      <cluster-user>user</cluster-user>
      <cluster-password>password</cluster-password>

      <!-- Acceptors -->
      <acceptors>
         <acceptor name="netty-acceptor">tcp://0.0.0.0:61716</acceptor>
         <acceptor name="in-vm">vm://0</acceptor>
      </acceptors>

      <security-settings>
         <!-- match="#" applies these permissions for role "user-group" to every address. -->
         <security-setting match="#">
            <permission roles="user-group" type="createNonDurableQueue"/>
            <permission roles="user-group" type="deleteNonDurableQueue"/>
            <permission roles="user-group" type="createDurableQueue"/>
            <permission roles="user-group" type="deleteDurableQueue"/>
            <permission roles="user-group" type="createAddress"/>
            <permission roles="user-group" type="deleteAddress"/>
            <permission roles="user-group" type="consume"/>
            <permission roles="user-group" type="browse"/>
            <permission roles="user-group" type="send"/>
            <permission roles="user-group" type="manage"/>
         </security-setting>
      </security-settings>
   </core>
</configuration>
生产者
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Minimal JMS producer: connects to the broker at {@code tcp://localhost:61716} and sends
 * one text message to {@code news.europe.politics} inside a transacted session.
 */
public class ArtemisProducer {

    /**
     * Sends a single "Sample message" and commits.
     *
     * @param args unused
     * @throws Exception if connecting, sending, or committing fails
     */
    public static void main(final String[] args) throws Exception {
        String brokerURL = "tcp://localhost:61716";

        ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(brokerURL);
        connFactory.setUser("user");
        connFactory.setPassword("password");

        // try-with-resources closes the session and connection even if send() or commit()
        // throws (previously the connection leaked on any failure).
        try (Connection conn = connFactory.createConnection();
             Session sess = conn.createSession(true, Session.SESSION_TRANSACTED)) {
            conn.start();

            final Destination dest = sess.createQueue("news.europe.politics");
            final MessageProducer prod = sess.createProducer(dest);
            final Message msg = sess.createTextMessage("Sample message");

            prod.send(msg);
            // The session is transacted: the message only becomes visible after commit.
            sess.commit();
        }
    }
}
您看到了预期的行为。这是因为您使用的功能是 wildcard address。简而言之,发送到匹配地址的任何消息也将被路由到通配符地址(以及根据其语义(即任播或多播)绑定到该地址的任何队列)。
在您的情况下,发送消息时通配符地址尚未创建,因此这些消息无法被路由到它。
顺便说一下,您可以在随代理一起提供的 topic-hierarchies 示例(位于 examples/features/standard 目录中)中看到此功能的实际效果。