Quarkus SQS 消费者

Quarkus SQS consumer

我正在检查 this guide 使用 Quarkus 从 SQS 消费。

问题是我想在无限循环中执行此操作,例如每 10 秒获取一次新消息,然后使用 Hibernate Reactive 从消息中插入一些数据到数据库中。

我创建了一个 Quarkus Scheduler,但由于它不支持返回 Uni,我不得不阻止来自 Hibernate reactive 的响应,并收到此错误

2022-02-16 15:01:24,058 ERROR [de.sup.tea.con.SqsConsumer] (vert.x-eventloop-thread-9) Finished with error!: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
    [Exception 0] io.vertx.core.impl.NoStackTraceThrowable: Timeout
    [Exception 1] java.lang.IllegalStateException: HR000061: Session is currently connecting to database

使用 Quarkus 和反应式实现我需要的最佳方法是什么?

代码将有助于理解你在做什么。根据您在问题中获得的信息,我建议您使用如下代码创建 Uni:

Uni.createFrom().item(returnDataFromDb());

由于 Quarkus Scheduler 不在 I/O 线程中,因此无法使用 hibernate reactive。因此,要使其工作,您可以与 EventBus 一起工作。下面是一个功能齐全的示例。 processReceivedMessageResponse 方法中的代码在 I/O 线程中运行,并且可以依赖于 Hibernate Reactive。

import io.quarkus.scheduler.Scheduled
import io.quarkus.vertx.ConsumeEvent
import io.smallrye.mutiny.Uni
import io.vertx.mutiny.core.eventbus.EventBus
import org.eclipse.microprofile.config.inject.ConfigProperty
import org.jboss.logging.Logger
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse
import java.util.concurrent.CompletionStage
import javax.enterprise.context.ApplicationScoped
import javax.enterprise.inject.Instance

@ApplicationScoped
class SqsConsumer(
    private val eventBus: EventBus,
    private val logger: Logger,
    @ConfigProperty(name = "sqs.consumer.maxFetchedMessages")
    private val maxFetchedEvents: Int,
    private val handlers: Instance<MessageHandler>,
    private val sqsClient: SqsAsyncClient,
) {

    @Scheduled(every = "{sqs.consumer.interval}")
    fun execute() {
        handlers.stream().forEach { handler ->
            val handlerName = handler.javaClass.name
            logger.info("Fetching messages for $handlerName...")
            Uni
                .createFrom()
                .completionStage(fetchMessages(handler.queueUrl()))
                .subscribe()
                .with(
                    { response ->
                        val newEventsCount = response.messages().size
                        if (newEventsCount > 0) {
                            logger.info("$newEventsCount message(s) fetched for $handlerName.")
                            eventBus.send("receive-message-responses", ResponseHolder(handler, response))
                        } else {
                            logger.info("Queue was empty. Maybe next time.")
                        }
                    },
                    { logger.error("Error fetching messages!", it) }
                )
        }
    }

    @ConsumeEvent("receive-message-responses")
    fun processReceivedMessageResponse(holder: ResponseHolder): Uni<Void> {
        val handler = holder.handler
        val handlerName = handler.javaClass.name
        val messageResponse = holder.receiveMessageResponse
        logger.info("Processing messages for $handlerName...")
        return Uni
            .createFrom()
            .item(holder)
            .flatMap { handler.process(messageResponse.messages().map { message -> message.body() }) }
            .onItem()
            .invoke { _ ->
                logger.info("Processing succeeded. Deleting processed events from the queue...")
                messageResponse
                    .messages()
                    .forEach { eventBus.send("processed-messages", MessageHolder(handler, it)) }
            }
            .replaceWithVoid()
            .onFailure()
            .invoke { it -> logger.error("Error processing messages!", it) }
    }

    @ConsumeEvent("processed-messages")
    fun deleteProcessedMessages(holder: MessageHolder): Uni<Void> {
        val handler = holder.handler
        val message = holder.message
        return Uni
            .createFrom()
            .completionStage(
                sqsClient.deleteMessage {
                    it
                        .queueUrl(handler.queueUrl())
                        .receiptHandle(message.receiptHandle())
                }
            )
            .onItem()
            .invoke { _ -> logger.info("Message ${message.messageId()} deleted from the queue!") }
            .onFailure()
            .invoke { it -> logger.error("Could not delete message ${message.messageId()} from the queue!", it) }
            .replaceWithVoid()
    }

    private fun fetchMessages(queueUrl: String): CompletionStage<ReceiveMessageResponse> {
        return sqsClient
            .receiveMessage {
                it
                    .maxNumberOfMessages(maxFetchedEvents)
                    .queueUrl(queueUrl)
            }
    }
}

class ResponseHolder(
    val handler: MessageHandler,
    val receiveMessageResponse: ReceiveMessageResponse,
)

class MessageHolder(
    val handler: MessageHandler,
    val message: Message,
)