通过 ActiveMQ Broker 插件发送消息
Send Message via ActiveMQ Broker Plugin
我正在使用 ActiveMQ 5.16.1。我正在尝试使用以下代码片段通过 BrokerFilter class 的 send 消息发送有关某个主题的消息。
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
try {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText("hello");
message.setType("abc");
ActiveMQDestination amqDestination = new ActiveMQTopic();
amqDestination.setPhysicalName("HELLO");
message.setDestination(amqDestination);
ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
producerBrokerExchange.setConnectionContext(context);
send(producerBrokerExchange,message);
}
catch (Exception ex) {
logger.error("error while sending message");
logger.error(ex.toString());
}
}
public void send(final ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
super.send(producerExchange,messageSend);
}
在调用覆盖的 send 方法时,我得到:
java.lang.NullPointerException
at org.apache.activemq.broker.region.Topic.send(Topic.java:367)
at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:508)
at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:477)
at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:293)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:295)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
at com.ef.apps.amq.plugin.TopicAuthorization.send(TopicAuthorization.java:85)
at com.ef.apps.amq.plugin.TopicAuthorization.addConnection(TopicAuthorization.java:67)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)
at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:844)
at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)
at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:331)
at org.apache.activemq.broker.TransportConnection.onCommand(TransportConnection.java:200)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:748)
我的要求是在创建新连接时发送有关主题的消息。
您需要正确初始化 ProducerBrokerExchange 和出站消息才能使其正常工作。您还需要注意,如果配置启用了生产者流量控制,发送可能会被阻止。
这里有一个来自 'AdvisoryBroker.java' 的示例方法,它触发 ActiveMQ 咨询消息以获得灵感,请注意填写的 MessageID 等细节,这些细节是消息正确发送所必需的。
public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
//set properties
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
String url = getBrokerService().getVmConnectorURI().toString();
//try and find the URL on the transport connector and use if it exists else
//try and find a default URL
if (context.getConnector() instanceof TransportConnector
&& ((TransportConnector) context.getConnector()).getPublishableConnectString() != null) {
url = ((TransportConnector) context.getConnector()).getPublishableConnectString();
} else if (getBrokerService().getDefaultSocketURIString() != null) {
url = getBrokerService().getDefaultSocketURIString();
}
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
//set the data structure
advisoryMessage.setDataStructure(command);
advisoryMessage.setPersistent(false);
advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
advisoryMessage.setTargetConsumerId(targetConsumerId);
advisoryMessage.setDestination(topic);
advisoryMessage.setResponseRequired(false);
advisoryMessage.setProducerId(advisoryProducerId);
boolean originalFlowControl = context.isProducerFlowControl();
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setConnectionContext(context);
producerExchange.setMutable(true);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
try {
context.setProducerFlowControl(false);
next.send(producerExchange, advisoryMessage);
} finally {
context.setProducerFlowControl(originalFlowControl);
}
}
我正在使用 ActiveMQ 5.16.1。我正在尝试使用以下代码片段通过 BrokerFilter class 的 send 消息发送有关某个主题的消息。
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
try {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText("hello");
message.setType("abc");
ActiveMQDestination amqDestination = new ActiveMQTopic();
amqDestination.setPhysicalName("HELLO");
message.setDestination(amqDestination);
ProducerBrokerExchange producerBrokerExchange = new ProducerBrokerExchange();
producerBrokerExchange.setConnectionContext(context);
send(producerBrokerExchange,message);
}
catch (Exception ex) {
logger.error("error while sending message");
logger.error(ex.toString());
}
}
public void send(final ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
super.send(producerExchange,messageSend);
}
在调用覆盖的 send 方法时,我得到:
java.lang.NullPointerException
at org.apache.activemq.broker.region.Topic.send(Topic.java:367)
at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:508)
at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:477)
at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:293)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:295)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
at com.ef.apps.amq.plugin.TopicAuthorization.send(TopicAuthorization.java:85)
at com.ef.apps.amq.plugin.TopicAuthorization.addConnection(TopicAuthorization.java:67)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)
at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:844)
at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)
at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:331)
at org.apache.activemq.broker.TransportConnection.onCommand(TransportConnection.java:200)
at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)
at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:748)
我的要求是在创建新连接时发送有关主题的消息。
您需要正确初始化 ProducerBrokerExchange 和出站消息才能使其正常工作。您还需要注意,如果配置启用了生产者流量控制,发送可能会被阻止。
这里有一个来自 'AdvisoryBroker.java' 的示例方法,它触发 ActiveMQ 咨询消息以获得灵感,请注意填写的 MessageID 等细节,这些细节是消息正确发送所必需的。
public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
//set properties
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
String url = getBrokerService().getVmConnectorURI().toString();
//try and find the URL on the transport connector and use if it exists else
//try and find a default URL
if (context.getConnector() instanceof TransportConnector
&& ((TransportConnector) context.getConnector()).getPublishableConnectString() != null) {
url = ((TransportConnector) context.getConnector()).getPublishableConnectString();
} else if (getBrokerService().getDefaultSocketURIString() != null) {
url = getBrokerService().getDefaultSocketURIString();
}
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
//set the data structure
advisoryMessage.setDataStructure(command);
advisoryMessage.setPersistent(false);
advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
advisoryMessage.setTargetConsumerId(targetConsumerId);
advisoryMessage.setDestination(topic);
advisoryMessage.setResponseRequired(false);
advisoryMessage.setProducerId(advisoryProducerId);
boolean originalFlowControl = context.isProducerFlowControl();
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setConnectionContext(context);
producerExchange.setMutable(true);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
try {
context.setProducerFlowControl(false);
next.send(producerExchange, advisoryMessage);
} finally {
context.setProducerFlowControl(originalFlowControl);
}
}