Quarkus Reactive AMQP client processing in a threaded manner
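I am trying to use Quarkus's AMQP extension (reactive-messaging-amqp) to separate work from the originating REST request. The idea is that a REST call kicks off a long-running operation, and the caller can come back later for the results.
However, in my code every step seems to run in the same thread, and the work completes before the original sendNewLRA() call returns. I assumed the message would be sent over AMQP, which would decouple the flow once the message was sent. Why is this not the case? I currently have no AMQP/messaging-specific configuration; I am just letting the defaults run against the TestContainer that Quarkus manages itself.
REST handler: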
@Inject
LRAMessenger messenger;

@LRA(end = false)
@GET
@Path("start")
@Produces(MediaType.TEXT_PLAIN)
public Response hello(
        @HeaderParam(LRA_HTTP_CONTEXT_HEADER) URI lraId,
        @QueryParam("processTime") int processTime,
        @QueryParam("payload") String payload
) {
    log.info("Start. LRA ID: {}", lraId);
    StartMessage start = new StartMessage();
    start.setLraId(lraId);
    start.setProcessTime(processTime);
    start.setPayload(payload);
    this.messenger.sendNewLRA(start); // blocks here
    log.info("Sent lra processing message.");
    return Response.ok(lraId).build();
}
Messaging code:
@ApplicationScoped
@Slf4j
public class LRAMessenger {
    @Inject
    NarayanaLRAClient lraClient;

    @Inject
    @Channel("lra-out")
    Emitter<StartMessage> startEmitter;

    /**
     * Method to kick off backend processing.
     * @param startMessage The message to send
     */
    @Incoming("lra-start")
    public void sendNewLRA(StartMessage startMessage) {
        startEmitter.send(startMessage);
    }

    @Incoming("lra-out")
    public void processLRA(StartMessage startMessage) throws InterruptedException {
        log.info("Got lra message in process step: {}", startMessage);
        lraClient.setCurrentLRA(startMessage.getLraId());
        int waitTime = startMessage.getProcessTime() / 10;
        for (int percent = 10; percent <= 100; percent += 10) {
            log.info("Waiting to simulate processing...");
            Thread.sleep(waitTime);
            log.info("Done waiting ({}%)", percent);
        }
        log.info("Waiting to simulate processing completed.");
        lraClient.closeLRA(startMessage.getLraId());
        log.info("Closed LRA.");
    }
}
Output:
2022-02-21 11:48:28,723 INFO [org.acm.cus.dem.end.LRAResourceTest] (main) testing LRA.
2022-02-21 11:48:29,192 INFO [org.acm.cus.dem.end.LRAResource] (executor-thread-0) Start. LRA ID: http://localhost:49251/lra-coordinator/0_ffffac110006_b651_6213c25d_2
2022-02-21 11:48:29,200 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Got lra message in process step: StartMessage(processTime=10000, payload=null)
2022-02-21 11:48:29,200 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:30,201 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (10%)
2022-02-21 11:48:30,202 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:31,203 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (20%)
2022-02-21 11:48:31,204 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:32,205 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (30%)
2022-02-21 11:48:32,206 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:33,207 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (40%)
2022-02-21 11:48:33,207 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:34,208 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (50%)
2022-02-21 11:48:34,209 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:35,210 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (60%)
2022-02-21 11:48:35,211 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:36,211 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (70%)
2022-02-21 11:48:36,212 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:37,212 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (80%)
2022-02-21 11:48:37,213 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:38,214 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (90%)
2022-02-21 11:48:38,215 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:39,216 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (100%)
2022-02-21 11:48:39,216 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing completed.
2022-02-21 11:48:39,235 INFO [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Closed LRA.
2022-02-21 11:48:39,237 INFO [org.acm.cus.dem.end.LRAResource] (executor-thread-0) Sent lra processing message.
Note: I would expect the "Sent lra processing message." log to appear much earlier in the process, probably before the "Got lra message in process step" log message.
Found an answer, or at least a workaround...
Adding @Blocking to the second step of the message chain seems to have decoupled the flow:
@Incoming("lra-out")
@Blocking
public void processLRA(StartMessage startMessage) throws InterruptedException {
    log.info("Got lra message in process step: {}", startMessage);
    // ...
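To make the effect of the workaround easier to see, here is a minimal plain-Java analogy. It is not Quarkus or SmallRye code, and the names InMemoryChannel, processingThread and demo-worker are hypothetical, made up for this illustration. It assumes, consistent with the log in the question where every line is on executor-thread-0, that the subscriber was being invoked directly on the sender's thread. The sketch contrasts that inline dispatch with handing the message to a worker thread, which is roughly what @Blocking asks the framework to do.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class DispatchDemo {

    /** Hypothetical stand-in for a channel with exactly one subscriber. */
    static final class InMemoryChannel {
        private final Consumer<String> subscriber;
        private final ExecutorService worker; // null means "run on the caller's thread"

        InMemoryChannel(Consumer<String> subscriber, ExecutorService worker) {
            this.subscriber = subscriber;
            this.worker = worker;
        }

        void send(String message) {
            if (worker == null) {
                // Inline dispatch: send() does not return until the subscriber is done.
                subscriber.accept(message);
            } else {
                // Offloaded dispatch: send() returns once the task is queued.
                worker.execute(() -> subscriber.accept(message));
            }
        }
    }

    /** Sends one message and returns the name of the thread that processed it. */
    static String processingThread(boolean offload) {
        AtomicReference<String> seen = new AtomicReference<>();
        ExecutorService worker = offload
                ? Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-worker"))
                : null;
        InMemoryChannel channel = new InMemoryChannel(
                msg -> seen.set(Thread.currentThread().getName()), worker);

        channel.send("lra-1");

        if (worker != null) {
            // Let the queued task finish before reading the result.
            worker.shutdown();
            try {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for worker", e);
            }
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("caller:    " + Thread.currentThread().getName());
        System.out.println("inline:    " + processingThread(false));
        System.out.println("offloaded: " + processingThread(true));
    }
}
```

With inline dispatch the processing thread is the caller's own thread, which matches the single thread name in the log above. With the worker, processing happens on a different thread and send() can return before the work finishes, which is the ordering the question expected.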