How to pass a message to another Subscriber?
I am learning the Reactive Streams API in Java 9. It has a Publisher and a Subscriber: the Subscriber subscribes to the Publisher and implements the following callback methods:
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class TestSubscriber<T> implements Subscriber<T> {
    @Override
    public void onComplete() {
        // TODO Auto-generated method stub
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO Auto-generated method stub
    }

    @Override
    public void onNext(T item) {
        // TODO Auto-generated method stub
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        // TODO Auto-generated method stub
    }
}
I could not find any method on Subscriber that passes or transfers a message on to another subscriber.
Any suggestions?
This can be done by implementing Flow.Processor, as shown below:
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

// A Processor is both a Subscriber<T> (upstream side) and a Publisher<R> (downstream side).
public class MyTransformer<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    private final Function<T, R> function;
    private Flow.Subscription subscription;

    public MyTransformer(Function<T, R> function) {
        super();
        this.function = function;
    }

    @Override
    public void onComplete() {
        System.out.println("Transformer Completed");
        close(); // propagate completion to downstream subscribers
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
        closeExceptionally(e); // propagate the error to downstream subscribers
    }

    @Override
    public void onNext(T item) {
        System.out.println("Transformer Got : " + item);
        submit(function.apply(item)); // pass the transformed item on to downstream subscribers
        subscription.request(1);      // ask upstream for the next item
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // start the flow by requesting the first item
    }
}
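One thing worth guarding against in onNext: the mapping function itself can throw (Integer::parseInt does for malformed input). The SubmissionPublisher documentation says that when a subscriber method throws, that subscriber's subscription is cancelled, so without extra handling the downstream subscribers would never see a terminal signal. Below is a minimal sketch of one way to forward such a failure. The names ErrorForwardingDemo, SafeMapProcessor and run are hypothetical and only for illustration, and the sketch makes no promise about whether items already buffered downstream are delivered before the error.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

class ErrorForwardingDemo {

    /** Hypothetical variant of the transformer that forwards mapping failures downstream. */
    static final class SafeMapProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<T, R> function;
        private Flow.Subscription subscription;

        SafeMapProcessor(Function<T, R> function) {
            this.function = function;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            R mapped;
            try {
                mapped = function.apply(item);
            } catch (RuntimeException e) {
                subscription.cancel();   // stop receiving items from upstream
                closeExceptionally(e);   // signal onError to downstream subscribers
                return;
            }
            submit(mapped);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable e) { closeExceptionally(e); }

        @Override
        public void onComplete() { close(); }
    }

    /** Runs the items through the processor; returns the downstream error, or null on normal completion. */
    static Throwable run(List<String> items) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        SafeMapProcessor<String, Integer> toInt = new SafeMapProcessor<>(Integer::parseInt);
        toInt.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { /* items are ignored in this sketch */ }
            @Override public void onError(Throwable e) { failure.set(e); done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });

        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        source.subscribe(toInt);
        items.forEach(source::submit);
        source.close();

        try {
            // Delivery is asynchronous, so wait for the terminal signal.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no terminal signal within 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("1", "oops", "3")));
    }
}
```

Whether to cancel upstream on the first bad item or to skip it and continue is a design choice; the sketch takes the stricter option.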
MyTransformer can then be used as follows:
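import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class TestTransformer {
    public static void main(String... args) {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        MyTransformer<String, Integer> transformProcessor = new MyTransformer<>(Integer::parseInt);
        TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        List<String> items = List.of("1", "2", "3");
        // Expected items at the subscriber (unused here; TestSubscriber must call
        // subscription.request(n) in onSubscribe before it receives anything).
        List<Integer> expectedResult = List.of(1, 2, 3);

        // Wire the chain: publisher -> transformProcessor -> subscriber
        publisher.subscribe(transformProcessor);
        transformProcessor.subscribe(subscriber);
        items.forEach(publisher::submit);
        publisher.close();

        // Delivery is asynchronous (by default on ForkJoinPool.commonPool()),
        // so wait briefly for the pipeline to drain before main returns.
        ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.SECONDS);
    }
}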
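For completeness, here is a self-contained sketch of the whole chain that can be run as a single file. It differs from the snippets above in two ways, both of which are my assumptions rather than part of the original code: the final subscriber actually calls request(1), since a subscriber that never requests receives nothing, and a CountDownLatch is used to wait for completion because delivery happens on other threads. The names PipelineDemo, MapProcessor, CollectingSubscriber and run are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class PipelineDemo {

    /** Middle stage: receives T from upstream and republishes function.apply(T) downstream. */
    static final class MapProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<T, R> function;
        private Flow.Subscription subscription;

        MapProcessor(Function<T, R> function) {
            this.function = function;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(function.apply(item)); // hand the transformed item to the next subscriber
            subscription.request(1);
        }

        @Override
        public void onError(Throwable e) { closeExceptionally(e); }

        @Override
        public void onComplete() { close(); }
    }

    /** End of the chain: stores every item and counts down when the stream terminates. */
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // without a request, no item is ever delivered
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable e) { done.countDown(); }

        @Override
        public void onComplete() { done.countDown(); }
    }

    /** Publishes the strings, parses them in the processor, and returns what the last subscriber saw. */
    static List<Integer> run(List<String> items) {
        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        MapProcessor<String, Integer> toInt = new MapProcessor<>(Integer::parseInt);
        CollectingSubscriber<Integer> sink = new CollectingSubscriber<>();

        // Wire the chain before publishing: source -> toInt -> sink
        source.subscribe(toInt);
        toInt.subscribe(sink);
        items.forEach(source::submit);
        source.close();

        try {
            if (!sink.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pipeline did not finish within 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(sink.received);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("1", "2", "3"))); // prints [1, 2, 3]
    }
}
```

Requesting one item at a time keeps the back-pressure behaviour easy to follow; a larger request size would also work if throughput matters.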