在tibco ems中实现异步发布订阅主题

Implementing asynchronous publish subscribe topic in tibco ems

我的情况是,我作为电影发行商,需要向我的客户更新新电影,我发布此信息的主题与长期订阅者和想要购买电影的客户会表达他们的兴趣。

然而,这就是事情的发展方向,我的发布者实施在收到第一个回复后立即停止侦听。任何帮助将不胜感激。谢谢。

请求(消息消息) 发送请求并等待回复。 临时主题用于 JMSReplyTo 目的地;返回第一个回复,并丢弃任何后续回复。

https://docs.oracle.com/javaee/6/api/javax/jms/TopicRequestor.html

您想继续循环阅读邮件。这是一个例子:

    /* read messages */
    while (true)
    {
        /* receive the message */
        msg = msgConsumer.receive();
        if (msg == null)
           break;

        if (ackMode == Session.CLIENT_ACKNOWLEDGE ||
            ackMode == Tibjms.EXPLICIT_CLIENT_ACKNOWLEDGE ||
            ackMode == Tibjms.EXPLICIT_CLIENT_DUPS_OK_ACKNOWLEDGE)
        {
            msg.acknowledge();
        }

        System.err.println("Received message: "+ msg);
    }

您可能还想考虑耐用消费品可能存在的问题。如果您的消费者从不接收他们的消息,则服务器端的存储空间将继续增长。出于这个原因,您可能希望发送带有过期时间的消息,and/or 限制您正在使用的 JMS 主题的最大消息数(或 KB/MB/GB 中的大小)。

首先...我对场景有疑问。这是某种 test/exercice,还是我们在谈论真实世界的场景?

是否所有客户都对电影 SEPARATE 主题订阅者感兴趣?规模如何?我计划为每部电影设置一个主题,并且可能感兴趣的各方声明持久订阅者(每部电影一个)?这似乎是对持久订阅者的滥用......我建议只使用一个订阅者(在系统 B 中)到 "Movie Released" event/topic(来自系统 A),并有一些代码(在系统 B 中)从数据库中读取所有客户端以发送 emails/messages/whatever。 (如果系统 A 和 B 相同,使用 EMS 可能是个好主意,也可能不是个好主意……取决于。)

如果这不是练习,我必须评论:不要使用 MOM(EMS、ActiveMQ)来完成 DBMS(Oracle、PostGreSQL)的工作!

免责声明部分完成后,我建议采用异步订阅方式(这两个剪辑是为EMS示例目录拍摄的。文件tibjmsAsyncMsgConsumer.java)。

从构造函数中提取(主要class必须实现ExceptionListener、MessageListener):

        ConnectionFactory factory = new com.tibco.tibjms.TibjmsConnectionFactory(serverUrl);

        /* create the connection */
        connection = factory.createConnection(userName,password);

        /* create the session */
        session = connection.createSession();

        /* set the exception listener */
        connection.setExceptionListener(this);

        /* create the destination */
        if (useTopic)
            destination = session.createTopic(name);
        else
            destination = session.createQueue(name);

        System.err.println("Subscribing to destination: "+name);

        /* create the consumer */
        msgConsumer = session.createConsumer(destination);

        /* set the message listener */
        msgConsumer.setMessageListener(this);

        /* start the connection */
        connection.start();

每次消息到达时都会调用该方法。

public void onMessage(Message msg)
{
    try
    {
        System.err.println("Received message: " + msg);
    }
    catch (Exception e)
    {
        System.err.println("Unexpected exception in the message callback!");
        e.printStackTrace();
        System.exit(-1);
    }
}