如何使用 JMS 和 MDB 向特定用户和所有用户发送消息和通知?
How To send messages and notification To Particular User AND All Users Using JMS and MDB?
我使用 Wildfly 10.0.0 最终独立部署、EJB、Hibernate 开发了一个 Web 应用程序。我参考并创建了 MDB、MessageProducerBean (@RequestScoped)、Queue 和 连接工厂 .
当我从 MessageProducerBean 发送消息时,我可以在 MDB 中接收它。
我的问题是我的应用程序可以被许多用户访问。我需要向特定用户(例如:User-A)发送一些消息,并向所有用户发送一些消息。
我阅读了一些文章,如果我们使用 Queue 可以发送给一个客户端,如果我们使用 Topic 可以发送给多个客户端,但我对此没有任何清晰的认识。我的下一个问题是此消息仅在用户处于活动状态时才有效。
无论他们在线与否,我都需要随时发送消息。如果他们在登录后处于离线状态,则应该显示按摩。任何人都可以帮忙吗?
我将在下面总结我的问题。
1) 需要用户发送消息和通知。
2) 需要保留所有消息,即使用户处于离线状态。
这是我的代码
独立-full.xml
<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
<server name="default">
<security-setting name="#">
<role name="guest" delete-non-durable-queue="true" create-non-durable-queue="true" consume="true" send="true"/>
</security-setting>
<address-setting name="#" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/>
<http-connector name="http-connector" endpoint="http-acceptor" socket-binding="http"/>
<http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="http">
<param name="batch-delay" value="50"/>
</http-connector>
<in-vm-connector name="in-vm" server-id="0"/>
<http-acceptor name="http-acceptor" http-listener="default"/>
<http-acceptor name="http-acceptor-throughput" http-listener="default">
<param name="batch-delay" value="50"/>
<param name="direct-deliver" value="false"/>
</http-acceptor>
<in-vm-acceptor name="in-vm" server-id="0"/>
<jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
<jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
<jms-queue name="testQueue" entries="queue/testQueue java:jboss/exported/jms/queue/testQueue"/>
<connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/>
<connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector"/>
<pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/>
</server>
</subsystem>
MDB
@MessageDriven(mappedName = "queue/testQueue", activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "queue/testQueue"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge") })
public class MessageDrivenBean implements MessageListener {
private final static Logger LOGGER = Logger.getLogger(MessageDrivenBean.class.toString());
public MessageDrivenBean() {
System.out.println("TextMDB.ctor, this=" + hashCode());
}
@Override
public void onMessage(Message rcvMessage) {
TextMessage msg = null;
try {
if (rcvMessage instanceof TextMessage) {
msg = (TextMessage) rcvMessage;
LOGGER.log(Level.INFO, "Received Message from topic: {0}", msg.getText());
} else {
LOGGER.log(Level.WARNING, "Message of wrong type: {0}", rcvMessage.getClass().getName());
}
} catch (JMSException e) {
throw new RuntimeException(e);
}
}}
MessageProducerBean
@JMSDestinationDefinition(name = "queue/testQueue",
interfaceName = "javax.jms.Queue",
destinationName = "queue/testQueue")
@Named("messageProducerBean")
@RequestScopedpublic class MessageProducerBean {
@Inject
private JMSContext context;
@Resource(mappedName = "queue/testQueue")
private Queue queue;
private final static Logger LOGGER = Logger.getLogger(MessageProducerBean.class.toString());
@Resource(mappedName = "ConnectionFactory")
private ConnectionFactory factory;
private String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public void sendMessage() {
try {
String text = "Message from producer: " + message;
context.createProducer().send(queue, text);
FacesMessage facesMessage
= new FacesMessage("Sent message: " + text);
FacesContext.getCurrentInstance().addMessage(null, facesMessage);
} catch (Throwable t) {
LOGGER.log(Level.SEVERE, "SenderBean.sendMessage: Exception: {0}", t.toString());
}
} }
提前致谢!
对于单个消费者,添加消息 属性 标识消费者并让消费者使用消息选择器仅接收其消息。
对于所有消费者,最好的解决方案是使用一个主题并让所有消费者订阅它。如果您了解所有用户,则可以为上述同一队列中的每个消费者发送单独的消息,但这绝不是最佳做法。
终于找到方法了。我在这里发帖是因为它可能对某人有帮助。
如果我总结一下我所做的。
首先我们需要一个wildfly的应用程序用户。为此,请转到 {wildflyHome}/bin 和 运行 ./add-user.sh .
将用户类型指定为应用程序(选项 b)。并提供用户名和密码。
供参考:link 1
在上面 link 执行第 1 步和第 2 步。
修改standalone-full.xml为
<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
<server name="default">
<security-setting name="#">
<role name="guest" delete-non-durable-queue="true" create-non-durable-queue="true" delete-durable-queue="true" create-durable-queue="true" consume="true" send="true"/>
</security-setting>
<address-setting name="#" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/>
<http-connector name="http-connector" endpoint="http-acceptor" socket-binding="http"/>
<http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="http">
<param name="batch-delay" value="50"/>
</http-connector>
<in-vm-connector name="in-vm" server-id="0"/>
<http-acceptor name="http-acceptor" http-listener="default"/>
<http-acceptor name="http-acceptor-throughput" http-listener="default">
<param name="batch-delay" value="50"/>
<param name="direct-deliver" value="false"/>
</http-acceptor>
<in-vm-acceptor name="in-vm" server-id="0"/>
<jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
<jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
<jms-queue name="JMSQueue" entries="java:/jboss/exported/jms/queue/JMSQueue jms/queue/JMSQueue"/>
<jms-topic name="JMSTopic" entries="jms/topic/JMSTopic java:jboss/exported/jms/topic/JMSTopic"/>
<connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/>
<connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory jms/RemoteConnectionFactory" connectors="http-connector"/>
<connection-factory name="TopicConnectionFactory" entries="java:jboss/exported/jms/TopicConnectionFactory jms/TopicConnectionFactory" connectors="http-connector"/>
<pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/>
</server>
</subsystem>
在安全设置下的这个子系统中,我们允许删除和创建持久队列。
和我的javaclass发送和接收消息如下,
首先注入资源为
@Resource(mappedName = "jms/topic/EMSTopic")
private Topic topic;
@Resource(mappedName = "jms/TopicConnectionFactory")
private TopicConnectionFactory connectionFactory;
消息发送方法如下,
public void send() throws JMSException {
TopicConnection connection = connectionFactory.createTopicConnection("JMSUser", "jmsuser@123");
connection.setClientID("userA");
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(topic);
TextMessage outMessage = session.createTextMessage();
outMessage.setText("Hi User");
outMessage.setStringProperty("name", "usera");
messageProducer.send(outMessage);
System.out.println("message sent ");
connection.close();
}
在这个方法中,我通过提供我的应用程序用户名和密码来创建 TopicConnection。
对于连接,我设置了 clientId,对于消息,我设置了一个 stringProperty,当用户收到消息时,他们只需要接收他们的消息。
我的接收方法如下,
public void recieve() throws JMSException {
System.out.println("message reday to recieve ");
String selector = "name = 'usera'";
try (TopicConnection connection = connectionFactory.createTopicConnection("JMSUser", "jmsuser@123")) {
connection.setClientID("userA");
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "usera", selector, true);
connection.start();
TextMessage message = (TextMessage) subscriber.receive(2000);
if (message != null) {
System.out.println("printing ==> " + message.getText());
} else {
System.out.println("no messages");
}
connection.close();
}
}
在这个方法中,我创建了一个 durableTopicConsumer,用于在用户不活动时保留所有消息。
我将一个参数值作为选择器传递给每个只接收消息的用户。
如果您需要进一步了解此内容,请参阅 link 2
希望这会有所帮助。
我使用 Wildfly 10.0.0 最终独立部署、EJB、Hibernate 开发了一个 Web 应用程序。我参考并创建了 MDB、MessageProducerBean (@RequestScoped)、Queue 和 连接工厂 .
当我从 MessageProducerBean 发送消息时,我可以在 MDB 中接收它。
我的问题是我的应用程序可以被许多用户访问。我需要向特定用户(例如:User-A)发送一些消息,并向所有用户发送一些消息。
我阅读了一些文章,如果我们使用 Queue 可以发送给一个客户端,如果我们使用 Topic 可以发送给多个客户端,但我对此没有任何清晰的认识。我的下一个问题是此消息仅在用户处于活动状态时才有效。
无论他们在线与否,我都需要随时发送消息。如果他们在登录后处于离线状态,则应该显示按摩。任何人都可以帮忙吗? 我将在下面总结我的问题。
1) 需要用户发送消息和通知。
2) 需要保留所有消息,即使用户处于离线状态。
这是我的代码
独立-full.xml
<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
<server name="default">
<security-setting name="#">
<role name="guest" delete-non-durable-queue="true" create-non-durable-queue="true" consume="true" send="true"/>
</security-setting>
<address-setting name="#" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/>
<http-connector name="http-connector" endpoint="http-acceptor" socket-binding="http"/>
<http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="http">
<param name="batch-delay" value="50"/>
</http-connector>
<in-vm-connector name="in-vm" server-id="0"/>
<http-acceptor name="http-acceptor" http-listener="default"/>
<http-acceptor name="http-acceptor-throughput" http-listener="default">
<param name="batch-delay" value="50"/>
<param name="direct-deliver" value="false"/>
</http-acceptor>
<in-vm-acceptor name="in-vm" server-id="0"/>
<jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
<jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
<jms-queue name="testQueue" entries="queue/testQueue java:jboss/exported/jms/queue/testQueue"/>
<connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/>
<connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector"/>
<pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/>
</server>
</subsystem>
MDB
@MessageDriven(mappedName = "queue/testQueue", activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "queue/testQueue"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge") })
public class MessageDrivenBean implements MessageListener {
private final static Logger LOGGER = Logger.getLogger(MessageDrivenBean.class.toString());
public MessageDrivenBean() {
System.out.println("TextMDB.ctor, this=" + hashCode());
}
@Override
public void onMessage(Message rcvMessage) {
TextMessage msg = null;
try {
if (rcvMessage instanceof TextMessage) {
msg = (TextMessage) rcvMessage;
LOGGER.log(Level.INFO, "Received Message from topic: {0}", msg.getText());
} else {
LOGGER.log(Level.WARNING, "Message of wrong type: {0}", rcvMessage.getClass().getName());
}
} catch (JMSException e) {
throw new RuntimeException(e);
}
}}
MessageProducerBean
@JMSDestinationDefinition(name = "queue/testQueue",
interfaceName = "javax.jms.Queue",
destinationName = "queue/testQueue")
@Named("messageProducerBean")
@RequestScopedpublic class MessageProducerBean {
@Inject
private JMSContext context;
@Resource(mappedName = "queue/testQueue")
private Queue queue;
private final static Logger LOGGER = Logger.getLogger(MessageProducerBean.class.toString());
@Resource(mappedName = "ConnectionFactory")
private ConnectionFactory factory;
private String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public void sendMessage() {
try {
String text = "Message from producer: " + message;
context.createProducer().send(queue, text);
FacesMessage facesMessage
= new FacesMessage("Sent message: " + text);
FacesContext.getCurrentInstance().addMessage(null, facesMessage);
} catch (Throwable t) {
LOGGER.log(Level.SEVERE, "SenderBean.sendMessage: Exception: {0}", t.toString());
}
} }
提前致谢!
对于单个消费者,添加消息 属性 标识消费者并让消费者使用消息选择器仅接收其消息。
对于所有消费者,最好的解决方案是使用一个主题并让所有消费者订阅它。如果您了解所有用户,则可以为上述同一队列中的每个消费者发送单独的消息,但这绝不是最佳做法。
终于找到方法了。我在这里发帖是因为它可能对某人有帮助。
如果我总结一下我所做的。
首先我们需要一个wildfly的应用程序用户。为此,请转到 {wildflyHome}/bin 和 运行 ./add-user.sh .
将用户类型指定为应用程序(选项 b)。并提供用户名和密码。
供参考:link 1
在上面 link 执行第 1 步和第 2 步。
修改standalone-full.xml为
<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
<server name="default">
<security-setting name="#">
<role name="guest" delete-non-durable-queue="true" create-non-durable-queue="true" delete-durable-queue="true" create-durable-queue="true" consume="true" send="true"/>
</security-setting>
<address-setting name="#" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/>
<http-connector name="http-connector" endpoint="http-acceptor" socket-binding="http"/>
<http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="http">
<param name="batch-delay" value="50"/>
</http-connector>
<in-vm-connector name="in-vm" server-id="0"/>
<http-acceptor name="http-acceptor" http-listener="default"/>
<http-acceptor name="http-acceptor-throughput" http-listener="default">
<param name="batch-delay" value="50"/>
<param name="direct-deliver" value="false"/>
</http-acceptor>
<in-vm-acceptor name="in-vm" server-id="0"/>
<jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
<jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
<jms-queue name="JMSQueue" entries="java:/jboss/exported/jms/queue/JMSQueue jms/queue/JMSQueue"/>
<jms-topic name="JMSTopic" entries="jms/topic/JMSTopic java:jboss/exported/jms/topic/JMSTopic"/>
<connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/>
<connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory jms/RemoteConnectionFactory" connectors="http-connector"/>
<connection-factory name="TopicConnectionFactory" entries="java:jboss/exported/jms/TopicConnectionFactory jms/TopicConnectionFactory" connectors="http-connector"/>
<pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/>
</server>
</subsystem>
在安全设置下的这个子系统中,我们允许删除和创建持久队列。
和我的javaclass发送和接收消息如下,
首先注入资源为
@Resource(mappedName = "jms/topic/EMSTopic")
private Topic topic;
@Resource(mappedName = "jms/TopicConnectionFactory")
private TopicConnectionFactory connectionFactory;
消息发送方法如下,
public void send() throws JMSException {
TopicConnection connection = connectionFactory.createTopicConnection("JMSUser", "jmsuser@123");
connection.setClientID("userA");
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(topic);
TextMessage outMessage = session.createTextMessage();
outMessage.setText("Hi User");
outMessage.setStringProperty("name", "usera");
messageProducer.send(outMessage);
System.out.println("message sent ");
connection.close();
}
在这个方法中,我通过提供我的应用程序用户名和密码来创建 TopicConnection。
对于连接,我设置了 clientId,对于消息,我设置了一个 stringProperty,当用户收到消息时,他们只需要接收他们的消息。
我的接收方法如下,
public void recieve() throws JMSException {
System.out.println("message reday to recieve ");
String selector = "name = 'usera'";
try (TopicConnection connection = connectionFactory.createTopicConnection("JMSUser", "jmsuser@123")) {
connection.setClientID("userA");
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "usera", selector, true);
connection.start();
TextMessage message = (TextMessage) subscriber.receive(2000);
if (message != null) {
System.out.println("printing ==> " + message.getText());
} else {
System.out.println("no messages");
}
connection.close();
}
}
在这个方法中,我创建了一个 durableTopicConsumer,用于在用户不活动时保留所有消息。
我将一个参数值作为选择器传递给每个只接收消息的用户。
如果您需要进一步了解此内容,请参阅 link 2
希望这会有所帮助。