如何正确使用 Reactor Publisher
How to properly use a Reactor Publisher
我不知道如何使用 Reactor 正确实施 Publisher/Subscriber 场景。我有一个有效的解决方案,但对我来说实施似乎不正确:
我的问题是我需要手动实现发布者来注册订阅者并传递事件:
public void publishQuotes(String ticker) throws InterruptedException {
// [..] Here I generate some "lines" to be publisher
for (Subscriber<? super String> subscriber : subscribers) {
lineList.forEach(line -> subscriber.onNext(line));
}
}
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscribers.add(subscriber);
}
然后,我有一个 WorkQueue 处理器(应该是消费者):
WorkQueueProcessor<String> sink = WorkQueueProcessor.create();
// Here I subscribe to my publiser
publisher.subscribe(sink);
// Creates a Reactive Stream from the processor (having converted the lines to Quotations)
Flux<StockQuotation> mappedRS = sink.map(quotationConverter::convertHistoricalCSVToStockQuotation);
// Here I perform a number of stream transformations
// Each call to consume will be executed in a separated Thread
filteredRS.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));
它工作正常,但非常丑陋。 In this example 取自 Spring 指南,他们使用 EventBus 将事件从发布者路由到消费者,但是,当我尝试使用我的处理器 link 它时,我得到了这个编译器错误:
eventBus.on($("quotes"),sink);
The method on(Selector, Consumer<T>) in the type EventBus is not applicable for the arguments (Selector<String>, WorkQueueProcessor<String>)
我在这里迷路了,link 拥有处理器的发布者的最佳方式是什么?你会推荐使用 EventBus 吗?如果是这样,正确的调用是什么?
谢谢!
如果您使用 EventBus,您将通过
发布您的台词
eventBus.notify("quotes", Event.wrap(line);
并通过
订阅
eventBus.on($("quotes"), e -> System.out.println(Thread.currentThread() + " data=" + e);
其中 "e" 属于事件类型 。
我不知道如何使用 Reactor 正确实施 Publisher/Subscriber 场景。我有一个有效的解决方案,但对我来说实施似乎不正确:
我的问题是我需要手动实现发布者来注册订阅者并传递事件:
public void publishQuotes(String ticker) throws InterruptedException {
// [..] Here I generate some "lines" to be publisher
for (Subscriber<? super String> subscriber : subscribers) {
lineList.forEach(line -> subscriber.onNext(line));
}
}
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscribers.add(subscriber);
}
然后,我有一个 WorkQueue 处理器(应该是消费者):
WorkQueueProcessor<String> sink = WorkQueueProcessor.create();
// Here I subscribe to my publiser
publisher.subscribe(sink);
// Creates a Reactive Stream from the processor (having converted the lines to Quotations)
Flux<StockQuotation> mappedRS = sink.map(quotationConverter::convertHistoricalCSVToStockQuotation);
// Here I perform a number of stream transformations
// Each call to consume will be executed in a separated Thread
filteredRS.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));
它工作正常,但非常丑陋。 In this example 取自 Spring 指南,他们使用 EventBus 将事件从发布者路由到消费者,但是,当我尝试使用我的处理器 link 它时,我得到了这个编译器错误:
eventBus.on($("quotes"),sink);
The method on(Selector, Consumer<T>) in the type EventBus is not applicable for the arguments (Selector<String>, WorkQueueProcessor<String>)
我在这里迷路了,link 拥有处理器的发布者的最佳方式是什么?你会推荐使用 EventBus 吗?如果是这样,正确的调用是什么?
谢谢!
如果您使用 EventBus,您将通过
发布您的台词 eventBus.notify("quotes", Event.wrap(line);
并通过
订阅eventBus.on($("quotes"), e -> System.out.println(Thread.currentThread() + " data=" + e);
其中 "e" 属于事件类型