Reactor 中 Listener 的主题是什么?
What's a Listener's subject in Reactor?
我有一个简单的 Listener 模式实现(以压缩形式)如下所示:
public class EmailProducer implements Runnable {
private List<Consumer<EmailMessage>> listeners = new ArrayList<>();
private IdleManager idleManager;
public void registerListener(Consumer<EmailMessage> listener) {
listeners.add(listener);
}
public void run() {
Session session = Session.getDefaultInstance(new Properties());
idleManager = new IdleManager(session, Executors.newCachedThreadPool());
// use IMAP IDLE to listen for new incoming emails
Folder inbox = openInbox();
inbox.addMessageCountListener(new MessageCountAdapter() {
public void messagesAdded(MessageCountEvent ev) {
Message[] msgs = ev.getMessages();
for (Message msg : msgs) {
EmailMessage info = EmailMessage.from(msg);
for (Consumer<EmailMessage> listener : listeners) {
listener.accept(info);
}
}
}
});
try {
idleManager.watch(inbox);
catch (FolderClosedException|StoreClosedException ex) {
run(); // restart, need to re-establish connection
}
}
}
所以本质上,这是从我的收件箱中抓取电子邮件,从每封电子邮件中提取信息并将其转换为内部消息格式。
一个典型的 Consumer
然后会保留该消息,另一个会在 HTML 页面上显示信息,还有一个可能会触发第 3 方系统执行某些操作。所有这一切都很好。
现在,我正试图在面对 Consumer
失败时获得更多的弹性。例如,如果持久性消费者失败(抛出异常)并结束处理,我希望我的 EmailMessage
继续存在。在那种情况下,我希望在暂停间隔后重试处理。现在,如果消费者失败,消息就会丢失。
我最近发现了 Reactor,我认为它的编排可以保存消息并使我能够做我需要的事情。但是,我看不出如何将当前的 Runnable
调整到它的模型中。从 documentation 看来我需要一个 RingBufferProcessor
来保存我的消息。我找不到另一种方法将处理器传递给我的 Runnable
因为它调用 onNext()
方法而不是单独的侦听器的 accept
方法。我是不是遗漏了什么,或者这是它应该工作的方式吗?
(额外的 cookie 展示了我如何在消费者失败时重试)
您可以看看 RingBufferProcessor
,它允许您使用单个 EmailMessage
执行并行任务。您可以将其设置为:
ReactorProcessor<Message, Message> processor = RingBufferProcessor.share("email-processor", 1024);
Stream<Message> s = Streams.wrap(processor);
s.consume(m -> firstTask(m));
s.consume(m -> secondTask(m));
s.consume(m -> thirdTask(m));
这会将工作并行化到 3 个单独的线程(每个 Consumer
一个)。要重试或捕获错误,您需要 Stream.retry
或 Stream.retryWhen
(退避)或仅 Stream.when
(不会给您导致错误的消息)。
s.retry(t -> t instanceof IllegalStateException)
.consume(m -> retryMessage(m));
s.when(NumberFormatException.class, e -> e.printStackTrace());
然后要向此 Processor
发布消息,只需调用 onNext
:
for(EmailMessage msg : msgs) {
processor.onNext(msg);
}
我有一个简单的 Listener 模式实现(以压缩形式)如下所示:
public class EmailProducer implements Runnable {
private List<Consumer<EmailMessage>> listeners = new ArrayList<>();
private IdleManager idleManager;
public void registerListener(Consumer<EmailMessage> listener) {
listeners.add(listener);
}
public void run() {
Session session = Session.getDefaultInstance(new Properties());
idleManager = new IdleManager(session, Executors.newCachedThreadPool());
// use IMAP IDLE to listen for new incoming emails
Folder inbox = openInbox();
inbox.addMessageCountListener(new MessageCountAdapter() {
public void messagesAdded(MessageCountEvent ev) {
Message[] msgs = ev.getMessages();
for (Message msg : msgs) {
EmailMessage info = EmailMessage.from(msg);
for (Consumer<EmailMessage> listener : listeners) {
listener.accept(info);
}
}
}
});
try {
idleManager.watch(inbox);
catch (FolderClosedException|StoreClosedException ex) {
run(); // restart, need to re-establish connection
}
}
}
所以本质上,这是从我的收件箱中抓取电子邮件,从每封电子邮件中提取信息并将其转换为内部消息格式。
一个典型的 Consumer
然后会保留该消息,另一个会在 HTML 页面上显示信息,还有一个可能会触发第 3 方系统执行某些操作。所有这一切都很好。
现在,我正试图在面对 Consumer
失败时获得更多的弹性。例如,如果持久性消费者失败(抛出异常)并结束处理,我希望我的 EmailMessage
继续存在。在那种情况下,我希望在暂停间隔后重试处理。现在,如果消费者失败,消息就会丢失。
我最近发现了 Reactor,我认为它的编排可以保存消息并使我能够做我需要的事情。但是,我看不出如何将当前的 Runnable
调整到它的模型中。从 documentation 看来我需要一个 RingBufferProcessor
来保存我的消息。我找不到另一种方法将处理器传递给我的 Runnable
因为它调用 onNext()
方法而不是单独的侦听器的 accept
方法。我是不是遗漏了什么,或者这是它应该工作的方式吗?
(额外的 cookie 展示了我如何在消费者失败时重试)
您可以看看 RingBufferProcessor
,它允许您使用单个 EmailMessage
执行并行任务。您可以将其设置为:
ReactorProcessor<Message, Message> processor = RingBufferProcessor.share("email-processor", 1024);
Stream<Message> s = Streams.wrap(processor);
s.consume(m -> firstTask(m));
s.consume(m -> secondTask(m));
s.consume(m -> thirdTask(m));
这会将工作并行化到 3 个单独的线程(每个 Consumer
一个)。要重试或捕获错误,您需要 Stream.retry
或 Stream.retryWhen
(退避)或仅 Stream.when
(不会给您导致错误的消息)。
s.retry(t -> t instanceof IllegalStateException)
.consume(m -> retryMessage(m));
s.when(NumberFormatException.class, e -> e.printStackTrace());
然后要向此 Processor
发布消息,只需调用 onNext
:
for(EmailMessage msg : msgs) {
processor.onNext(msg);
}