Reactor 中的 EventBus 3.x
EventBus in Reactor 3.x
我知道 Reactor3.x 中不推荐使用 EventBus,建议的解决方案是 ReplayProcessor。我已阅读 https://github.com/reactor/reactor-core/issues/375。但是这里的代码太草稿了。我创建了一个演示项目来证明这里的想法。有人可以给点意见吗?
======== Application.java
package hello;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.BaseSubscriber;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@Configuration
@EnableAutoConfiguration
@ComponentScan
public class Application implements CommandLineRunner {
private static final int NUMBER_OF_QUOTES = 10;
@Bean
ReplayProcessor createReplayProcessor() {
ReplayProcessor<MyEvent> rp = ReplayProcessor.create();
Flux<MyEvent> interest1 = rp.filter(ev -> filterInterest1(ev));
Flux<MyEvent> interest2 = rp.filter(ev -> filterInterest2(ev));
interest1.subscribe(new BaseSubscriber<MyEvent>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
requestUnbounded();
}
@Override
protected void hookOnNext(MyEvent value) {
//todo: call service method
System.out.println("event 1 handler -> event name:" + value.getEventName());
}
});
interest2.subscribe(new BaseSubscriber<MyEvent>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
requestUnbounded();
}
@Override
protected void hookOnNext(MyEvent value) {
//todo: call service method
System.out.println("event2 handler -> event name:" + value.getEventName());
}
});
return rp;
}
public boolean filterInterest1(MyEvent myEvent) {
if (myEvent != null && myEvent.getEventName() != null
&& myEvent.getEventName().equalsIgnoreCase("event1")) {
return true;
}
return false;
}
public boolean filterInterest2(MyEvent myEvent) {
if (myEvent != null && myEvent.getEventName() != null
&& myEvent.getEventName().equalsIgnoreCase("event2")) {
return true;
}
return false;
}
@Autowired
private Publisher publisher;
@Bean
public CountDownLatch latch() {
return new CountDownLatch(NUMBER_OF_QUOTES);
}
@Override
public void run(String... args) throws Exception {
publisher.publishQuotes(NUMBER_OF_QUOTES);
}
public static void main(String[] args) throws InterruptedException {
ApplicationContext app = SpringApplication.run(Application.class, args);
app.getBean(CountDownLatch.class).await(10, TimeUnit.SECONDS);
}
}
==========MyEvent.java=============
package hello;
public class MyEvent {
private String eventName = "";
public String getEventName() {
return eventName;
}
public void setEventName(String eventName) {
this.eventName = eventName;
}
public MyEvent(String eventName) {
this.eventName = eventName;
}
public void filterInterest1(MyEvent myEvent) {
}
}
=============Publisher.java ===========
package hello;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.ReplayProcessor;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@Service
public class Publisher {
@Autowired
ReplayProcessor rp;
@Autowired
CountDownLatch latch;
public void publishQuotes(int numberOfQuotes) throws InterruptedException {
long start = System.currentTimeMillis();
rp.onNext(new MyEvent("event1"));
rp.onNext(new MyEvent("event2"));
rp.onNext(new MyEvent("event3"));
long elapsed = System.currentTimeMillis() - start;
System.out.println("Elapsed time: " + elapsed + "ms");
System.out.println("Average time per quote: " + elapsed / numberOfQuotes + "ms");
}
}
恕我直言,您可以中继 Spring 事件处理程序。 Matt Raible 和 Josh Long 在这对教程中使用了它:
- https://developer.okta.com/blog/2018/09/24/reactive-apis-with-spring-webflux
- https://developer.okta.com/blog/2018/09/25/spring-webflux-websockets-react
要点:
@Component
class ProfileCreatedEventPublisher implements
ApplicationListener<ProfileCreatedEvent>,
Consumer<FluxSink<ProfileCreatedEvent>>
使用事件循环从 LinkedBlockingQueue
.
中获取事件
@Override
public void onApplicationEvent(ProfileCreatedEvent event)
排队可以在您的应用程序中的任何位置发布的事件。
ProfileCreatedEventPublisher
在 ServerSentEventController
中用于创建 Flux
事件(可以与 filter
链接),它转换并将它们发送到网络客户。
我知道 Reactor3.x 中不推荐使用 EventBus,建议的解决方案是 ReplayProcessor。我已阅读 https://github.com/reactor/reactor-core/issues/375。但是这里的代码太草稿了。我创建了一个演示项目来证明这里的想法。有人可以给点意见吗?
======== Application.java
package hello;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.BaseSubscriber;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@Configuration
@EnableAutoConfiguration
@ComponentScan
public class Application implements CommandLineRunner {
private static final int NUMBER_OF_QUOTES = 10;
@Bean
ReplayProcessor createReplayProcessor() {
ReplayProcessor<MyEvent> rp = ReplayProcessor.create();
Flux<MyEvent> interest1 = rp.filter(ev -> filterInterest1(ev));
Flux<MyEvent> interest2 = rp.filter(ev -> filterInterest2(ev));
interest1.subscribe(new BaseSubscriber<MyEvent>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
requestUnbounded();
}
@Override
protected void hookOnNext(MyEvent value) {
//todo: call service method
System.out.println("event 1 handler -> event name:" + value.getEventName());
}
});
interest2.subscribe(new BaseSubscriber<MyEvent>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
requestUnbounded();
}
@Override
protected void hookOnNext(MyEvent value) {
//todo: call service method
System.out.println("event2 handler -> event name:" + value.getEventName());
}
});
return rp;
}
public boolean filterInterest1(MyEvent myEvent) {
if (myEvent != null && myEvent.getEventName() != null
&& myEvent.getEventName().equalsIgnoreCase("event1")) {
return true;
}
return false;
}
public boolean filterInterest2(MyEvent myEvent) {
if (myEvent != null && myEvent.getEventName() != null
&& myEvent.getEventName().equalsIgnoreCase("event2")) {
return true;
}
return false;
}
@Autowired
private Publisher publisher;
@Bean
public CountDownLatch latch() {
return new CountDownLatch(NUMBER_OF_QUOTES);
}
@Override
public void run(String... args) throws Exception {
publisher.publishQuotes(NUMBER_OF_QUOTES);
}
public static void main(String[] args) throws InterruptedException {
ApplicationContext app = SpringApplication.run(Application.class, args);
app.getBean(CountDownLatch.class).await(10, TimeUnit.SECONDS);
}
}
==========MyEvent.java=============
package hello;
public class MyEvent {
private String eventName = "";
public String getEventName() {
return eventName;
}
public void setEventName(String eventName) {
this.eventName = eventName;
}
public MyEvent(String eventName) {
this.eventName = eventName;
}
public void filterInterest1(MyEvent myEvent) {
}
}
=============Publisher.java ===========
package hello;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.ReplayProcessor;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@Service
public class Publisher {
@Autowired
ReplayProcessor rp;
@Autowired
CountDownLatch latch;
public void publishQuotes(int numberOfQuotes) throws InterruptedException {
long start = System.currentTimeMillis();
rp.onNext(new MyEvent("event1"));
rp.onNext(new MyEvent("event2"));
rp.onNext(new MyEvent("event3"));
long elapsed = System.currentTimeMillis() - start;
System.out.println("Elapsed time: " + elapsed + "ms");
System.out.println("Average time per quote: " + elapsed / numberOfQuotes + "ms");
}
}
恕我直言,您可以中继 Spring 事件处理程序。 Matt Raible 和 Josh Long 在这对教程中使用了它:
- https://developer.okta.com/blog/2018/09/24/reactive-apis-with-spring-webflux
- https://developer.okta.com/blog/2018/09/25/spring-webflux-websockets-react
要点:
@Component
class ProfileCreatedEventPublisher implements
ApplicationListener<ProfileCreatedEvent>,
Consumer<FluxSink<ProfileCreatedEvent>>
使用事件循环从 LinkedBlockingQueue
.
@Override
public void onApplicationEvent(ProfileCreatedEvent event)
排队可以在您的应用程序中的任何位置发布的事件。
ProfileCreatedEventPublisher
在 ServerSentEventController
中用于创建 Flux
事件(可以与 filter
链接),它转换并将它们发送到网络客户。