如何使用 CometD 和 Jetty 创建 Pub/Sub 服务
How to make a Pub/Sub service with CometD and Jetty
我需要创建一个基于 Java 8 的服务,该服务提供多个客户端可以订阅的 CometD 频道。这个想法是服务器可以在某些事件发生时向客户端发送通知。
我正在使用 Jetty 9 作为我的 servlet 容器(必须满足我的组的要求)。我一直在阅读 CometD 文档并寻找某种我可以使用的示例。文档内容广泛但无济于事(缺乏上下文),而且我一直无法找到一个像样的例子来说明我正在尝试做的事情。
有人可以在 Java 中提供一个创建可与 Jetty 一起使用的发布机制的简单示例吗?如果做不到这一点,有人可以指出我如何做的例子吗?
请指教
CometD Project has an outstanding task带回教程。
服务器端股票价格教程回答了这个特定问题,您可以找到源代码here,同时我们正在努力将其作为文档的一部分重新上线。
略过一些细节,您需要编写的服务类似于教程中的股票价格服务:在接收到外部事件后,该服务应将事件广播给订阅者。
@Service
public class StockPriceService implements StockPriceEmitter.Listener
{
@Inject
private BayeuxServer bayeuxServer;
@Session
private LocalSession sender;
public void onUpdates(List<StockPriceEmitter.Update> updates)
{
for (StockPriceEmitter.Update update : updates)
{
// Create the channel name using the stock symbol.
String channelName = "/stock/" + update.getSymbol().toLowerCase(Locale.ENGLISH);
// Initialize the channel, making it persistent and lazy.
bayeuxServer.createChannelIfAbsent(channelName, new ConfigurableServerChannel.Initializer()
{
public void configureChannel(ConfigurableServerChannel channel)
{
channel.setPersistent(true);
channel.setLazy(true);
}
});
// Convert the Update business object to a CometD-friendly format.
Map<String, Object> data = new HashMap<>(4);
data.put("symbol", update.getSymbol());
data.put("oldValue", update.getOldValue());
data.put("newValue", update.getNewValue());
// Publish to all subscribers.
ServerChannel channel = bayeuxServer.getChannel(channelName);
channel.publish(sender, data);
}
}
}
Class StockPriceEmitter
是您的外部事件的来源,并以 StockPriceEmitter.Update
事件的形式将它们发布到 StockPriceEmitter.Listener
。
外部事件如何中继到 CometD 服务器是 StockPriceEmitter
隐藏的细节;它可以通过 JMS 消息,或通过轮询外部 REST 服务,或通过自定义网络协议,或通过轮询数据库等来完成。
重要的是当外部事件到达时,调用StockPriceService.onUpdates(...)
,在那里你可以将事件转换成CometD友好的JSON格式,然后发布到CometD频道.
反过来,发布到 CometD 频道会将消息发送给该频道的所有订阅者,通常是浏览器等远程客户端。
CometD 通道已设为 lazy,因为这是一种避免以非常频繁的更新率(例如,高于每秒 2-4 次更新)轰炸客户端的方法.
您将需要根据您的特定用例来决定通道的惰性。
我需要创建一个基于 Java 8 的服务,该服务提供多个客户端可以订阅的 CometD 频道。这个想法是服务器可以在某些事件发生时向客户端发送通知。
我正在使用 Jetty 9 作为我的 servlet 容器(必须满足我的组的要求)。我一直在阅读 CometD 文档并寻找某种我可以使用的示例。文档内容广泛但无济于事(缺乏上下文),而且我一直无法找到一个像样的例子来说明我正在尝试做的事情。
有人可以在 Java 中提供一个创建可与 Jetty 一起使用的发布机制的简单示例吗?如果做不到这一点,有人可以指出我如何做的例子吗?
请指教
CometD Project has an outstanding task带回教程。
服务器端股票价格教程回答了这个特定问题,您可以找到源代码here,同时我们正在努力将其作为文档的一部分重新上线。
略过一些细节,您需要编写的服务类似于教程中的股票价格服务:在接收到外部事件后,该服务应将事件广播给订阅者。
@Service
public class StockPriceService implements StockPriceEmitter.Listener
{
@Inject
private BayeuxServer bayeuxServer;
@Session
private LocalSession sender;
public void onUpdates(List<StockPriceEmitter.Update> updates)
{
for (StockPriceEmitter.Update update : updates)
{
// Create the channel name using the stock symbol.
String channelName = "/stock/" + update.getSymbol().toLowerCase(Locale.ENGLISH);
// Initialize the channel, making it persistent and lazy.
bayeuxServer.createChannelIfAbsent(channelName, new ConfigurableServerChannel.Initializer()
{
public void configureChannel(ConfigurableServerChannel channel)
{
channel.setPersistent(true);
channel.setLazy(true);
}
});
// Convert the Update business object to a CometD-friendly format.
Map<String, Object> data = new HashMap<>(4);
data.put("symbol", update.getSymbol());
data.put("oldValue", update.getOldValue());
data.put("newValue", update.getNewValue());
// Publish to all subscribers.
ServerChannel channel = bayeuxServer.getChannel(channelName);
channel.publish(sender, data);
}
}
}
Class StockPriceEmitter
是您的外部事件的来源,并以 StockPriceEmitter.Update
事件的形式将它们发布到 StockPriceEmitter.Listener
。
外部事件如何中继到 CometD 服务器是 StockPriceEmitter
隐藏的细节;它可以通过 JMS 消息,或通过轮询外部 REST 服务,或通过自定义网络协议,或通过轮询数据库等来完成。
重要的是当外部事件到达时,调用StockPriceService.onUpdates(...)
,在那里你可以将事件转换成CometD友好的JSON格式,然后发布到CometD频道.
反过来,发布到 CometD 频道会将消息发送给该频道的所有订阅者,通常是浏览器等远程客户端。
CometD 通道已设为 lazy,因为这是一种避免以非常频繁的更新率(例如,高于每秒 2-4 次更新)轰炸客户端的方法. 您将需要根据您的特定用例来决定通道的惰性。