OpenNMS v18 AMQP 消息发送问题

OpenNMS v18 AMQP Message Sending Issue

我无法让 OpenNMS 将消息发送到 AMQP 端点。 我从来没有用过这个,这是我第一次使用 OpenNMS 和 AMQP,所以这可能是我缺乏经验。

我已经配置了 RabbitMQ 3.5.7 并按照 question 对其进行了测试。 它在使用外部 QPID 0.32 客户端时工作正常,在使用 python 或 perl 时也工作正常。 正常工作的定义是建立通信并将消息有效负载传输到交换器中,然后传递到后端队列中。 然后可以在 RabbitMQ 管理 GUI 中查看消息。

在 OpenNMS 中,我一直按照说明进行操作 here 从 EventForwarder 开始,然后尝试 AlarmNorthbounder 都产生 NullPointerException。

我设置 Karaf 如下使用这些语句设置属性:-

opennms> config:edit org.opennms.features.amqp.alarmnorthbounder
opennms> propset connectionUrl amqp://simon:simon@/test?brokerlist=\'localhost:5672\'
opennms> propset destination "amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }"
opennms> propset processorName default-alarm-northbounder-processor
opennms> config:update
opennms> config:list '(service.pid=org.opennms.features.amqp.alarmnorthbounder)'
----------------------------------------------------------------
Pid:            org.opennms.features.amqp.alarmnorthbounder
BundleLocation: mvn:org.opennms.features.amqp/org.opennms.features.amqp.alarm-northbounder/18.0.0
Properties:
   connectionUrl = amqp://simon:simon@/test?brokerlist='localhost:5672'
   destination = amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }
   felix.fileinstall.filename = file:/usr/share/opennms/etc/org.opennms.features.amqp.alarmnorthbounder.cfg
   processorName = default-alarm-northbounder-processor
   service.pid = org.opennms.features.amqp.alarmnorthbounder

我在日志中收到以下错误消息

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[forwardAlarm      ] [forwardAlarm      ] [seda://forwardAlarm                                                           ] [         8]
[forwardAlarm      ] [convertBodyTo3    ] [convertBodyTo[org.opennms.netmgt.alarmd.api.NorthboundAlarm]                  ] [         0]
[forwardAlarm      ] [log3              ] [log                                                                           ] [         1]
[forwardAlarm      ] [bean3             ] [bean[ref:dynamicallyTrackedProcessor]                                         ] [         0]
[forwardAlarm      ] [to3               ] [amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }                 ] [         7]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
        Id                  ID-ubuntu-1604-35241-1465752556843-2-60
        ExchangePattern     InOnly
        Headers             {breadcrumbId=ID-ubuntu-1604-35241-1465752556843-2-58, CamelRedelivered=false, CamelRedeliveryCounter=0}
        BodyType            String
        Body                NorthboundAlarm[id=3, uei='uei.opennms.org/generic/traps/EnterpriseDefault', nodeId=1]
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.NullPointerException
        at org.apache.qpid.client.BasicMessageProducer_0_8.declareDestination(BasicMessageProducer_0_8.java:63)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
        at org.apache.qpid.client.BasicMessageProducer.<init>(BasicMessageProducer.java:136)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
        at org.apache.qpid.client.BasicMessageProducer_0_8.<init>(BasicMessageProducer_0_8.java:55)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
        at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:559)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
        at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:62)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]

我的期望是它应该能够使用与我在外部使用的相同的库将消息传递到队列。

为 org.apache.qpid 启用 DEBUG 我得到:-

2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [org.apache.qpid.client.state.StateWaiter@5d7e5d44]
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: FieldTable::writeToBuffer: Writing encoded length of 254...
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: {instance=[LONG_STRING: ubuntu-16041465757105204], product=[LONG_STRING: qpid], version=[LONG_STRING: 0.28], platform=[LONG_STRING: Java(TM) SE Runtime Environment, 1.8.0_45-b14, Oracle Corporation, amd64, Linux, 4.4.0-22-generic, unknown], qpid.client_process=[LONG_STRING: Qpid Java Client], qpid.client_pid=[INT: 1318]}
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionTuneBodyImpl: channelMax=0, frameMax=131072, heartbeat=60]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.handler.ConnectionTuneMethodHandler: ConnectionTune frame received
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 3 name: CONNECTION_NOT_OPENED from old state AMQState: id = 2 name: CONNECTION_NOT_TUNED
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [org.apache.qpid.client.state.StateWaiter@5d7e5d44]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionOpenOkBodyImpl: knownHosts=null]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 4 name: CONNECTION_OPEN from old state AMQState: id = 3 name: CONNECTION_NOT_OPENED
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [org.apache.qpid.client.state.StateWaiter@5d7e5d44]
2016-06-12 19:00:38,726 INFO  org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connection 44 now connected from /127.0.0.1:45388 to localhost/127.0.0.1:5672
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Are we connected:true
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connected with ProtocolHandler Version:0-91
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnectionDelegate_8_0: Write channel open frame for channel id 1
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Created session:org.apache.qpid.client.AMQSession_0_8@179483f0
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ChannelOpenOkBodyImpl: channelId=null]
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [BasicQosOkBodyImpl: ]
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQDestination: Based on onms3/Simon;{'create':'always','node':{'type':'topic'} } the selected destination syntax is ADDR
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Closing session: org.apache.qpid.client.AMQSession_0_8@179483f0
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.protocol.AMQProtocolSession: closeSession called on protocol session for session 1

它在写入任何信息之前关闭会话。

当我从外部 qpid 客户端执行基本相同的操作时 - 执行如下:-

#!/bin/bash

rm log.out
java -Dqpid.amqp.version=0-91 -Dlog4j.debug -Dlog4j.configuration=file:./log4j.properties -cp "client/example/target/classes/:client/example/target/dependency/*:slf4j-1.7.21/slf4j-log4j12-1.7.
21.jar:apache-log4j-1.2.17/log4j-1.2.17.jar" \
    org.apache.qpid.example.ListSender

我明白了:-

142  [main] DEBUG org.apache.qpid.client.AMQConnection  - Are we connected:true
142  [main] DEBUG org.apache.qpid.client.AMQConnection  - Connected with ProtocolHandler Version:0-91
146  [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0  - Write channel open frame for channel id 1
162  [main] DEBUG org.apache.qpid.client.AMQSession  - Created session:org.apache.qpid.client.AMQSession_0_8@6bdf28bb
164  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ChannelOpenOkBody]
165  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [BasicQosOkBodyImpl: ]
173  [main] DEBUG org.apache.qpid.client.AMQDestination  - Based on onms3/Simon;{create: always, node:{type: topic } } the selected destination syntax is ADDR
177  [main] DEBUG org.apache.qpid.framing.FieldTable  - FieldTable::writeToBuffer: Writing encoded length of 0...
178  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
179  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
179  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
180  [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8  - MessageProducer org.apache.qpid.client.BasicMessageProducer_0_8@1936f0f5 using publish mode : ASYNC_PUBLISH_ALL
190  [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8  - Sending content body frames to 'onms3'/'Simon'; {
  'create': 'always',
  'node': {
    'type': 'topic'
  }
}
190  [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8  - Sending content header frame to 'onms3'/'Simon'; {
  'create': 'always',
  'node': {
    'type': 'topic'
  }
}
190  [main] DEBUG org.apache.qpid.framing.FieldTable  - FieldTable::writeToBuffer: Writing encoded length of 90...
191  [main] DEBUG org.apache.qpid.framing.FieldTable  - {Id=[INT: 987654321], name=[LONG_STRING: WidgetSimon], price=[DOUBLE: 0.99], qpid.subject=[LONG_STRING: Simon], JMS_QPID_DESTTYPE=[INT: 2]}
192  [main] DEBUG org.apache.qpid.client.AMQSession  - Closing session: org.apache.qpid.client.AMQSession_0_8@6bdf28bb
192  [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolSession  - closeSession called on protocol session for session 1
194  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ChannelCloseOkBody]
194  [IoReceiver - localhost/127.0.0.1:5672] INFO  org.apache.qpid.client.handler.ChannelCloseOkMethodHandler  - Received channel-close-ok for channel-id 1
195  [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - (333274164)Method frame received: [ConnectionCloseOkBody]
196  [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler  - Session closed called by client

ListSender.java如下:-

package org.apache.qpid.example;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;

import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.ListMessage;


public class ListSender {

    public static void main(String[] args) throws Exception
    {
        Connection connection =
            new AMQConnection("amqp://simon:simon@localhost/test?brokerlist='tcp://localhost:5672'");
                                                  AMQShortString a1 = new AMQShortString("");
                                                   AMQShortString a2 = new AMQShortString("");
        AMQShortString[] bindvars = new AMQShortString[]{a1,a2};
        boolean is_durable = true;
/*      
        Destination queue = new AMQAnyDestination( new AMQShortString("onms2"), 
                                                   new AMQShortString("direct"),
                                                   new AMQShortString("Simon"),
                                                   true,        
                                                   true,        
                                                   new AMQShortString(""),
                                                   false,       
                                                   bindvars);
        */

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //Destination queue = new AMQAnyDestination("onms3/Simon");
        Destination queue = new AMQAnyDestination("onms3/Simon;{create: always, node:{type: topic } }");
        //Destination queue = new AMQAnyDestination("onms3/Simon;%7Bcreate%3A%20always%2C%20node%3A%7Btype%3A%20topic%20%7D%20%7D");
        //Destination queue = new AMQAnyDestination("amqp:OpenNMSExchange/Taylor; {create: always}");
        //Destination queue = new AMQAnyDestination("OpenNMSExchange; {create: always}");
        MessageProducer producer = session.createProducer(queue);

ListMessage m = ((org.apache.qpid.jms.Session)session).createListMessage();
        m.setIntProperty("Id", 987654321);
        m.setStringProperty("name", "WidgetSimon");
        m.setDoubleProperty("price", 0.99);

        List<String> colors = new ArrayList<String>();
        colors.add("red");
        colors.add("green");
        colors.add("white");
        m.add(colors);

        Map<String,Double> dimensions = new HashMap<String,Double>();
        dimensions.put("length",10.2);
        dimensions.put("width",5.1);
        dimensions.put("depth",2.0);
        m.add(dimensions);
        List<List<Integer>> parts = new ArrayList<List<Integer>>();
        parts.add(Arrays.asList(new Integer[] {1,2,5}));
        parts.add(Arrays.asList(new Integer[] {8,2,5}));
        m.add(parts);

        Map<String,Object> specs = new HashMap<String,Object>();
        specs.put("colours", colors);
        specs.put("dimensions", dimensions);
        specs.put("parts", parts);
        m.add(specs);

        producer.send((Message)m);
        System.out.println("Sent: " + m);
        connection.close();
    }

}

我的假设是,这是与某种描述的 AMQP 服务器的连接问题。如果我遇到来自外部 jar 的问题,那么在解决这个问题时遇到了许多问题,从日志中可以清楚地看出问题是什么。 这是 OpenNMS 的问题吗? 有没有人有这个工作成功? 有什么想法吗?

干杯

西蒙

根据 OpenNMS 列表中的指导,我决定安装 QPID Broker v6.0.3 来代替 RabbitMQ,现在消息传输没有问题。