是否可以延迟从 quarkus 中的通道(amqp、kafka 等)消费?

is it possible to delay consuming from channels (amqp,kafka etc) in quarkus?

我的服务需要在开始使用 Incoming 频道之前设置一些数据和连接(例如第三方令牌、hazelcast 连接)。但我注意到 amqp 连接也会在启动时发生,并且一旦连接,它们就会在上述设置完成之前开始使用。那么是否可以延迟传入队列的消费?

我找到了一个(可能是脏的)解决方案来延迟特定时间(这里是 1 分钟)。但它需要在每个消费者中实施:

@ApplicationScoped
@ActivateRequestContext
public class MyConsumer {

private static LocalDateTime startupTime;

void onStart(@Observes StartupEvent startup) {
    startupTime = LocalDateTime.now();
}

@Incoming("my-queue")
public CompletionStage<Message<MyClass>> process(final Message<MyClass> msg) {
    long start = startupTime.plusMinutes(1).toEpochSecond(ZoneOffset.UTC);
    long now = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
    if (start > now) {
        return Uni.createFrom().item(msg)
                .onItem().delayIt()
                .by(Duration.ofSeconds(start - now))
                .invoke(getRunnable(msg)).subscribeAsCompletionStage();
    }
    return Uni.createFrom().item(msg).onItem().invoke(getRunnable(msg)).subscribeAsCompletionStage();
}

private Runnable getRunnable(Message<MyClass> msg) {
    return () -> {
        // Your Business Code...
        msg.ack();
    };
}

}

我使用了类似的解决方案,但实际上将所有内容都消耗到了队列中, 在我准备好消费后,我首先处理队列,然后通过布尔值实时激活消费。 (一种背压方法)但我对它也不是很满意。(在我的场景中,如果服务失败时记录丢失并不重要,所以我也直接提交消息。

@Incoming("my-authors")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
public void getAuthorChanges(Envelope envelope) {
     // if cache ready // else store
            if (!authorCacheReady && !kafkaEnabled) {
                authorChanges.add(envelope);
                return;
            }
... proceed if active
}

函数处理队列:

 public void enableConsumptionAfterCacheInit() {
            authorCacheReady = true;
            while (!authorChanges.isEmpty()) {
                getAuthorChanges(authorChanges.poll());
            }
            kafkaEnabled = true;
            LOG.info("Starting realtime kafka consumption for Authors..");
    
        }

我用以下方法解决了我的特殊情况。

void onStart(@Observes StartupEvent ev) {
    setupTokens();
}

文档是here