Azure ServiceBusSessionReceiverAsyncClient - Mono 而不是 Flux

Azure ServiceBusSessionReceiverAsyncClient - Mono instead of Flux

我有一个 Spring 启动应用程序,我从 Azure 服务总线队列会话中收到一条消息。

密码是:

@Autowired
ServiceBusSessionReceiverAsyncClient apiMessageQueueIntegrator;
.
.
.
Mono<ServiceBusReceiverAsyncClient> receiverMono = apiMessageQueueIntegrator.acceptSession(sessionid);
        Disposable subscription = Flux.usingWhen(receiverMono,
                receiver -> receiver.receiveMessages(),
                receiver -> Mono.fromRunnable(() -> receiver.close()))
                .subscribe(message -> {
                    // Process message.
                    logger.info(String.format("Message received from quque. Session id: %s. Contents: %s%n", message.getSessionId(),
                        message.getBody()));
                    receivedMessage.setReceivedMessage(message);
                    timeoutCheck.countDown();

                }, error -> {
                    logger.info("Queue error occurred: " + error);
                });

因为我只从会话中收到一条消息,所以我在收到消息后使用 CountDownLatch(1) 处理订阅。

图书馆的文档说,如果我只期待一条消息,可以使用 Mono.usingWhen 而不是 Flux.usingWhen,但我在任何地方都找不到这样的例子,我也没有能够弄清楚如何重写这段代码来做到这一点。

如果我改用 Mono.usingWhen,粘贴的代码会是什么样子?

谢谢conniey。将您的建议作为答案发布以帮助其他社区成员。

By default receiveMessages() is a Flux because we imagine the messages from a session to be "infinitely long". In your case, you only want the first message in the stream, so we use the next() operator.

The usage of the countdown latch is probably not the best approach. In the sample, we had one there so that the program didn't end before the messages were received. .subscribe is not a blocking call, it sets up the handlers and moves onto the next line of code.

Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptSession("greetings-id");
Mono<ServiceBusReceivedMessage> singleMessageMono = Mono.usingWhen(receiverMono,
    receiver -> {
        // Anything you wish to do with the receiver.
        // In this case we only want to take the first message, so we use the "next" operator. This returns a
        // Mono.
        return receiver.receiveMessages().next();
    },
    receiver -> Mono.fromRunnable(() -> receiver.close()));

try {
    // Turns this into a blocking call. .block() waits indefinitely, so we have a timeout.
    ServiceBusReceivedMessage message = singleMessageMono.block(Duration.ofSeconds(30));
    if (message != null) {
        // Process message.

    }
} catch (Exception error) {
    System.err.println("Error occurred: " + error);
}

可以参考GitHub问题:ServiceBusSessionReceiverAsyncClient - Mono instead of Flux