Reactor 中的 EventBus 3.x

EventBus in Reactor 3.x

我知道 Reactor3.x 中不推荐使用 EventBus,建议的解决方案是 ReplayProcessor。我已阅读 https://github.com/reactor/reactor-core/issues/375。但是这里的代码太草稿了。我创建了一个演示项目来证明这里的想法。有人可以给点意见吗?

======== Application.java

package hello;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import reactor.core.publisher.Flux;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.BaseSubscriber;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

@Configuration
@EnableAutoConfiguration
@ComponentScan
public class Application implements CommandLineRunner {

    private static final int NUMBER_OF_QUOTES = 10;

    @Bean
    ReplayProcessor createReplayProcessor() {

        ReplayProcessor<MyEvent> rp = ReplayProcessor.create();

        Flux<MyEvent> interest1 = rp.filter(ev -> filterInterest1(ev));

        Flux<MyEvent> interest2 = rp.filter(ev -> filterInterest2(ev));

        interest1.subscribe(new BaseSubscriber<MyEvent>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                requestUnbounded();
            }
            @Override
            protected void hookOnNext(MyEvent value) {
                //todo: call service method
                System.out.println("event 1 handler -> event name:" + value.getEventName());
            }

        });


        interest2.subscribe(new BaseSubscriber<MyEvent>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                requestUnbounded();
            }
            @Override
            protected void hookOnNext(MyEvent value) {
                //todo: call service method
                System.out.println("event2 handler -> event name:" + value.getEventName());
            }
        });

        return rp;
    }

    public boolean filterInterest1(MyEvent myEvent) {
        if (myEvent != null && myEvent.getEventName() != null
                && myEvent.getEventName().equalsIgnoreCase("event1")) {
            return true;
        }
        return false;
    }

    public boolean filterInterest2(MyEvent myEvent) {
        if (myEvent != null && myEvent.getEventName() != null
                && myEvent.getEventName().equalsIgnoreCase("event2")) {
            return true;
        }
        return false;
    }


    @Autowired
    private Publisher publisher;

    @Bean
    public CountDownLatch latch() {
        return new CountDownLatch(NUMBER_OF_QUOTES);
    }

    @Override
    public void run(String... args) throws Exception {
        publisher.publishQuotes(NUMBER_OF_QUOTES);
    }

    public static void main(String[] args) throws InterruptedException {
        ApplicationContext app = SpringApplication.run(Application.class, args);

        app.getBean(CountDownLatch.class).await(10, TimeUnit.SECONDS);


    }

}

==========MyEvent.java=============

package hello;

public class MyEvent {

    private String eventName = "";

    public String getEventName() {
        return eventName;
    }

    public void setEventName(String eventName) {
        this.eventName = eventName;
    }

    public MyEvent(String eventName) {
        this.eventName =  eventName;
    }


    public void filterInterest1(MyEvent myEvent) {

    }
}

=============Publisher.java ===========

package hello;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import reactor.core.publisher.ReplayProcessor;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class Publisher {

    @Autowired
    ReplayProcessor rp;

    @Autowired
    CountDownLatch latch;

    public void publishQuotes(int numberOfQuotes) throws InterruptedException {
        long start = System.currentTimeMillis();

        rp.onNext(new MyEvent("event1"));
        rp.onNext(new MyEvent("event2"));
        rp.onNext(new MyEvent("event3"));

        long elapsed = System.currentTimeMillis() - start;

        System.out.println("Elapsed time: " + elapsed + "ms");
        System.out.println("Average time per quote: " + elapsed / numberOfQuotes + "ms");
    }

}

整个代码是https://github.com/yigubigu/reactor-start-sample.git

恕我直言,您可以中继 Spring 事件处理程序。 Matt Raible 和 Josh Long 在这对教程中使用了它:

要点:

@Component class ProfileCreatedEventPublisher implements ApplicationListener<ProfileCreatedEvent>, Consumer<FluxSink<ProfileCreatedEvent>>

使用事件循环从 LinkedBlockingQueue.

中获取事件

@Override public void onApplicationEvent(ProfileCreatedEvent event)

排队可以在您的应用程序中的任何位置发布的事件。

ProfileCreatedEventPublisherServerSentEventController 中用于创建 Flux 事件(可以与 filter 链接),它转换并将它们发送到网络客户。