订阅者可以充当发布者吗?

Can a Subscriber act as a Publisher?

就 Reactive Streams 而言,有一个 Publisher,它可以有多个 Subscriber。

但是假设订阅者从发布者那里得到一条消息。现在这个订阅者(比如 Subs1)changes/modifies 消息并将它传递给其他订阅者(比如 Subs2),后者使用修改后的消息。

那么这个 Subs1 订阅者可以充当发布者,将消息传递给新的 Subs2 订阅者吗?

我不确定是否可能,但我认为这种情况是可能的。

如果可能,请提出可行的方法。

If we want to transform incoming message and pass it further to the next Subscriber, we need to implement the Processor interface. This acts both as a Subscriber because it receives messages, and as the Publisher because it processes those messages and sends them for further processing.

下面是完整的实现:

  1. 创建一个 MyTransformer class,它实现 Processor 并扩展 SubmissionPublisher,因为它将同时充当订阅者和发布者:

    import java.util.concurrent.Flow;
    import java.util.concurrent.Flow.Subscription;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.function.Function;
    
    public class MyTransformer<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    
    private Function<T, R> function;
    private Flow.Subscription subscription;
    
    public MyTransformer(Function<T, R> function) {
        super();
        this.function = function;
    }
    
    @Override
    public void onComplete() {
        System.out.println("Transformer Completed");
    }
    
    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }
    
    @Override
    public void onNext(T item) {
        System.out.println("Transformer Got : "+item);
        submit(function.apply(item));
        subscription.request(1);
    
    }
    
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
    
    
    
    }
    
  2. 创建一个 TestSubscriber class 实现 Subscriber 接口并实现所需的方法:

在处理开始之前调用 onSubscribe() 方法。订阅的实例作为参数传递。它是一个class,用于控制订阅者和发布者之间的消息流。

这里的主要方法是 onNext() – 每当发布者发布新消息时都会调用它。

我们正在使用实现 Publisher 接口的 SubmissionPublisher class。

我们将向发布者提交 N 个元素——我们的测试订阅者将接收这些元素。

请注意,我们正在调用 TestSubscriber 实例上的 close() 方法。它将在给定发布者的每个订阅者下面调用 onComplete() 回调。

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class TestSubscriber<T> implements Subscriber<T> {

    private Subscription subscription;

    public List<T> consumed = new LinkedList<>();

    @Override
    public void onComplete() {
        System.out.println("Subsciber Completed");
    }

    @Override
    public void onError(Throwable arg0) {
        arg0.printStackTrace();
    }

    @Override
    public void onNext(T item) {
        System.out.println("In Subscriber Got : "+item);
        subscription.request(1);

    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

}
  1. 这里是Publisher发布String元素的处理流程。

MyTransformer 正在将字符串解析为整数——这意味着需要在此处进行转换。

import java.util.List;
import java.util.concurrent.SubmissionPublisher;;

public class TestTransformer {

    public static void main(String... args) {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        MyTransformer<String, Integer> transformProcessor = new MyTransformer<>(Integer::parseInt);

        TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        List<String> items = List.of("1", "2", "3");

        List<Integer> expectedResult = List.of(1, 2, 3);

        publisher.subscribe(transformProcessor);
        transformProcessor.subscribe(subscriber);
        items.forEach(publisher::submit);
        publisher.close();

    }
}