请求业务逻辑后,Quarkus AMQP 将消息发送到队列
Quarkus AMQP send message to queue after request business logic
一旦收到 HTTP Get/Post 我必须坚持并反对,然后将消息发送到其他服务正在侦听的队列以开始执行其他复杂工作
我当前的问题是我不能只调用带有@Outgoing("channel") 注释的方法,我试过了,只是继续执行该方法而不调用
有没有办法使用 Quarkus 框架调用方法将 JSON 有效负载发送到队列?
PS: 我也在尝试使用 rabbitMQ 并切换回 ActiveMQ
我已经按照 Quarkus tuturial 进行了反应消息传递,并尝试在已实现的资源中注册一些东西,但没有成功
@Path("/part")
class PartService : PanacheRepository<PartDao>, Logging {
@GET
@Produces(MediaType.APPLICATION_JSON)
@Transactional
fun fetchParts(): List<PartDao> {
val partDao = PartDao(label = "Test", status = PartStatus.INBANK, creatorId = "ghost-007")
partDao.persist()
if (partDao.isPersistent) {
// Send a message to a queue -> PoC
send(partDao)
}
return findAll().list()
}
@Outgoing("part-persisted")
@Transactional
fun send(partDao: PartDao): CompletionStage<AmqpMessage<*>> {
val future = CompletableFuture<AmqpMessage<*>>()
val message = "hello from sender"
// Debug proposes
println("Sending (data): $message")
logger.debug(partDao.toString())
future.complete(AmqpMessage(message))
return future
}
}
预计:
执行后在队列中注册消息 "hello from sender":
curl http://localhost/part
实际结果:
发送方法一直在执行
如果我没理解错的话,你想调用一个方法将一些东西放入流中。
据我所知,您必须使用 Emitter
才能做到这一点,例如参见https://github.com/michalszynkiewicz/devoxxpl-demo/blob/master/search/src/main/java/com/example/search/SearchEndpoint.java#L23
请参阅 https://smallrye.io/smallrye-reactive-messaging/#_stream 文档。
一旦收到 HTTP Get/Post 我必须坚持并反对,然后将消息发送到其他服务正在侦听的队列以开始执行其他复杂工作
我当前的问题是我不能只调用带有@Outgoing("channel") 注释的方法,我试过了,只是继续执行该方法而不调用
有没有办法使用 Quarkus 框架调用方法将 JSON 有效负载发送到队列?
PS: 我也在尝试使用 rabbitMQ 并切换回 ActiveMQ
我已经按照 Quarkus tuturial 进行了反应消息传递,并尝试在已实现的资源中注册一些东西,但没有成功
@Path("/part")
class PartService : PanacheRepository<PartDao>, Logging {
@GET
@Produces(MediaType.APPLICATION_JSON)
@Transactional
fun fetchParts(): List<PartDao> {
val partDao = PartDao(label = "Test", status = PartStatus.INBANK, creatorId = "ghost-007")
partDao.persist()
if (partDao.isPersistent) {
// Send a message to a queue -> PoC
send(partDao)
}
return findAll().list()
}
@Outgoing("part-persisted")
@Transactional
fun send(partDao: PartDao): CompletionStage<AmqpMessage<*>> {
val future = CompletableFuture<AmqpMessage<*>>()
val message = "hello from sender"
// Debug proposes
println("Sending (data): $message")
logger.debug(partDao.toString())
future.complete(AmqpMessage(message))
return future
}
}
预计:
执行后在队列中注册消息 "hello from sender":
curl http://localhost/part
实际结果:
发送方法一直在执行
如果我没理解错的话,你想调用一个方法将一些东西放入流中。
据我所知,您必须使用 Emitter
才能做到这一点,例如参见https://github.com/michalszynkiewicz/devoxxpl-demo/blob/master/search/src/main/java/com/example/search/SearchEndpoint.java#L23
请参阅 https://smallrye.io/smallrye-reactive-messaging/#_stream 文档。