如何在 FUSE 中监听 AMQ broker 启动
How to listen for AMQ broker start in FUSE
我想添加一个功能,利用 JBoss FUSE 中内置的 AMQ,它将随容器一起启动——即,将添加到启动功能中,启动级别 <= 100 .
此功能将立即尝试通过 AMQ 发送一些消息,因此我希望向 'listen' 添加逻辑或者只是等待代理启动事件。
我目前拥有的唯一句柄是一条记录的消息,例如:
... | ActiveMQServiceFactory | 196 - io.fabric8.mq.mq-fabric
- 1.2.0.redhat-143 | Broker amq has started.
在我的功能文件中,我尝试创建几个功能依赖项,例如:
<feature name="myFeature" ...
<feature version="1.2.0.redhat-133">mq-fabric</feature>
<feature version="5.11.0.redhat-620133">activemq</feature>
...但这没有用。我也试过将开始级别设置为 90 年代的某个值,但这也不起作用。
我是不是错过了这些方法中的某些东西?
是否有可以在我的蓝图中注入的相关 OSGi 服务? -- 即,也许有一些 .isBrokerStarted() 方法?
我最近的想法是,也许我可以注册一个咨询消息的回调...如果可行,post 会将其作为答案。
不幸的是,我不再认为尝试连接和聆听咨询主题是一种可行的方法。我已经部署了这个功能。如果在上述 'broker start' 日志消息之后安装,它可以正常工作,否则会失败。
问题似乎是因为蓝图上下文初始化是在 AMQ 异步启动时发生的,所以任何生成的连接都是无效的。我原以为 AMQ 连接工厂或 Camel 组件会不透明地提供一些针对这种情况的保证。显然不是这样。调查他们的 API(具体来说,org.apache.activemq.ActiveMQSslConnectionFactory 和 org.apache.camel.component.jms.JmsConfiguration),我也没有找到用于状态检查的 public 方法...只有设置 "testConnectionOnStartup" 属性 的骆驼组件。据推测,这会按照记录完成,并抛出 1+ 异常,但事实并非如此。很混乱。
2016 年 8 月 24 日更新:
终于有更多的时间来处理这个问题,并尝试了一些东西......
我在 camel-amq 上添加了一个特性依赖,因为它位于特性依赖链的 'top' 处:
camel-amq -> 依赖于 mq-fabric -> 依赖于 activemq -> 依赖于 activemq-client
这没有用。不过,在对日志记录进行了更多调整之后,我相信我已经确认了上面的暗示。 Start-levels 和功能依赖不能解决这个问题。所有 dependent-features 或较低启动级别的捆绑包将在 amq 代理实际准备就绪(并记录上述消息)之前报告 installed/started。
因此,我的功能将安装、启动并失败——除非我能找到一种方法来完成 post 标题所建议的操作:'listen for AMQ broker start'。有谁知道这样做的方法吗?
2016 年 8 月 25 日更新:
我看到 ActiveMQServiceFactory(记录代理启动消息)本身在其上下文本地的 org.apache.activemq.broker.BrokerService 实例上有一个句柄。如果这也作为 OSGi 服务提供,那么我认为我可以等待“broker.waitUntilStarted();”然后以编程方式创建我的 JMS 连接。当我有时间尝试这个时会报告...
首先,我无法在 ActiveMQServiceFactory
上找到可用的句柄...它是通过 org.osgi.service.cm.ManagedServiceFactory
接口提供的,但也以某种方式代理,因此它不能转换为 ActiveMQServiceFactory
:-/
因此,我最终创建了一个功能,该功能不断循环尝试通过代理的最慢启动连接器(在我的例子中是 ssl 连接器)以编程方式创建 connection/session/temp-queue,然后 delete/close 所有。之后,此功能以编程方式安装我的其他功能。
这有效。
基本上:
// ... setup amq conn factory
while (true) {
Thread.sleep(5000L);
try {
amqConnFactory.createConnection()
.start()
.createSession(false, Session.AUTO_ACKNOWLEDGE)
.createTemporaryQueue();
break;
} catch (Exception e) {
// debug log 'not up yet'
}
}
我想添加一个功能,利用 JBoss FUSE 中内置的 AMQ,它将随容器一起启动——即,将添加到启动功能中,启动级别 <= 100 .
此功能将立即尝试通过 AMQ 发送一些消息,因此我希望向 'listen' 添加逻辑或者只是等待代理启动事件。
我目前拥有的唯一句柄是一条记录的消息,例如:
... | ActiveMQServiceFactory | 196 - io.fabric8.mq.mq-fabric
- 1.2.0.redhat-143 | Broker amq has started.
在我的功能文件中,我尝试创建几个功能依赖项,例如:
<feature name="myFeature" ...
<feature version="1.2.0.redhat-133">mq-fabric</feature>
<feature version="5.11.0.redhat-620133">activemq</feature>
...但这没有用。我也试过将开始级别设置为 90 年代的某个值,但这也不起作用。
我是不是错过了这些方法中的某些东西?
是否有可以在我的蓝图中注入的相关 OSGi 服务? -- 即,也许有一些 .isBrokerStarted() 方法?
我最近的想法是,也许我可以注册一个咨询消息的回调...如果可行,post 会将其作为答案。
不幸的是,我不再认为尝试连接和聆听咨询主题是一种可行的方法。我已经部署了这个功能。如果在上述 'broker start' 日志消息之后安装,它可以正常工作,否则会失败。
问题似乎是因为蓝图上下文初始化是在 AMQ 异步启动时发生的,所以任何生成的连接都是无效的。我原以为 AMQ 连接工厂或 Camel 组件会不透明地提供一些针对这种情况的保证。显然不是这样。调查他们的 API(具体来说,org.apache.activemq.ActiveMQSslConnectionFactory 和 org.apache.camel.component.jms.JmsConfiguration),我也没有找到用于状态检查的 public 方法...只有设置 "testConnectionOnStartup" 属性 的骆驼组件。据推测,这会按照记录完成,并抛出 1+ 异常,但事实并非如此。很混乱。
2016 年 8 月 24 日更新:
终于有更多的时间来处理这个问题,并尝试了一些东西...... 我在 camel-amq 上添加了一个特性依赖,因为它位于特性依赖链的 'top' 处:
camel-amq -> 依赖于 mq-fabric -> 依赖于 activemq -> 依赖于 activemq-client
这没有用。不过,在对日志记录进行了更多调整之后,我相信我已经确认了上面的暗示。 Start-levels 和功能依赖不能解决这个问题。所有 dependent-features 或较低启动级别的捆绑包将在 amq 代理实际准备就绪(并记录上述消息)之前报告 installed/started。
因此,我的功能将安装、启动并失败——除非我能找到一种方法来完成 post 标题所建议的操作:'listen for AMQ broker start'。有谁知道这样做的方法吗?
2016 年 8 月 25 日更新:
我看到 ActiveMQServiceFactory(记录代理启动消息)本身在其上下文本地的 org.apache.activemq.broker.BrokerService 实例上有一个句柄。如果这也作为 OSGi 服务提供,那么我认为我可以等待“broker.waitUntilStarted();”然后以编程方式创建我的 JMS 连接。当我有时间尝试这个时会报告...
首先,我无法在 ActiveMQServiceFactory
上找到可用的句柄...它是通过 org.osgi.service.cm.ManagedServiceFactory
接口提供的,但也以某种方式代理,因此它不能转换为 ActiveMQServiceFactory
:-/
因此,我最终创建了一个功能,该功能不断循环尝试通过代理的最慢启动连接器(在我的例子中是 ssl 连接器)以编程方式创建 connection/session/temp-queue,然后 delete/close 所有。之后,此功能以编程方式安装我的其他功能。
这有效。
基本上:
// ... setup amq conn factory
while (true) {
Thread.sleep(5000L);
try {
amqConnFactory.createConnection()
.start()
.createSession(false, Session.AUTO_ACKNOWLEDGE)
.createTemporaryQueue();
break;
} catch (Exception e) {
// debug log 'not up yet'
}
}