Quarkus Reactive AMQP client processing in threaded manner

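I am trying to use Quarkus's AMQP extension (reactive-messaging-amqp) to decouple work from the original REST request. The idea is that a REST call kicks off a long-running operation, and the caller can come back later to fetch the result.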

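However, in my code every step Quarkus runs appears to happen on the same thread, and all the work completes before the original sendNewLRA() call returns. I would have assumed the message would be sent through AMQP, decoupling the flow once the message is sent. Why is that not the case? I currently have no AMQP/messaging-specific configuration; I am just letting the defaults run against the TestContainer that Quarkus manages itself.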

REST handler:

    @Inject
    LRAMessenger messenger;
    
    @LRA(end = false)
    @GET
    @Path("start")
    @Produces(MediaType.TEXT_PLAIN)
    public Response hello(
        @HeaderParam(LRA_HTTP_CONTEXT_HEADER) URI lraId,
        @QueryParam("processTime") int processTime,
        @QueryParam("payload") String payload
    ) {
        log.info("Start. LRA ID: {}", lraId);
        
        StartMessage start = new StartMessage();
        start.setLraId(lraId);
        start.setProcessTime(processTime);
        start.setPayload(payload);
        
        this.messenger.sendNewLRA(start); // blocks here
        log.info("Sent lra processing message.");
        
        return Response.ok(lraId).build();
    }

Messaging code:

@ApplicationScoped
@Slf4j
public class LRAMessenger {
    
    @Inject
    NarayanaLRAClient lraClient;
    
    @Inject
    @Channel("lra-out")
    Emitter<StartMessage> startEmitter;
    
    /**
     * Method to kick off backend processing.
     * @param startMessage The message to send
     */
    @Incoming("lra-start")
    public void sendNewLRA(StartMessage startMessage) {
        startEmitter.send(startMessage);
    }
    
    @Incoming("lra-out")
    public void processLRA(StartMessage startMessage) throws InterruptedException {
        log.info("Got lra message in process step: {}", startMessage);
        lraClient.setCurrentLRA(startMessage.getLraId());
        
        int waitTime = startMessage.getProcessTime() / 10;
        
        for (int percent = 10; percent <= 100; percent += 10) {
            log.info("Waiting to simulate processing...");
            Thread.sleep(waitTime);
            log.info("Done waiting ({}%)", percent);
        }
        log.info("Waiting to simulate processing completed.");
        lraClient.closeLRA(startMessage.getLraId());
        log.info("Closed LRA.");
    }
}

Output:

2022-02-21 11:48:28,723 INFO  [org.acm.cus.dem.end.LRAResourceTest] (main) testing LRA.
2022-02-21 11:48:29,192 INFO  [org.acm.cus.dem.end.LRAResource] (executor-thread-0) Start. LRA ID: http://localhost:49251/lra-coordinator/0_ffffac110006_b651_6213c25d_2
2022-02-21 11:48:29,200 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Got lra message in process step: StartMessage(processTime=10000, payload=null)
2022-02-21 11:48:29,200 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:30,201 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (10%)
2022-02-21 11:48:30,202 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:31,203 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (20%)
2022-02-21 11:48:31,204 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:32,205 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (30%)
2022-02-21 11:48:32,206 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:33,207 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (40%)
2022-02-21 11:48:33,207 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:34,208 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (50%)
2022-02-21 11:48:34,209 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:35,210 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (60%)
2022-02-21 11:48:35,211 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:36,211 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (70%)
2022-02-21 11:48:36,212 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:37,212 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (80%)
2022-02-21 11:48:37,213 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:38,214 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (90%)
2022-02-21 11:48:38,215 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing...
2022-02-21 11:48:39,216 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Done waiting (100%)
2022-02-21 11:48:39,216 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Waiting to simulate processing completed.
2022-02-21 11:48:39,235 INFO  [org.acm.cus.dem.mes.LRAMessenger] (executor-thread-0) Closed LRA.
2022-02-21 11:48:39,237 INFO  [org.acm.cus.dem.end.LRAResource] (executor-thread-0) Sent lra processing message.

Note: I would expect the "Sent lra processing message." log to appear much earlier in the process, probably before the "Got lra message in process step" log message.

Found the answer, or at least a workaround...

Adding @Blocking to the second step of the message chain seems to have decoupled the flow:

    @Incoming("lra-out")
    @Blocking
    public void processLRA(StartMessage startMessage) throws InterruptedException {
        log.info("Got lra message in process step: {}", startMessage);
        // ...
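
A likely explanation for the original log, offered as an assumption rather than something confirmed above: the Emitter and the @Incoming method share the channel name lra-out and no connector is configured for it, so the channel is probably wired in memory and the consumer is invoked directly on the thread that called send(). @Blocking moves the consumer onto a worker thread, which is what lets the REST handler return first. The plain-Java sketch below is only an analogy for that difference and is not how SmallRye Reactive Messaging is implemented; HandOffSketch, run, and the event strings are hypothetical names made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class HandOffSketch {

    // Returns the order of events seen when a "send" either runs the consumer
    // inline (handOff = false) or submits it to a worker thread (handOff = true).
    static List<String> run(boolean handOff) throws Exception {
        List<String> events = new CopyOnWriteArrayList<>();
        Thread caller = Thread.currentThread();
        CountDownLatch sendReturned = new CountDownLatch(1);

        // Stand-in for processLRA(): records which kind of thread it ran on.
        Runnable consumer = () -> {
            if (Thread.currentThread() != caller) {
                // Hand-off case only: wait until the caller has recorded "sent",
                // purely so this demo's ordering is deterministic.
                try {
                    sendReturned.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                events.add("processed on worker thread");
            } else {
                events.add("processed on caller thread");
            }
        };

        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<?> pending = null;
            if (handOff) {
                pending = worker.submit(consumer); // returns immediately
            } else {
                consumer.run();                    // caller does the work itself
            }
            events.add("sent");
            sendReturned.countDown();
            if (pending != null) {
                pending.get();                     // wait only so the demo can report
            }
        } finally {
            worker.shutdown();
        }
        return events;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("inline:   " + run(false)); // [processed on caller thread, sent]
        System.out.println("hand-off: " + run(true));  // [sent, processed on worker thread]
    }
}
```

The inline case mirrors the log in the question, where "Sent lra processing message." only appears after the processing finishes. Note that with @Blocking the work still stays inside the same JVM; if the goal is for the message to actually travel through the AMQP broker, the outgoing and incoming sides would presumably need separate channel names mapped to the connector in the application configuration.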