如何在 FUSE 中监听 AMQ broker 启动

How to listen for AMQ broker start in FUSE

我想添加一个功能,利用 JBoss FUSE 中内置的 AMQ,它将随容器一起启动——即,将添加到启动功能中,启动级别 <= 100 .

此功能将立即尝试通过 AMQ 发送一些消息,因此我希望向 'listen' 添加逻辑或者只是等待代理启动事件。

我目前拥有的唯一句柄是一条记录的消息,例如:

... | ActiveMQServiceFactory           | 196 - io.fabric8.mq.mq-fabric
      - 1.2.0.redhat-143 | Broker amq has started.

在我的功能文件中,我尝试创建几个功能依赖项,例如:

<feature name="myFeature" ...
    <feature version="1.2.0.redhat-133">mq-fabric</feature>
    <feature version="5.11.0.redhat-620133">activemq</feature>

...但这没有用。我也试过将开始级别设置为 90 年代的某个值,但这也不起作用。

我是不是错过了这些方法中的某些东西?

是否有可以在我的蓝图中注入的相关 OSGi 服务? -- 即,也许有一些 .isBrokerStarted() 方法?

我最近的想法是,也许我可以注册一个咨询消息的回调...如果可行,post 会将其作为答案。

不幸的是,我不再认为尝试连接和聆听咨询主题是一种可行的方法。我已经部署了这个功能。如果在上述 'broker start' 日志消息之后安装,它可以正常工作,否则会失败。

问题似乎是因为蓝图上下文初始化是在 AMQ 异步启动时发生的,所以任何生成的连接都是无效的。我原以为 AMQ 连接工厂或 Camel 组件会不透明地提供一些针对这种情况的保证。显然不是这样。调查他们的 API(具体来说,org.apache.activemq.ActiveMQSslConnectionFactory 和 org.apache.camel.component.jms.JmsConfiguration),我也没有找到用于状态检查的 public 方法...只有设置 "testConnectionOnStartup" 属性 的骆驼组件。据推测,这会按照记录完成,并抛出 1+ 异常,但事实并非如此。很混乱。

2016 年 8 月 24 日更新:

终于有更多的时间来处理这个问题,并尝试了一些东西...... 我在 camel-amq 上添加了一个特性依赖,因为它位于特性依赖链的 'top' 处:

camel-amq -> 依赖于 mq-fabric -> 依赖于 activemq -> 依赖于 activemq-client

这没有用。不过,在对日志记录进行了更多调整之后,我相信我已经确认了上面的暗示。 Start-levels 和功能依赖不能解决这个问题。所有 dependent-features 或较低启动级别的捆绑包将在 amq 代理实际准备就绪(并记录上述消息)之前报告 installed/started。

因此,我的功能将安装、启动并失败——除非我能找到一种方法来完成 post 标题所建议的操作:'listen for AMQ broker start'。有谁知道这样做的方法吗?

2016 年 8 月 25 日更新:

我看到 ActiveMQServiceFactory(记录代理启动消息)本身在其上下文本地的 org.apache.activemq.broker.BrokerService 实例上有一个句柄。如果这也作为 OSGi 服务提供,那么我认为我可以等待“broker.waitUntilStarted();”然后以编程方式创建我的 JMS 连接。当我有时间尝试这个时会报告...

首先,我无法在 ActiveMQServiceFactory 上找到可用的句柄...它是通过 org.osgi.service.cm.ManagedServiceFactory 接口提供的,但也以某种方式代理,因此它不能转换为 ActiveMQServiceFactory :-/

因此,我最终创建了一个功能,该功能不断循环尝试通过代理的最慢启动连接器(在我的例子中是 ssl 连接器)以编程方式创建 connection/session/temp-queue,然后 delete/close 所有。之后,此功能以编程方式安装我的其他功能。

这有效。

基本上:

// ... setup amq conn factory
while (true) {
  Thread.sleep(5000L);
  try { 
    amqConnFactory.createConnection()
                  .start()
                  .createSession(fals‌​e, Session.AUTO_ACKNOWLEDGE)
                  .createTemporaryQueue();
    break;
  } catch (Exception e) {
    // debug log 'not up yet'
  }
}