为什么我的 RxJava Flowable 在使用 observeOn 时不考虑背压?
Why doesn't my RxJava Flowable respect backpressure when using observeOn?
我正在尝试创建一个 Flowable
发出有关背压的事件以避免内存问题,同时 运行 并行转换每个阶段以提高效率。我已经创建了一个简单的测试程序来推断我程序不同步骤的行为,以及何时发出事件与在不同阶段等待。
我的程序如下:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
.stream().map(i -> {
System.out.println("emitting:" + i);
return i;
});
Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));
Long count = flowable.onBackpressureBuffer(10)
.buffer(10)
.flatMap(buf -> {
System.out.println("Sleeping 500 for batch");
Thread.sleep(500);
System.out.println("Got batch of events");
return Flowable.fromIterable(buf);
}, 1)
.map(x -> x + 1)
.doOnNext(i -> {
System.out.println(String.format("Sleeping : %d", i));
Thread.sleep(100);
System.out.println(i);
})
.count()
.blockingGet();
System.out.println("count: " + count);
}
当我 运行 这样做时,我得到了符合预期的背压输出,其中一批事件被发送到 buffer
中的大小,然后它们被平面映射,最后是一些动作被一张一张打印出来:
Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
Sleeping 500 for batch
Got batch of events
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
10
emitting:10
emitting:11
emitting:12
emitting:13
emitting:14
emitting:15
emitting:16
emitting:17
emitting:18
emitting:19
Sleeping 500 for batch
Got batch of events
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13
然而,如果我试图通过添加一些对 .observeOn(Schedulers.computation())
的调用来并行化不同的操作阶段,那么我的程序似乎不再考虑背压。我的代码现在看起来像:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
.stream().map(i -> {
System.out.println("emitting:" + i);
return i;
});
Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));
Long count = flowable.onBackpressureBuffer(10)
.buffer(10)
.observeOn(Schedulers.computation())
.flatMap(buf -> {
System.out.println("Sleeping 500 for batch");
Thread.sleep(500);
System.out.println("Got batch of events");
return Flowable.fromIterable(buf);
}, 1)
.map(x -> x + 1)
.observeOn(Schedulers.computation())
.doOnNext(i -> {
System.out.println(String.format("Sleeping : %d", i));
Thread.sleep(100);
System.out.println(i);
})
.observeOn(Schedulers.computation())
.count()
.blockingGet();
System.out.println("count: " + count);
}
我的输出如下,我的所有事件都是预先发出的,而不是考虑各个执行阶段指定的背压和缓冲区:
Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
emitting:10
Sleeping 500 for batch
emitting:11
emitting:12
... everything else is emitted here ...
emitting:998
emitting:999
Got batch of events
Sleeping 500 for batch
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
Got batch of events
Sleeping 500 for batch
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
Got batch of events
Sleeping 500 for batch
10
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13
13
Sleeping : 14
14
Sleeping : 15
Got batch of events
Sleeping 500 for batch
15
Sleeping : 16
16
Sleeping : 17
17
Sleeping : 18
18
Sleeping : 19
19
Sleeping : 20
Got batch of events
Sleeping 500 for batch
20
Sleeping : 21
21
Sleeping : 22
22
Sleeping : 23
23
Sleeping : 24
24
Sleeping : 25
Got batch of events
Sleeping 500 for batch
25
假装我的批处理阶段正在调用外部服务,但由于延迟,我希望它们并行 运行。我还想在给定时间控制内存中的项目数量,因为最初发出的项目数量可能变化很大,并且批量运行的阶段 运行 比事件的初始发射慢得多。
如何让我的 Flowable
尊重 Scheduler
的背压?为什么它似乎只在我调用 observeOn
时不尊重背压?
How can I have my Flowable respect backpressure across a Scheduler
实际上,应用 onBackpressureBuffer
会使它上面的源与下游施加的任何背压断开连接,因为它是一个无界运算符。你不需要它,因为 Flowable.fromIterable
(顺便说一下,RxJava 有一个 range
运算符)支持并尊重背压。
Why does it seem to only disrespect backpressure when I sprinkle in calls to observeOn?
在第一个示例中,发生了称为调用堆栈阻塞 的自然背压。 RxJava 默认是同步的,大多数运算符不引入异步,就像第一个例子中的 none 那样。
observeOn
引入了异步边界,因此理论上,阶段可以 运行 相互并行。它有一个默认的 128 元素预取缓冲区,可以通过其重载之一进行调整。但是,在您的情况下,buffer(10) 实际上会将预取量放大到 1280,这仍然可能导致一次完全消耗 1000 个元素的长源。
我正在尝试创建一个 Flowable
发出有关背压的事件以避免内存问题,同时 运行 并行转换每个阶段以提高效率。我已经创建了一个简单的测试程序来推断我程序不同步骤的行为,以及何时发出事件与在不同阶段等待。
我的程序如下:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
.stream().map(i -> {
System.out.println("emitting:" + i);
return i;
});
Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));
Long count = flowable.onBackpressureBuffer(10)
.buffer(10)
.flatMap(buf -> {
System.out.println("Sleeping 500 for batch");
Thread.sleep(500);
System.out.println("Got batch of events");
return Flowable.fromIterable(buf);
}, 1)
.map(x -> x + 1)
.doOnNext(i -> {
System.out.println(String.format("Sleeping : %d", i));
Thread.sleep(100);
System.out.println(i);
})
.count()
.blockingGet();
System.out.println("count: " + count);
}
当我 运行 这样做时,我得到了符合预期的背压输出,其中一批事件被发送到 buffer
中的大小,然后它们被平面映射,最后是一些动作被一张一张打印出来:
Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
Sleeping 500 for batch
Got batch of events
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
10
emitting:10
emitting:11
emitting:12
emitting:13
emitting:14
emitting:15
emitting:16
emitting:17
emitting:18
emitting:19
Sleeping 500 for batch
Got batch of events
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13
然而,如果我试图通过添加一些对 .observeOn(Schedulers.computation())
的调用来并行化不同的操作阶段,那么我的程序似乎不再考虑背压。我的代码现在看起来像:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
.stream().map(i -> {
System.out.println("emitting:" + i);
return i;
});
Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));
Long count = flowable.onBackpressureBuffer(10)
.buffer(10)
.observeOn(Schedulers.computation())
.flatMap(buf -> {
System.out.println("Sleeping 500 for batch");
Thread.sleep(500);
System.out.println("Got batch of events");
return Flowable.fromIterable(buf);
}, 1)
.map(x -> x + 1)
.observeOn(Schedulers.computation())
.doOnNext(i -> {
System.out.println(String.format("Sleeping : %d", i));
Thread.sleep(100);
System.out.println(i);
})
.observeOn(Schedulers.computation())
.count()
.blockingGet();
System.out.println("count: " + count);
}
我的输出如下,我的所有事件都是预先发出的,而不是考虑各个执行阶段指定的背压和缓冲区:
Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
emitting:10
Sleeping 500 for batch
emitting:11
emitting:12
... everything else is emitted here ...
emitting:998
emitting:999
Got batch of events
Sleeping 500 for batch
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
Got batch of events
Sleeping 500 for batch
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
Got batch of events
Sleeping 500 for batch
10
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13
13
Sleeping : 14
14
Sleeping : 15
Got batch of events
Sleeping 500 for batch
15
Sleeping : 16
16
Sleeping : 17
17
Sleeping : 18
18
Sleeping : 19
19
Sleeping : 20
Got batch of events
Sleeping 500 for batch
20
Sleeping : 21
21
Sleeping : 22
22
Sleeping : 23
23
Sleeping : 24
24
Sleeping : 25
Got batch of events
Sleeping 500 for batch
25
假装我的批处理阶段正在调用外部服务,但由于延迟,我希望它们并行 运行。我还想在给定时间控制内存中的项目数量,因为最初发出的项目数量可能变化很大,并且批量运行的阶段 运行 比事件的初始发射慢得多。
如何让我的 Flowable
尊重 Scheduler
的背压?为什么它似乎只在我调用 observeOn
时不尊重背压?
How can I have my Flowable respect backpressure across a Scheduler
实际上,应用 onBackpressureBuffer
会使它上面的源与下游施加的任何背压断开连接,因为它是一个无界运算符。你不需要它,因为 Flowable.fromIterable
(顺便说一下,RxJava 有一个 range
运算符)支持并尊重背压。
Why does it seem to only disrespect backpressure when I sprinkle in calls to observeOn?
在第一个示例中,发生了称为调用堆栈阻塞 的自然背压。 RxJava 默认是同步的,大多数运算符不引入异步,就像第一个例子中的 none 那样。
observeOn
引入了异步边界,因此理论上,阶段可以 运行 相互并行。它有一个默认的 128 元素预取缓冲区,可以通过其重载之一进行调整。但是,在您的情况下,buffer(10) 实际上会将预取量放大到 1280,这仍然可能导致一次完全消耗 1000 个元素的长源。