通过 ActiveMQ Broker 插件发送消息

Send Message via ActiveMQ Broker Plugin

我正在使用 ActiveMQ 5.16.1。我正在尝试使用以下代码片段,通过 BrokerFilter 类的 send 方法向某个主题(topic)发送一条消息。

/**
 * Tries to publish a "hello" text message to the HELLO topic when a connection is added.
 *
 * Any exception raised while building or sending the message is caught and logged
 * (message text only, no stack trace) and is not rethrown.
 * NOTE(review): this override does not delegate to super.addConnection - confirm that is intended.
 *
 * @param context connection context attached to the producer exchange used for the send
 * @param info    details of the connection being added; not read by this method
 * @throws Exception declared by the signature; the body itself catches every Exception
 */
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
    try {
        // Destination: a topic created empty and then given its physical name.
        ActiveMQDestination helloTopic = new ActiveMQTopic();
        helloTopic.setPhysicalName("HELLO");

        // Payload: a text message carrying a body, a type and the destination.
        ActiveMQTextMessage greeting = new ActiveMQTextMessage();
        greeting.setText("hello");
        greeting.setType("abc");
        greeting.setDestination(helloTopic);

        // The exchange is given the connection context and nothing else.
        ProducerBrokerExchange exchange = new ProducerBrokerExchange();
        exchange.setConnectionContext(context);

        send(exchange, greeting);
    } catch (Exception sendFailure) {
        logger.error("error while sending message");
        logger.error(sendFailure.toString());
    }
}
   
/**
 * Forwards the message unchanged to the superclass implementation of send.
 *
 * @param producerExchange exchange describing the producer side of the send
 * @param messageSend      message to hand to the superclass
 * @throws Exception whatever the superclass send throws
 */
public void send(final ProducerBrokerExchange producerExchange, final Message messageSend)
        throws Exception {
    super.send(producerExchange, messageSend);
}

在调用重写的 send 方法时,我得到如下异常:

java.lang.NullPointerException
    at org.apache.activemq.broker.region.Topic.send(Topic.java:367)
    at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:508)
    at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:477)
    at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:293)
    at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
    at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:96)
    at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:295)
    at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:154)
    at com.ef.apps.amq.plugin.TopicAuthorization.send(TopicAuthorization.java:85)
    at com.ef.apps.amq.plugin.TopicAuthorization.addConnection(TopicAuthorization.java:67)
    at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)
    at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:844)
    at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)
    at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:331)
    at org.apache.activemq.broker.TransportConnection.onCommand(TransportConnection.java:200)
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125)
    at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:301)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:748)

我的需求是在创建新连接时向某个主题发送一条消息。

您需要正确初始化 ProducerBrokerExchange 和待发送的消息才能使其正常工作。您还需要注意,如果配置中启用了生产者流量控制(producer flow control),发送操作可能会被阻塞。

下面是 'AdvisoryBroker.java' 中用于发送 ActiveMQ 通告(advisory)消息的示例方法,可供参考。请注意其中设置的 MessageID 等细节,这些细节是消息能够正确发送所必需的。

/**
 * Fills in an advisory message and sends it down the broker chain via next.send.
 *
 * The message is stamped with the origin broker's name, id and URL, then given the
 * command as its data structure, a fresh message id, the target consumer, the
 * destination topic and the advisory producer id. Producer flow control on the
 * context is switched off for the duration of the send and restored afterwards.
 *
 * @param context          connection context used for the send and for URL lookup
 * @param topic            destination the advisory is addressed to
 * @param command          command stored as the message's data structure
 * @param targetConsumerId consumer id set as the message's target consumer
 * @param advisoryMessage  message instance to populate and send
 * @throws Exception if populating the message or the downstream send fails
 */
public void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
    // Origin broker name.
    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());

    // Origin broker id, with a placeholder when no id is available.
    String originBrokerId;
    if (getBrokerId() != null) {
        originBrokerId = getBrokerId().getValue();
    } else {
        originBrokerId = "NOT_SET";
    }
    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, originBrokerId);

    // Origin broker URL: start from the VM connector URI, then prefer the transport
    // connector's publishable connect string, else the broker's default socket URI.
    String originBrokerUrl = getBrokerService().getVmConnectorURI().toString();
    boolean connectorPublishesUrl = context.getConnector() instanceof TransportConnector
            && ((TransportConnector) context.getConnector()).getPublishableConnectString() != null;
    if (connectorPublishesUrl) {
        originBrokerUrl = ((TransportConnector) context.getConnector()).getPublishableConnectString();
    } else if (getBrokerService().getDefaultSocketURIString() != null) {
        originBrokerUrl = getBrokerService().getDefaultSocketURIString();
    }
    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, originBrokerUrl);

    // Payload and routing details of the advisory itself.
    advisoryMessage.setDataStructure(command);
    advisoryMessage.setPersistent(false);
    advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
    advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
    advisoryMessage.setTargetConsumerId(targetConsumerId);
    advisoryMessage.setDestination(topic);
    advisoryMessage.setResponseRequired(false);
    advisoryMessage.setProducerId(advisoryProducerId);

    // Exchange for the send: connection context, mutable flag and a fresh producer state.
    final ProducerBrokerExchange exchange = new ProducerBrokerExchange();
    exchange.setConnectionContext(context);
    exchange.setMutable(true);
    exchange.setProducerState(new ProducerState(new ProducerInfo()));

    // Send with flow control disabled, then put the previous setting back.
    final boolean previousFlowControl = context.isProducerFlowControl();
    try {
        context.setProducerFlowControl(false);
        next.send(exchange, advisoryMessage);
    } finally {
        context.setProducerFlowControl(previousFlowControl);
    }
}