Reactor 中 Listener 的主题是什么?

What's a Listener's subject in Reactor?

我有一个简单的 Listener 模式实现(以压缩形式)如下所示:

public class EmailProducer implements Runnable {
    private List<Consumer<EmailMessage>> listeners = new ArrayList<>();
    private IdleManager idleManager;

    public void registerListener(Consumer<EmailMessage> listener) {
        listeners.add(listener);
    }

    public void run() {
        Session session = Session.getDefaultInstance(new Properties());
        idleManager = new IdleManager(session, Executors.newCachedThreadPool());
        // use IMAP IDLE to listen for new incoming emails
        Folder inbox = openInbox();
        inbox.addMessageCountListener(new MessageCountAdapter() {
            public void messagesAdded(MessageCountEvent ev) {
                Message[] msgs = ev.getMessages();
                for (Message msg : msgs) {
                    EmailMessage info = EmailMessage.from(msg);
                    for (Consumer<EmailMessage> listener : listeners) {
                        listener.accept(info);
                    }
                }
            }
        });
        try {
            idleManager.watch(inbox);
        catch (FolderClosedException|StoreClosedException ex) {
            run(); // restart, need to re-establish connection
        }
    }
}

所以本质上,这是从我的收件箱中抓取电子邮件,从每封电子邮件中提取信息并将其转换为内部消息格式。

一个典型的 Consumer 然后会保留该消息,另一个会在 HTML 页面上显示信息,还有一个可能会触发第 3 方系统执行某些操作。所有这一切都很好。

现在,我正试图在面对 Consumer 失败时获得更多的弹性。例如,如果持久性消费者失败(抛出异常)并结束处理,我希望我的 EmailMessage 继续存在。在那种情况下,我希望在暂停间隔后重试处理。现在,如果消费者失败,消息就会丢失。

我最近发现了 Reactor,我认为它的编排可以保存消息并使我能够做我需要的事情。但是,我看不出如何将当前的 Runnable 调整到它的模型中。从 documentation 看来我需要一个 RingBufferProcessor 来保存我的消息。我找不到另一种方法将处理器传递给我的 Runnable 因为它调用 onNext() 方法而不是单独的侦听器的 accept 方法。我是不是遗漏了什么,或者这是它应该工作的方式吗?

(额外的 cookie 展示了我如何在消费者失败时重试)

您可以看看 RingBufferProcessor,它允许您使用单个 EmailMessage 执行并行任务。您可以将其设置为:

ReactorProcessor<Message, Message> processor = RingBufferProcessor.share("email-processor", 1024);
Stream<Message> s = Streams.wrap(processor);

s.consume(m -> firstTask(m));
s.consume(m -> secondTask(m));
s.consume(m -> thirdTask(m));

这会将工作并行化到 3 个单独的线程(每个 Consumer 一个)。要重试或捕获错误,您需要 Stream.retryStream.retryWhen(退避)或仅 Stream.when(不会给您导致错误的消息)。

s.retry(t -> t instanceof IllegalStateException)
 .consume(m -> retryMessage(m));

s.when(NumberFormatException.class, e -> e.printStackTrace());

然后要向此 Processor 发布消息,只需调用 onNext:

for(EmailMessage msg : msgs) {
    processor.onNext(msg);
}