如何取消注册两个顶点消费者和 returns 一个 rxjava 可完成?
How to unregister two vertx consumers and returns an rxjava completable?
我需要一些 Rxjava 方面的帮助。目前我有两个哈希映射。每个哈希映射都包含针对订阅密钥的顶点消息消费者。只有当我能够取消注册两个顶点消息消费者时,我才想 return 一个可完成的对象。我怎样才能实现它。
我可以post我正在处理的代码。
@Override
public 可完成的 deregisterKeyEvents(String subscriptionId) {
MessageConsumer<JsonObject> messageConsumer = consumerMap.get(subscriptionId);
MessageConsumer<JsonObject> subscriptionConsumer = subscriptionConsumerMap.get(subscriptionId);
if( subscriptionConsumer != null) {
subscriptionConsumerMap.remove(subscriptionId);
subscriptionConsumer.unregister( res-> {
if(res.succeeded()) {
LOGGER.debug("Subscription channel consumer deregistered successfully!");
} else {
LOGGER.error("Unable to de-register Subscription channel consumer");
}
});
}
if (messageConsumer != null) {
consumerMap.remove(subscriptionId);
return Completable.create(emitter -> {
messageConsumer.unregister(res -> {
if (res.succeeded()) {
emitter.onComplete();
} else {
emitter.onError(res.cause());
}
});
});
} else {
LOGGER.warn("There was no consumer registered!");
return Completable.create(emitter -> emitter.onError(new KvNoSuchElementException("Subscription '" + subscriptionId + "' not found")));
}
}
我想这样改写上面的代码
subscriptionConsumer.unregister() & messageConsumer.unregister() 成功然后 return 一个可完成的
MessageConsumer class 来自 vert.x 库 io.vertx.core.eventbus.MessageConsumer。
如果你能提供帮助,我将不胜感激
谢谢
如果您愿意添加Vert.x RxJava2 to your dependencies, you could do this with toCompletable
:
@Override
public Completable deregisterKeyEvents(String subscriptionId) {
MessageConsumer<JsonObject> messageConsumer = consumerMap.get(subscriptionId);
MessageConsumer<JsonObject> subscriptionConsumer = subscriptionConsumerMap.get(subscriptionId);
Completable c1;
if( subscriptionConsumer != null) {
subscriptionConsumerMap.remove(subscriptionId);
c1 = CompletableHelper.toCompletable(handler -> subscriptionConsumer.unregister(handler))
.doOnSuccess(() -> LOGGER.debug("Subscription channel consumer deregistered successfully!"))
.doOnError(t-> LOGGER.error("Unable to de-register Subscription channel consumer", t));
} else {
c1 = Completable.complete();
}
Completable c2;
if (messageConsumer != null) {
consumerMap.remove(subscriptionId);
c2 = CompletableHelper.toCompletable(handler -> messageConsumer.unregister(handler));
} else {
LOGGER.warn("There was no consumer registered!");
c2 = Completable.error(new KvNoSuchElementException("Subscription '" + subscriptionId + "' not found"));
}
return c1.concatWith(c2);
}
请注意,这与您的原始代码有点不同,因为:
messageConsumer
注销仅在 subscriptionConsumer
、 注销后发生
- 仅当
subscriptionConsumer
的注销成功时才会发生 messageConsumer
注销。
如果这不是您想要的行为,您可以使用 Completable
的不同方法。
我需要一些 Rxjava 方面的帮助。目前我有两个哈希映射。每个哈希映射都包含针对订阅密钥的顶点消息消费者。只有当我能够取消注册两个顶点消息消费者时,我才想 return 一个可完成的对象。我怎样才能实现它。
我可以post我正在处理的代码。
@Override public 可完成的 deregisterKeyEvents(String subscriptionId) {
MessageConsumer<JsonObject> messageConsumer = consumerMap.get(subscriptionId);
MessageConsumer<JsonObject> subscriptionConsumer = subscriptionConsumerMap.get(subscriptionId);
if( subscriptionConsumer != null) {
subscriptionConsumerMap.remove(subscriptionId);
subscriptionConsumer.unregister( res-> {
if(res.succeeded()) {
LOGGER.debug("Subscription channel consumer deregistered successfully!");
} else {
LOGGER.error("Unable to de-register Subscription channel consumer");
}
});
}
if (messageConsumer != null) {
consumerMap.remove(subscriptionId);
return Completable.create(emitter -> {
messageConsumer.unregister(res -> {
if (res.succeeded()) {
emitter.onComplete();
} else {
emitter.onError(res.cause());
}
});
});
} else {
LOGGER.warn("There was no consumer registered!");
return Completable.create(emitter -> emitter.onError(new KvNoSuchElementException("Subscription '" + subscriptionId + "' not found")));
}
}
我想这样改写上面的代码
subscriptionConsumer.unregister() & messageConsumer.unregister() 成功然后 return 一个可完成的
MessageConsumer class 来自 vert.x 库 io.vertx.core.eventbus.MessageConsumer。 如果你能提供帮助,我将不胜感激 谢谢
如果您愿意添加Vert.x RxJava2 to your dependencies, you could do this with toCompletable
:
@Override
public Completable deregisterKeyEvents(String subscriptionId) {
MessageConsumer<JsonObject> messageConsumer = consumerMap.get(subscriptionId);
MessageConsumer<JsonObject> subscriptionConsumer = subscriptionConsumerMap.get(subscriptionId);
Completable c1;
if( subscriptionConsumer != null) {
subscriptionConsumerMap.remove(subscriptionId);
c1 = CompletableHelper.toCompletable(handler -> subscriptionConsumer.unregister(handler))
.doOnSuccess(() -> LOGGER.debug("Subscription channel consumer deregistered successfully!"))
.doOnError(t-> LOGGER.error("Unable to de-register Subscription channel consumer", t));
} else {
c1 = Completable.complete();
}
Completable c2;
if (messageConsumer != null) {
consumerMap.remove(subscriptionId);
c2 = CompletableHelper.toCompletable(handler -> messageConsumer.unregister(handler));
} else {
LOGGER.warn("There was no consumer registered!");
c2 = Completable.error(new KvNoSuchElementException("Subscription '" + subscriptionId + "' not found"));
}
return c1.concatWith(c2);
}
请注意,这与您的原始代码有点不同,因为:
messageConsumer
注销仅在subscriptionConsumer
、 注销后发生
- 仅当
subscriptionConsumer
的注销成功时才会发生messageConsumer
注销。
如果这不是您想要的行为,您可以使用 Completable
的不同方法。