Quarkus SQS 消费者
Quarkus SQS consumer
我正在检查 this guide 使用 Quarkus 从 SQS 消费。
问题是我想在无限循环中执行此操作,例如每 10 秒获取一次新消息,然后使用 Hibernate Reactive 从消息中插入一些数据到数据库中。
我创建了一个 Quarkus Scheduler,但由于它不支持返回 Uni,我不得不阻止来自 Hibernate reactive 的响应,并收到此错误
2022-02-16 15:01:24,058 ERROR [de.sup.tea.con.SqsConsumer] (vert.x-eventloop-thread-9) Finished with error!: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
[Exception 0] io.vertx.core.impl.NoStackTraceThrowable: Timeout
[Exception 1] java.lang.IllegalStateException: HR000061: Session is currently connecting to database
使用 Quarkus 和反应式实现我需要的最佳方法是什么?
代码将有助于理解你在做什么。根据您在问题中获得的信息,我建议您使用如下代码创建 Uni:
Uni.createFrom().item(returnDataFromDb());
由于 Quarkus Scheduler 不在 I/O 线程中,因此无法使用 hibernate reactive。因此,要使其工作,您可以与 EventBus 一起工作。下面是一个功能齐全的示例。 processReceivedMessageResponse 方法中的代码在 I/O 线程中运行,并且可以依赖于 Hibernate Reactive。
import io.quarkus.scheduler.Scheduled
import io.quarkus.vertx.ConsumeEvent
import io.smallrye.mutiny.Uni
import io.vertx.mutiny.core.eventbus.EventBus
import org.eclipse.microprofile.config.inject.ConfigProperty
import org.jboss.logging.Logger
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse
import java.util.concurrent.CompletionStage
import javax.enterprise.context.ApplicationScoped
import javax.enterprise.inject.Instance
@ApplicationScoped
class SqsConsumer(
private val eventBus: EventBus,
private val logger: Logger,
@ConfigProperty(name = "sqs.consumer.maxFetchedMessages")
private val maxFetchedEvents: Int,
private val handlers: Instance<MessageHandler>,
private val sqsClient: SqsAsyncClient,
) {
@Scheduled(every = "{sqs.consumer.interval}")
fun execute() {
handlers.stream().forEach { handler ->
val handlerName = handler.javaClass.name
logger.info("Fetching messages for $handlerName...")
Uni
.createFrom()
.completionStage(fetchMessages(handler.queueUrl()))
.subscribe()
.with(
{ response ->
val newEventsCount = response.messages().size
if (newEventsCount > 0) {
logger.info("$newEventsCount message(s) fetched for $handlerName.")
eventBus.send("receive-message-responses", ResponseHolder(handler, response))
} else {
logger.info("Queue was empty. Maybe next time.")
}
},
{ logger.error("Error fetching messages!", it) }
)
}
}
@ConsumeEvent("receive-message-responses")
fun processReceivedMessageResponse(holder: ResponseHolder): Uni<Void> {
val handler = holder.handler
val handlerName = handler.javaClass.name
val messageResponse = holder.receiveMessageResponse
logger.info("Processing messages for $handlerName...")
return Uni
.createFrom()
.item(holder)
.flatMap { handler.process(messageResponse.messages().map { message -> message.body() }) }
.onItem()
.invoke { _ ->
logger.info("Processing succeeded. Deleting processed events from the queue...")
messageResponse
.messages()
.forEach { eventBus.send("processed-messages", MessageHolder(handler, it)) }
}
.replaceWithVoid()
.onFailure()
.invoke { it -> logger.error("Error processing messages!", it) }
}
@ConsumeEvent("processed-messages")
fun deleteProcessedMessages(holder: MessageHolder): Uni<Void> {
val handler = holder.handler
val message = holder.message
return Uni
.createFrom()
.completionStage(
sqsClient.deleteMessage {
it
.queueUrl(handler.queueUrl())
.receiptHandle(message.receiptHandle())
}
)
.onItem()
.invoke { _ -> logger.info("Message ${message.messageId()} deleted from the queue!") }
.onFailure()
.invoke { it -> logger.error("Could not delete message ${message.messageId()} from the queue!", it) }
.replaceWithVoid()
}
private fun fetchMessages(queueUrl: String): CompletionStage<ReceiveMessageResponse> {
return sqsClient
.receiveMessage {
it
.maxNumberOfMessages(maxFetchedEvents)
.queueUrl(queueUrl)
}
}
}
class ResponseHolder(
val handler: MessageHandler,
val receiveMessageResponse: ReceiveMessageResponse,
)
class MessageHolder(
val handler: MessageHandler,
val message: Message,
)
我正在检查 this guide 使用 Quarkus 从 SQS 消费。
问题是我想在无限循环中执行此操作,例如每 10 秒获取一次新消息,然后使用 Hibernate Reactive 从消息中插入一些数据到数据库中。
我创建了一个 Quarkus Scheduler,但由于它不支持返回 Uni,我不得不阻止来自 Hibernate reactive 的响应,并收到此错误
2022-02-16 15:01:24,058 ERROR [de.sup.tea.con.SqsConsumer] (vert.x-eventloop-thread-9) Finished with error!: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
[Exception 0] io.vertx.core.impl.NoStackTraceThrowable: Timeout
[Exception 1] java.lang.IllegalStateException: HR000061: Session is currently connecting to database
使用 Quarkus 和反应式实现我需要的最佳方法是什么?
代码将有助于理解你在做什么。根据您在问题中获得的信息,我建议您使用如下代码创建 Uni:
Uni.createFrom().item(returnDataFromDb());
由于 Quarkus Scheduler 不在 I/O 线程中,因此无法使用 hibernate reactive。因此,要使其工作,您可以与 EventBus 一起工作。下面是一个功能齐全的示例。 processReceivedMessageResponse 方法中的代码在 I/O 线程中运行,并且可以依赖于 Hibernate Reactive。
import io.quarkus.scheduler.Scheduled
import io.quarkus.vertx.ConsumeEvent
import io.smallrye.mutiny.Uni
import io.vertx.mutiny.core.eventbus.EventBus
import org.eclipse.microprofile.config.inject.ConfigProperty
import org.jboss.logging.Logger
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse
import java.util.concurrent.CompletionStage
import javax.enterprise.context.ApplicationScoped
import javax.enterprise.inject.Instance
@ApplicationScoped
class SqsConsumer(
private val eventBus: EventBus,
private val logger: Logger,
@ConfigProperty(name = "sqs.consumer.maxFetchedMessages")
private val maxFetchedEvents: Int,
private val handlers: Instance<MessageHandler>,
private val sqsClient: SqsAsyncClient,
) {
@Scheduled(every = "{sqs.consumer.interval}")
fun execute() {
handlers.stream().forEach { handler ->
val handlerName = handler.javaClass.name
logger.info("Fetching messages for $handlerName...")
Uni
.createFrom()
.completionStage(fetchMessages(handler.queueUrl()))
.subscribe()
.with(
{ response ->
val newEventsCount = response.messages().size
if (newEventsCount > 0) {
logger.info("$newEventsCount message(s) fetched for $handlerName.")
eventBus.send("receive-message-responses", ResponseHolder(handler, response))
} else {
logger.info("Queue was empty. Maybe next time.")
}
},
{ logger.error("Error fetching messages!", it) }
)
}
}
@ConsumeEvent("receive-message-responses")
fun processReceivedMessageResponse(holder: ResponseHolder): Uni<Void> {
val handler = holder.handler
val handlerName = handler.javaClass.name
val messageResponse = holder.receiveMessageResponse
logger.info("Processing messages for $handlerName...")
return Uni
.createFrom()
.item(holder)
.flatMap { handler.process(messageResponse.messages().map { message -> message.body() }) }
.onItem()
.invoke { _ ->
logger.info("Processing succeeded. Deleting processed events from the queue...")
messageResponse
.messages()
.forEach { eventBus.send("processed-messages", MessageHolder(handler, it)) }
}
.replaceWithVoid()
.onFailure()
.invoke { it -> logger.error("Error processing messages!", it) }
}
@ConsumeEvent("processed-messages")
fun deleteProcessedMessages(holder: MessageHolder): Uni<Void> {
val handler = holder.handler
val message = holder.message
return Uni
.createFrom()
.completionStage(
sqsClient.deleteMessage {
it
.queueUrl(handler.queueUrl())
.receiptHandle(message.receiptHandle())
}
)
.onItem()
.invoke { _ -> logger.info("Message ${message.messageId()} deleted from the queue!") }
.onFailure()
.invoke { it -> logger.error("Could not delete message ${message.messageId()} from the queue!", it) }
.replaceWithVoid()
}
private fun fetchMessages(queueUrl: String): CompletionStage<ReceiveMessageResponse> {
return sqsClient
.receiveMessage {
it
.maxNumberOfMessages(maxFetchedEvents)
.queueUrl(queueUrl)
}
}
}
class ResponseHolder(
val handler: MessageHandler,
val receiveMessageResponse: ReceiveMessageResponse,
)
class MessageHolder(
val handler: MessageHandler,
val message: Message,
)