在 Reactor 中使用 Flux.push() 会丢失一些物品
lose some items by using Flux.push() in Reactor
我的代码如下:
public class SequenceCreator {
public Consumer<List<Integer>> consumer;
public Flux<Integer> createNumberSequence() {
return Flux.push(sink -> consumer = items -> items.forEach(sink::next));
}
public static void main(String[] args) throws InterruptedException {
SequenceCreator sequenceCreator = new SequenceCreator();
List<Integer> sequence1 = Lists.newArrayList(1,2,3,4,5);
List<Integer> sequence2 = Lists.newArrayList(6,7,8,9,10);
Thread producingThread1 = new Thread(
() -> sequenceCreator.consumer.accept(sequence1));
Thread producingThread2 = new Thread(
() -> sequenceCreator.consumer.accept(sequence2));
sequenceCreator.createNumberSequence().subscribe(System.out::println);
producingThread1.start();
producingThread2.start();
while (true) {
Thread.sleep(1000);
}
}
}
输出为
1
2个
3个
4个
5个
7
8个
9
10
不知道为什么没有输出数字6?是不是多线程的原因?
I don't know why the number 6 is not output?Is it the cause of multi-thread?
是的,几乎可以肯定。查看 Flux.push
:
的 Javadoc
Programmatically create a Flux with the capability of emitting multiple elements from a single-threaded producer through the FluxSink API. For a multi-threaded capable alternative, see create(Consumer).
您没有使用单线程生产者(违反了记录的要求),因此在这种情况下行为基本上是未定义的。您需要按照文档的建议切换到 Flux.create
,因为您正在使用多个线程进行发布。
我的代码如下:
public class SequenceCreator {
public Consumer<List<Integer>> consumer;
public Flux<Integer> createNumberSequence() {
return Flux.push(sink -> consumer = items -> items.forEach(sink::next));
}
public static void main(String[] args) throws InterruptedException {
SequenceCreator sequenceCreator = new SequenceCreator();
List<Integer> sequence1 = Lists.newArrayList(1,2,3,4,5);
List<Integer> sequence2 = Lists.newArrayList(6,7,8,9,10);
Thread producingThread1 = new Thread(
() -> sequenceCreator.consumer.accept(sequence1));
Thread producingThread2 = new Thread(
() -> sequenceCreator.consumer.accept(sequence2));
sequenceCreator.createNumberSequence().subscribe(System.out::println);
producingThread1.start();
producingThread2.start();
while (true) {
Thread.sleep(1000);
}
}
}
输出为
1 2个 3个 4个 5个 7 8个 9 10
不知道为什么没有输出数字6?是不是多线程的原因?
I don't know why the number 6 is not output?Is it the cause of multi-thread?
是的,几乎可以肯定。查看 Flux.push
:
Programmatically create a Flux with the capability of emitting multiple elements from a single-threaded producer through the FluxSink API. For a multi-threaded capable alternative, see create(Consumer).
您没有使用单线程生产者(违反了记录的要求),因此在这种情况下行为基本上是未定义的。您需要按照文档的建议切换到 Flux.create
,因为您正在使用多个线程进行发布。