在 Websphere MQ Exit 中解析 byteBuffer

Parse byteBuffer in Websphere MQ Exit

我正在尝试构建自定义 mq 出口来存档到达队列的消息。我有以下代码。

class MyMqExits implements WMQSendExit, WMQReceiveExit{

@Override
public ByteBuffer channelReceiveExit(MQCXP arg0, MQCD arg1, ByteBuffer arg2) {
    // TODO Auto-generated method stub

    if ( arg2){
        def _bytes = arg2.array()
        def results = new String(_bytes)
        println results;
    }

    return arg2;
}
...

消息的内容(header/body)在字节缓冲区中,还有一些不可读的二进制信息。如何解析来自 arg2 的消息(包括正文和队列名称)?我们已经浏览了 IBM 的文档,但还没有找到一个对象或任何可以使这变得简单的东西。

接收出口不会给您完整的消息。发送和接收出口按通道对传输缓冲区 sent/received 进行操作。这些将包含未记录的各种协议流,因为协议不是 public,并且这些协议流的一部分将是消息块,分解为 32Kb 块。

你在问题中没有提供足够的信息让我知道你使用的是什么类型的频道,但我猜它是在客户端,因为你是在 Java 中写的,而且是唯一适用的环境。

在客户端写出口,你需要小心处理消息没有成功放入目标队列的情况,你需要管理同步点等

如果您使用的是 QMgr-QMgr 通道,您应该使用消息出口来捕获 MQXR_MSG 调用,其中将整个消息提供给您。如果您在频道消息出口中放置任何其他消息,您放置的消息将包含在频道的同步点中,并且如果原始消息已提交,则已提交。

由于您使用的是客户端 QMgr 通道,因此您可以查看 QMgr 端的 API 退出(目前客户端 API 退出仅支持 C 客户端)并捕获所有MQPUT 调用。此出口还会为您提供 MQPUT return 代码,因此您可以对出口进行编码以查找和处理失败的放置。

当然,编写出口是一项复杂的任务,因此可能值得找出是否有任何预先编写的工具可以为您完成这项工作,而不是从头开始。

假设以下两点:

1) 您的发件人应用程序尚未对放置消息的队列名称进行硬编码。因此,您可以更改应用程序配置以将消息发送到不同的对象。

2)归档消息的MessageId不重要,重要的是消息体。

然后我能想到的一种替代方法是创建一个解析为主题的别名队列,并使用两个订阅者接收消息。

1) 订阅者 1:管理上定义的持久订阅者,提供了一个队列来接收消息。提供您现有的消费者应用程序从中接收消息的相同队列名称。

2) 订阅者 2:另一个管理定义的持久订阅者,提供队列。您可以编写一个简单的 java 应用程序来从此队列和存档中获取消息。

3) 两个订阅者都订阅了相同的主题。

步骤如下:

// Create a topic
define topic(ANY.TOPIC) TOPICSTR('/ANY_TOPIC')
// Create an alias queue that points to above created topic
define qalias(QA.APP) target(ANY.TOPIC) targtype(TOPIC)
// Create a queue for your application that does business logic. If one is available already then no need to create.
define ql(Q.BUSLOGIC)
// Create a durable subscription with destination queue as created in previous step.
define sub(SB.BUSLOGIC) topicstr('/ANY_TOPIC') dest(Q.BUSLOGIC)
// Create a queue for application that archives messages.
define ql(Q.ARCHIVE)
// Create another subscription with destination queue as created in previous step.
define sub(SB.ARCHIVE) topicstr('/ANY_TOPIC') dest(Q.ARCHIVE)

编写一个简单的 MQ Java/JMS 应用程序以从 Q.ARCHIVE 获取消息并存档消息。

我完全同意Morag & Shashi,错误的做法。有一个名为 Message Multiplexer (MMX) 的开源项目,它会从队列中获取消息并将其输出到一个或多个队列。跨消息放置维护上下文信息。有关 MMX 的更多信息,请访问:http://www.capitalware.com/mmx_overview.html

如果您无法更改源队列或目标队列以将 MMX 插入混合中,那么 API 退出可能会起作用。这是一篇关于通过 API 出口复制消息的博客:http://www.capitalware.com/rl_blog/?p=3304

这是一个很老的问题,但值得用与 MQ 9.2.3 或更高版本相关的更新来回答。有一个称为流队列的新功能(请参阅 https://www.ibm.com/docs/en/ibm-mq/9.2?topic=scenarios-streaming-queues),它旨在支持的 use-cases 之一是将发送到给定队列的每条消息的副本放入另一个队列。然后另一个应用程序可以使用重复消息并将它们单独归档到正在处理原始消息的应用程序。