是否可以延迟从 quarkus 中的通道(amqp、kafka 等)消费?
is it possible to delay consuming from channels (amqp,kafka etc) in quarkus?
我的服务需要在开始使用 Incoming
频道之前设置一些数据和连接(例如第三方令牌、hazelcast 连接)。但我注意到 amqp
连接也会在启动时发生,并且一旦连接,它们就会在上述设置完成之前开始使用。那么是否可以延迟传入队列的消费?
我找到了一个(可能是脏的)解决方案来延迟特定时间(这里是 1 分钟)。但它需要在每个消费者中实施:
@ApplicationScoped
@ActivateRequestContext
public class MyConsumer {
private static LocalDateTime startupTime;
void onStart(@Observes StartupEvent startup) {
startupTime = LocalDateTime.now();
}
@Incoming("my-queue")
public CompletionStage<Message<MyClass>> process(final Message<MyClass> msg) {
long start = startupTime.plusMinutes(1).toEpochSecond(ZoneOffset.UTC);
long now = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
if (start > now) {
return Uni.createFrom().item(msg)
.onItem().delayIt()
.by(Duration.ofSeconds(start - now))
.invoke(getRunnable(msg)).subscribeAsCompletionStage();
}
return Uni.createFrom().item(msg).onItem().invoke(getRunnable(msg)).subscribeAsCompletionStage();
}
private Runnable getRunnable(Message<MyClass> msg) {
return () -> {
// Your Business Code...
msg.ack();
};
}
}
我使用了类似的解决方案,但实际上将所有内容都消耗到了队列中,
在我准备好消费后,我首先处理队列,然后通过布尔值实时激活消费。 (一种背压方法)但我对它也不是很满意。(在我的场景中,如果服务失败时记录丢失并不重要,所以我也直接提交消息。
@Incoming("my-authors")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void getAuthorChanges(Envelope envelope) {
// if cache ready // else store
if (!authorCacheReady && !kafkaEnabled) {
authorChanges.add(envelope);
return;
}
... proceed if active
}
函数处理队列:
public void enableConsumptionAfterCacheInit() {
authorCacheReady = true;
while (!authorChanges.isEmpty()) {
getAuthorChanges(authorChanges.poll());
}
kafkaEnabled = true;
LOG.info("Starting realtime kafka consumption for Authors..");
}
我用以下方法解决了我的特殊情况。
void onStart(@Observes StartupEvent ev) {
setupTokens();
}
文档是here
我的服务需要在开始使用 Incoming
频道之前设置一些数据和连接(例如第三方令牌、hazelcast 连接)。但我注意到 amqp
连接也会在启动时发生,并且一旦连接,它们就会在上述设置完成之前开始使用。那么是否可以延迟传入队列的消费?
我找到了一个(可能是脏的)解决方案来延迟特定时间(这里是 1 分钟)。但它需要在每个消费者中实施:
@ApplicationScoped
@ActivateRequestContext
public class MyConsumer {
private static LocalDateTime startupTime;
void onStart(@Observes StartupEvent startup) {
startupTime = LocalDateTime.now();
}
@Incoming("my-queue")
public CompletionStage<Message<MyClass>> process(final Message<MyClass> msg) {
long start = startupTime.plusMinutes(1).toEpochSecond(ZoneOffset.UTC);
long now = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
if (start > now) {
return Uni.createFrom().item(msg)
.onItem().delayIt()
.by(Duration.ofSeconds(start - now))
.invoke(getRunnable(msg)).subscribeAsCompletionStage();
}
return Uni.createFrom().item(msg).onItem().invoke(getRunnable(msg)).subscribeAsCompletionStage();
}
private Runnable getRunnable(Message<MyClass> msg) {
return () -> {
// Your Business Code...
msg.ack();
};
}
}
我使用了类似的解决方案,但实际上将所有内容都消耗到了队列中, 在我准备好消费后,我首先处理队列,然后通过布尔值实时激活消费。 (一种背压方法)但我对它也不是很满意。(在我的场景中,如果服务失败时记录丢失并不重要,所以我也直接提交消息。
@Incoming("my-authors")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void getAuthorChanges(Envelope envelope) {
// if cache ready // else store
if (!authorCacheReady && !kafkaEnabled) {
authorChanges.add(envelope);
return;
}
... proceed if active
}
函数处理队列:
public void enableConsumptionAfterCacheInit() {
authorCacheReady = true;
while (!authorChanges.isEmpty()) {
getAuthorChanges(authorChanges.poll());
}
kafkaEnabled = true;
LOG.info("Starting realtime kafka consumption for Authors..");
}
我用以下方法解决了我的特殊情况。
void onStart(@Observes StartupEvent ev) {
setupTokens();
}
文档是here