如何在 Project Reactor 3 中将冷流转换为热流?
How to convert cold stream into hot stream in Project Reactor 3?
按照Mono和Flux的定义,都是异步的数据序列,订阅之前什么都不会发生。
出版商有两大类:热的和冷的。
Mono 和 Flux 为每个订阅重新生成数据。如果没有创建订阅,则永远不会生成数据。
另一方面,热门发布者不依赖于任何数量的订阅者。
这是我的冷流代码:
System.out.println("*********Calling coldStream************");
Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
.doOnNext(System.out::println)
.filter(s -> s.startsWith("l"))
.map(String::toUpperCase);
source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));
System.out.println("-------------------------------------");
这是输出:
*********Calling composeStream************
ram
sam
dam
lam
Subscriber 1: LAM
ram
sam
dam
lam
Subscriber 2: LAM
-------------------------------------
如何将上面的冷流转换成热流?
热门发布者不依赖于任何数量的订阅者。他们可能会立即开始发布数据,并会在新订阅者进来时继续这样做(在这种情况下,订阅者只会看到订阅后发出的新元素)。 Reactor 中的大多数其他热门发布者都扩展了 Processor(在本例中为 UnicastProcessor)。
这是我们如何实现的
System.out.println("*********Calling hotStream************");
UnicastProcessor<String> hotSource = UnicastProcessor.create();
Flux<String> hotFlux = hotSource.publish()
.autoConnect()
.map(String::toUpperCase);
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.onNext("ram");
hotSource.onNext("sam");
hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));
hotSource.onNext("dam");
hotSource.onNext("lam");
hotSource.onComplete();
System.out.println("-------------------------------------");
这里是输出结果:
*********Calling hotStream************
Subscriber 1 to Hot Source: RAM
Subscriber 1 to Hot Source: SAM
Subscriber 1 to Hot Source: DAM
Subscriber 2 to Hot Source: DAM
Subscriber 1 to Hot Source: LAM
Subscriber 2 to Hot Source: LAM
-------------------------------------
订阅者 1 抓住了所有四种颜色。订阅者 2 在前两种颜色生成后创建,它只捕获最后两种颜色。
无论何时附加订阅,操作员描述的流程都会在这个 Flux 上运行。
您可以通过在冷流上调用 "publish" 将冷流转换为热流,它将创建一个 ConnectableFlux。
因为它是一个热门流,所以在你调用它的连接方法之前不会发生任何事情,即使你订阅了也是如此。试试这个例子:
Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
.doOnNext(System.out::println)
.filter(s -> s.startsWith("l"))
.map(String::toUpperCase);
ConnectableFlux<String> connectable = source.publish();
connectable.subscribe(d -> System.out.println("Subscriber 1: "+d));
connectable.subscribe(d -> System.out.println("Subscriber 2: "+d));
connectable.connect();
输出是:
ram sam dam lam Subscriber 1: LAM Subscriber 2: LAM
第二个例子:
Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
.doOnNext(System.out::println)
.filter(s -> s.startsWith("l"))
.map(String::toUpperCase);
ConnectableFlux<String> connectable = source.publish();
connectable.subscribe(d -> System.out.println("Subscriber 1: "+d));
connectable.connect();
connectable.subscribe(d -> System.out.println("Subscriber 2: "+d));
输出为:
ram
sam
dam
lam
Subscriber 1: LAM
通过这两个示例,您可以看到数据从我们调用 "connect" 方法的那一刻开始流动
按照Mono和Flux的定义,都是异步的数据序列,订阅之前什么都不会发生。
出版商有两大类:热的和冷的。 Mono 和 Flux 为每个订阅重新生成数据。如果没有创建订阅,则永远不会生成数据。
另一方面,热门发布者不依赖于任何数量的订阅者。
这是我的冷流代码:
System.out.println("*********Calling coldStream************");
Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
.doOnNext(System.out::println)
.filter(s -> s.startsWith("l"))
.map(String::toUpperCase);
source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));
System.out.println("-------------------------------------");
这是输出:
*********Calling composeStream************
ram
sam
dam
lam
Subscriber 1: LAM
ram
sam
dam
lam
Subscriber 2: LAM
-------------------------------------
如何将上面的冷流转换成热流?
热门发布者不依赖于任何数量的订阅者。他们可能会立即开始发布数据,并会在新订阅者进来时继续这样做(在这种情况下,订阅者只会看到订阅后发出的新元素)。 Reactor 中的大多数其他热门发布者都扩展了 Processor(在本例中为 UnicastProcessor)。
这是我们如何实现的
System.out.println("*********Calling hotStream************");
UnicastProcessor<String> hotSource = UnicastProcessor.create();
Flux<String> hotFlux = hotSource.publish()
.autoConnect()
.map(String::toUpperCase);
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.onNext("ram");
hotSource.onNext("sam");
hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));
hotSource.onNext("dam");
hotSource.onNext("lam");
hotSource.onComplete();
System.out.println("-------------------------------------");
这里是输出结果:
*********Calling hotStream************
Subscriber 1 to Hot Source: RAM
Subscriber 1 to Hot Source: SAM
Subscriber 1 to Hot Source: DAM
Subscriber 2 to Hot Source: DAM
Subscriber 1 to Hot Source: LAM
Subscriber 2 to Hot Source: LAM
-------------------------------------
订阅者 1 抓住了所有四种颜色。订阅者 2 在前两种颜色生成后创建,它只捕获最后两种颜色。 无论何时附加订阅,操作员描述的流程都会在这个 Flux 上运行。
您可以通过在冷流上调用 "publish" 将冷流转换为热流,它将创建一个 ConnectableFlux。 因为它是一个热门流,所以在你调用它的连接方法之前不会发生任何事情,即使你订阅了也是如此。试试这个例子:
Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
.doOnNext(System.out::println)
.filter(s -> s.startsWith("l"))
.map(String::toUpperCase);
ConnectableFlux<String> connectable = source.publish();
connectable.subscribe(d -> System.out.println("Subscriber 1: "+d));
connectable.subscribe(d -> System.out.println("Subscriber 2: "+d));
connectable.connect();
输出是:
ram sam dam lam Subscriber 1: LAM Subscriber 2: LAM
第二个例子:
Flux<String> source = Flux.fromIterable(Arrays.asList("ram", "sam", "dam", "lam"))
.doOnNext(System.out::println)
.filter(s -> s.startsWith("l"))
.map(String::toUpperCase);
ConnectableFlux<String> connectable = source.publish();
connectable.subscribe(d -> System.out.println("Subscriber 1: "+d));
connectable.connect();
connectable.subscribe(d -> System.out.println("Subscriber 2: "+d));
输出为:
ram sam dam lam Subscriber 1: LAM
通过这两个示例,您可以看到数据从我们调用 "connect" 方法的那一刻开始流动