将 Uni 事件带回调用者线程
Bring Uni event back to the caller thread
在 Quarkus 应用程序中订阅:
Uni.createFrom.Item(1)
.chain { it -> processA(it) }
.emitOn(Infrastructure.getDefaultWorkerPool())
.chain { it -> processB(it) }
.chain { it -> processC(it) }
.subscribe().with{ it -> processD(it) }
如果我没理解错的话,processA
会在调用者线程上执行(所以如果是在Verticle中,应该是在IO线程上),processB
和processC
将在工作线程上执行,processD
将再次在 caller/IO 线程上执行。
如何让 processC
在 IO 线程上被调用,而 processB
仍在工作线程上?有没有简单的方法将事件带回调用线程?
编辑:现在我正在使用以下解决方法:
Uni.createFrom.Item(1)
.chain { it -> processA(it) }
.chain { i -> Uni.createFrom().future { Infrastructure.getDefaultWorkerPool().submit{ processB(i) } } }
.chain { it -> processC(it) }
.subscribe().with{ it -> processD(it) }
您需要捕获 Vert.x 上下文并切换回它。
这样的事情会起作用(使用 Java,因为我的 Kotlin 不是很好):
Context context = Vertx.currentContext();
Uni.createFrom().item(1)
.chain(it -> processA(it))
.emitOn(Infrastructure.getDefaultWorkerPool())
.chain(it -> processB(it))
.emitOn(runnable -> context.runOnContext(ignored -> runnable.run())
.chain(it -> process(it))
.subscribe().with(it -> processD(it));
在 Quarkus 应用程序中订阅:
Uni.createFrom.Item(1)
.chain { it -> processA(it) }
.emitOn(Infrastructure.getDefaultWorkerPool())
.chain { it -> processB(it) }
.chain { it -> processC(it) }
.subscribe().with{ it -> processD(it) }
如果我没理解错的话,processA
会在调用者线程上执行(所以如果是在Verticle中,应该是在IO线程上),processB
和processC
将在工作线程上执行,processD
将再次在 caller/IO 线程上执行。
如何让 processC
在 IO 线程上被调用,而 processB
仍在工作线程上?有没有简单的方法将事件带回调用线程?
编辑:现在我正在使用以下解决方法:
Uni.createFrom.Item(1)
.chain { it -> processA(it) }
.chain { i -> Uni.createFrom().future { Infrastructure.getDefaultWorkerPool().submit{ processB(i) } } }
.chain { it -> processC(it) }
.subscribe().with{ it -> processD(it) }
您需要捕获 Vert.x 上下文并切换回它。 这样的事情会起作用(使用 Java,因为我的 Kotlin 不是很好):
Context context = Vertx.currentContext();
Uni.createFrom().item(1)
.chain(it -> processA(it))
.emitOn(Infrastructure.getDefaultWorkerPool())
.chain(it -> processB(it))
.emitOn(runnable -> context.runOnContext(ignored -> runnable.run())
.chain(it -> process(it))
.subscribe().with(it -> processD(it));