如何使用 CometD 和 Jetty 创建 Pub/Sub 服务

How to make a Pub/Sub service with CometD and Jetty

我需要创建一个基于 Java 8 的服务,该服务提供多个客户端可以订阅的 CometD 频道。这个想法是服务器可以在某些事件发生时向客户端发送通知。

我正在使用 Jetty 9 作为我的 servlet 容器(必须满足我的组的要求)。我一直在阅读 CometD 文档并寻找某种我可以使用的示例。文档内容广泛但无济于事(缺乏上下文),而且我一直无法找到一个像样的例子来说明我正在尝试做的事情。

有人可以在 Java 中提供一个创建可与 Jetty 一起使用的发布机制的简单示例吗?如果做不到这一点,有人可以指出我如何做的例子吗?

请指教

CometD Project has an outstanding task带回教程。

服务器端股票价格教程回答了这个特定问题,您可以找到源代码here,同时我们正在努力将其作为文档的一部分重新上线。

略过一些细节,您需要编写的服务类似于教程中的股票价格服务:在接收到外部事件后,该服务应将事件广播给订阅者。

@Service
public class StockPriceService implements StockPriceEmitter.Listener
{
    @Inject
    private BayeuxServer bayeuxServer;
    @Session
    private LocalSession sender;

    public void onUpdates(List<StockPriceEmitter.Update> updates)
    {
        for (StockPriceEmitter.Update update : updates)
        {
            // Create the channel name using the stock symbol.
            String channelName = "/stock/" + update.getSymbol().toLowerCase(Locale.ENGLISH);

            // Initialize the channel, making it persistent and lazy.
            bayeuxServer.createChannelIfAbsent(channelName, new ConfigurableServerChannel.Initializer()
            {
                public void configureChannel(ConfigurableServerChannel channel)
                {
                    channel.setPersistent(true);
                    channel.setLazy(true);
                }
            });

            // Convert the Update business object to a CometD-friendly format.
            Map<String, Object> data = new HashMap<>(4);
            data.put("symbol", update.getSymbol());
            data.put("oldValue", update.getOldValue());
            data.put("newValue", update.getNewValue());

            // Publish to all subscribers.
            ServerChannel channel = bayeuxServer.getChannel(channelName);
            channel.publish(sender, data);
        }
    }
}

Class StockPriceEmitter 是您的外部事件的来源,并以 StockPriceEmitter.Update 事件的形式将它们发布到 StockPriceEmitter.Listener

外部事件如何中继到 CometD 服务器是 StockPriceEmitter 隐藏的细节;它可以通过 JMS 消息,或通过轮询外部 REST 服务,或通过自定义网络协议,或通过轮询数据库等来完成。

重要的是当外部事件到达时,调用StockPriceService.onUpdates(...),在那里你可以将事件转换成CometD友好的JSON格式,然后发布到CometD频道.

反过来,发布到 CometD 频道会将消息发送给该频道的所有订阅者,通常是浏览器等远程客户端。

CometD 通道已设为 lazy,因为这是一种避免以非常频繁的更新率(例如,高于每秒 2-4 次更新)轰炸客户端的方法. 您将需要根据您的特定用例来决定通道的惰性。