我们可以并行处理每个多流水线步骤吗?
Can we get parallel processing of each of Multi pipeline steps?
假设我们有:
- 一个 URL 列表,这是我们 Multi
的来源
- 作为第一步,我们使用 HTTP 客户端调用
获取此页面的 HTML
- 然后我们尝试找到一些特定的标签并获取其内容
- 然后我们将找到的东西存储到数据库中
现在我们有 3 个步骤。有没有办法让这些步骤 运行 并行?我的意思是一段时间后它应该:抓取 HTML 并同时处理 html + 获取标签内容,同时将数据从已经处理过的项目保存到数据库中。(希望我的意思很明显)这我们可以进行并行处理的方式。默认情况下,我可以看到,mutiny 以串行方式执行。
这是一个例子:
@Test
public void test3() {
Multi<String> source = Multi.createFrom().items("a", "b", "c");
source
.onItem().transform(i -> trans(i, "-step1"))
.onItem().transform(i -> trans(i, "-step2"))
.onItem().transform(i -> trans(i, "-step3"))
.subscribe().with(item -> System.out.println("Subscriber received " + item));
}
private String trans(String s, String add) {
int t = new Random().nextInt(4) * 1000;
try {
print("Sleeping for '" + s + "' miliseconds: " + t);
Thread.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + add;
}
现在报告以下控制台输出:
Sleeping for 'a' miliseconds: 2000
Sleeping for 'a-step1' miliseconds: 3000
Sleeping for 'a-step1-step2' miliseconds: 3000
Subscriber received a-step1-step2-step3
Sleeping for 'b' miliseconds: 0
Sleeping for 'b-step1' miliseconds: 0
Sleeping for 'b-step1-step2' miliseconds: 0
Subscriber received b-step1-step2-step3
Sleeping for 'c' miliseconds: 1000
Sleeping for 'c-step1' miliseconds: 3000
Sleeping for 'c-step1-step2' miliseconds: 3000
Subscriber received c-step1-step2-step3
可以看出它不是运行并发的。我在这里错过了什么?
这是预期的,Multi
将项目作为流处理。
如果您想进行并行操作(例如,启动 10 个 HTTP 请求),您应该合并 Uni
,请参阅 https://smallrye.io/smallrye-mutiny/guides/combining-items
如@jponge 所述,您可以在一些 List<Uni<String>>
中收集您的物品
然后调用
Uni.combine().all().unis(listOfUnis).onitem().subscribe().with()
List<Uni<String>> listOfUnis = new ArrayList<>();
Multi<String> source = Multi.createFrom().items("a", "b", "c");
source
.onItem().invoke(i -> listOfUnis.add(trans(i, "-step1")))
.onItem().invoke(i -> listOfUnis.add(trans(i, "-step2")))
.onItem().invoke(i -> listOfUnis.add(trans(i, "-step3")))
// do not subscribe on Multis here
这里还要注意一点 - 如果您要进行 HTTP 调用,最好添加
.emitOn(someBlockingPoolExecutor)
因为您不想阻塞等待 http 调用完成的 Netty 线程
假设我们有:
- 一个 URL 列表,这是我们 Multi 的来源
- 作为第一步,我们使用 HTTP 客户端调用 获取此页面的 HTML
- 然后我们尝试找到一些特定的标签并获取其内容
- 然后我们将找到的东西存储到数据库中
现在我们有 3 个步骤。有没有办法让这些步骤 运行 并行?我的意思是一段时间后它应该:抓取 HTML 并同时处理 html + 获取标签内容,同时将数据从已经处理过的项目保存到数据库中。(希望我的意思很明显)这我们可以进行并行处理的方式。默认情况下,我可以看到,mutiny 以串行方式执行。
这是一个例子:
@Test
public void test3() {
Multi<String> source = Multi.createFrom().items("a", "b", "c");
source
.onItem().transform(i -> trans(i, "-step1"))
.onItem().transform(i -> trans(i, "-step2"))
.onItem().transform(i -> trans(i, "-step3"))
.subscribe().with(item -> System.out.println("Subscriber received " + item));
}
private String trans(String s, String add) {
int t = new Random().nextInt(4) * 1000;
try {
print("Sleeping for '" + s + "' miliseconds: " + t);
Thread.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s + add;
}
现在报告以下控制台输出:
Sleeping for 'a' miliseconds: 2000
Sleeping for 'a-step1' miliseconds: 3000
Sleeping for 'a-step1-step2' miliseconds: 3000
Subscriber received a-step1-step2-step3
Sleeping for 'b' miliseconds: 0
Sleeping for 'b-step1' miliseconds: 0
Sleeping for 'b-step1-step2' miliseconds: 0
Subscriber received b-step1-step2-step3
Sleeping for 'c' miliseconds: 1000
Sleeping for 'c-step1' miliseconds: 3000
Sleeping for 'c-step1-step2' miliseconds: 3000
Subscriber received c-step1-step2-step3
可以看出它不是运行并发的。我在这里错过了什么?
这是预期的,Multi
将项目作为流处理。
如果您想进行并行操作(例如,启动 10 个 HTTP 请求),您应该合并 Uni
,请参阅 https://smallrye.io/smallrye-mutiny/guides/combining-items
如@jponge 所述,您可以在一些 List<Uni<String>>
中收集您的物品
然后调用
Uni.combine().all().unis(listOfUnis).onitem().subscribe().with()
List<Uni<String>> listOfUnis = new ArrayList<>();
Multi<String> source = Multi.createFrom().items("a", "b", "c");
source
.onItem().invoke(i -> listOfUnis.add(trans(i, "-step1")))
.onItem().invoke(i -> listOfUnis.add(trans(i, "-step2")))
.onItem().invoke(i -> listOfUnis.add(trans(i, "-step3")))
// do not subscribe on Multis here
这里还要注意一点 - 如果您要进行 HTTP 调用,最好添加
.emitOn(someBlockingPoolExecutor)
因为您不想阻塞等待 http 调用完成的 Netty 线程