如何取消注册两个顶点消费者和 returns 一个 rxjava 可完成?

How to unregister two vertx consumers and returns an rxjava completable?

我需要一些 Rxjava 方面的帮助。目前我有两个哈希映射。每个哈希映射都包含针对订阅密钥的顶点消息消费者。只有当我能够取消注册两个顶点消息消费者时,我才想 return 一个可完成的对象。我怎样才能实现它。

我可以post我正在处理的代码。

@Override public 可完成的 deregisterKeyEvents(String subscriptionId) {

MessageConsumer<JsonObject> messageConsumer = consumerMap.get(subscriptionId);
MessageConsumer<JsonObject> subscriptionConsumer = subscriptionConsumerMap.get(subscriptionId);


if( subscriptionConsumer != null) {
    subscriptionConsumerMap.remove(subscriptionId);
    subscriptionConsumer.unregister( res-> {
        
        if(res.succeeded()) {
            LOGGER.debug("Subscription channel consumer deregistered successfully!");
        } else {
            LOGGER.error("Unable to de-register Subscription channel consumer");
        }
        
    });
}

if (messageConsumer != null) {
    
    consumerMap.remove(subscriptionId);
    
    return Completable.create(emitter -> {
        
        messageConsumer.unregister(res -> {
            if (res.succeeded()) {
                emitter.onComplete();
            } else {
                emitter.onError(res.cause());
            }
        });
    });
} else {
    LOGGER.warn("There was no consumer registered!");
    return Completable.create(emitter -> emitter.onError(new KvNoSuchElementException("Subscription '" + subscriptionId + "' not found")));
}

}

我想这样改写上面的代码

subscriptionConsumer.unregister() & messageConsumer.unregister() 成功然后 return 一个可完成的

MessageConsumer class 来自 vert.x 库 io.vertx.core.eventbus.MessageConsumer。 如果你能提供帮助,我将不胜感激 谢谢

如果您愿意添加Vert.x RxJava2 to your dependencies, you could do this with toCompletable:

@Override
public Completable deregisterKeyEvents(String subscriptionId) {

    MessageConsumer<JsonObject> messageConsumer = consumerMap.get(subscriptionId);
    MessageConsumer<JsonObject> subscriptionConsumer = subscriptionConsumerMap.get(subscriptionId);
    
    Completable c1;
    if( subscriptionConsumer != null) {
        subscriptionConsumerMap.remove(subscriptionId);
        c1 = CompletableHelper.toCompletable(handler -> subscriptionConsumer.unregister(handler))
            .doOnSuccess(() -> LOGGER.debug("Subscription channel consumer deregistered successfully!"))
            .doOnError(t-> LOGGER.error("Unable to de-register Subscription channel consumer", t));
    } else {
        c1 = Completable.complete();
    }
    
    Completable c2;
    if (messageConsumer != null) {
        consumerMap.remove(subscriptionId);
        c2 = CompletableHelper.toCompletable(handler -> messageConsumer.unregister(handler));
    } else {
        LOGGER.warn("There was no consumer registered!");
        c2 = Completable.error(new KvNoSuchElementException("Subscription '" + subscriptionId + "' not found"));

    }

    return c1.concatWith(c2);
}

请注意,这与您的原始代码有点不同,因为:

  • messageConsumer 注销仅在 subscriptionConsumer
  • 注销后发生
  • 仅当 subscriptionConsumer 的注销成功时才会发生 messageConsumer 注销。

如果这不是您想要的行为,您可以使用 Completable 的不同方法。