Akka 流:使用 OverflowStrategy.fail() 模拟失败的流
Akka streams: Simulating a failed stream with OverflowStrategy.fail()
我是 Akka 和 Akka 流的新手。我创建了一个虚拟流,我希望它以异常结束,因为我的 map() 函数非常慢,我将缓冲区设置为 1
.
所以我的问题分为两部分:
- 为什么这段代码可以正常运行?
- 如何模拟溢出? (用于学习目的)
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
public class Application {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("reactive-test");
Source<Integer, NotUsed> source =
Source.range(0, 10000000)
.buffer(1, OverflowStrategy.fail())
.map(Application::doubleInt);
source.runWith(Sink.foreach(a -> System.out.println(a)), system);
}
private static Integer doubleInt(int i) {
try {
Thread.sleep(2_000);
} catch (Exception e) {
System.out.println(e);
}
return 2 * i;
}
}
Why this code works without a failure?
原因是背压。 Source 不会产生比 Sink 消耗的更多的元素,因此 Slow Sink 直接影响元素产生的速度。因此,您的缓冲区永远不会溢出。
How can I simulate an overflow? (for learning purposes)
有一个sink很急,但同时又很慢。可以通过添加创建 1000 个元素的列表并将其传递到下游的 grouped(1000)
来模拟它。
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
public class StreamsBufJava {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("reactive-test");
Source<Integer, NotUsed> source =
Source.range(0, 10000000)
.buffer(1, OverflowStrategy.fail())
.grouped(1000)
.mapConcat(list -> list)
.map(StreamsBufJava::doubleInt);
source.runWith(Sink.foreach(System.out::println), system);
}
private static Integer doubleInt(int i) {
try {
Thread.sleep(2_000);
} catch (Exception e) {
System.out.println(e);
}
return 2 * i;
}
}
产生
0
2
[ERROR] [04/17/2020 09:40:47.671] [reactive-test-akka.actor.default-dispatcher-5] [Buffer(akka://reactive-test)] Failing because buffer is full and overflowStrategy is: [Fail] in stream [class akka.stream.impl.fusing.Buffer$$anon]
4
6
8
我是 Akka 和 Akka 流的新手。我创建了一个虚拟流,我希望它以异常结束,因为我的 map() 函数非常慢,我将缓冲区设置为 1
.
所以我的问题分为两部分:
- 为什么这段代码可以正常运行?
- 如何模拟溢出? (用于学习目的)
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
public class Application {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("reactive-test");
Source<Integer, NotUsed> source =
Source.range(0, 10000000)
.buffer(1, OverflowStrategy.fail())
.map(Application::doubleInt);
source.runWith(Sink.foreach(a -> System.out.println(a)), system);
}
private static Integer doubleInt(int i) {
try {
Thread.sleep(2_000);
} catch (Exception e) {
System.out.println(e);
}
return 2 * i;
}
}
Why this code works without a failure?
原因是背压。 Source 不会产生比 Sink 消耗的更多的元素,因此 Slow Sink 直接影响元素产生的速度。因此,您的缓冲区永远不会溢出。
How can I simulate an overflow? (for learning purposes)
有一个sink很急,但同时又很慢。可以通过添加创建 1000 个元素的列表并将其传递到下游的 grouped(1000)
来模拟它。
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
public class StreamsBufJava {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("reactive-test");
Source<Integer, NotUsed> source =
Source.range(0, 10000000)
.buffer(1, OverflowStrategy.fail())
.grouped(1000)
.mapConcat(list -> list)
.map(StreamsBufJava::doubleInt);
source.runWith(Sink.foreach(System.out::println), system);
}
private static Integer doubleInt(int i) {
try {
Thread.sleep(2_000);
} catch (Exception e) {
System.out.println(e);
}
return 2 * i;
}
}
产生
0
2
[ERROR] [04/17/2020 09:40:47.671] [reactive-test-akka.actor.default-dispatcher-5] [Buffer(akka://reactive-test)] Failing because buffer is full and overflowStrategy is: [Fail] in stream [class akka.stream.impl.fusing.Buffer$$anon]
4
6
8