Reactive Kafka Receiver 可以与非反应性 Elasticsearch 客户端一起使用吗?
Can Reactive Kafka Receiver work with non-reactive Elasticsearch client?
下面是一个示例代码,它使用 reactor-kafka 并从主题(具有重试逻辑)中读取数据,该主题具有通过非反应性生产者发布的记录。在我的 doOnNext()
消费者中,我使用的是非反应性 elasticsearch 客户端,它在索引中对记录进行索引。所以我有几个问题我还不清楚:
- 我知道消费者和生产者是独立的解耦系统,但是是否建议有反应性生产者以及消费者是反应性的?
- 如果我使用的是非反应性的东西,在本例中为 Elasticsearch 客户端
org.elasticsearch.client.RestClient
,代码的 “反应性” 是否有效?如果有或没有,我该如何测试? (“反应性”,我的意思是它的非阻塞 IO 部分,即如果我产生三个反应性消费者并且一个由于某种原因是潜在的,线程应该被解锁并用于其他反应性消费者)。
- 总的来说,问题是,如果我用反应性客户端包装一些 API,API 是否也应该反应性?
public Disposable consumeRecords() {
long maxAttempts = 3, duration = 10;
RetryBackoffSpec retrySpec = Retry.backoff(maxAttempts, Duration.ofSeconds(duration)).transientErrors(true);
Consumer<ReceiverRecord<K, V>> doOnNextConsumer = x -> {
// use non-reactive elastic search client and index record x
};
return KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext(record -> {
try {
// calling the non-reactive consumer
doOnNextConsumer.accept(record);
} catch (Exception e) {
throw new ReceiverRecordException(record, e);
}
record.receiverOffset().acknowledge();
})
.doOnError(t -> log.error("Error occurred: ", t))
.retryWhen(retrySpec)
.onErrorContinue((e, record) -> {
ReceiverRecordException receiverRecordException = (ReceiverRecordException) e;
log.error("Retries exhausted for: " + receiverRecordException);
receiverRecordException.getRecord().receiverOffset().acknowledge();
})
.repeat()
.subscribe();
}
对它有了一些了解。
Reactive KafkaReceiver 会在内部调用一些API;如果 API 是 阻塞 API 那么即使 KafkaReceiver 是“反应性的”,非阻塞 IO 也不会工作并且接收器线程将被阻塞,因为您正在调用 Blocking API / non-reactive API.
您可以通过创建一个简单的服务器(它会阻止调用一段时间/睡眠)并从此接收器调用该服务器来对此进行测试
下面是一个示例代码,它使用 reactor-kafka 并从主题(具有重试逻辑)中读取数据,该主题具有通过非反应性生产者发布的记录。在我的 doOnNext()
消费者中,我使用的是非反应性 elasticsearch 客户端,它在索引中对记录进行索引。所以我有几个问题我还不清楚:
- 我知道消费者和生产者是独立的解耦系统,但是是否建议有反应性生产者以及消费者是反应性的?
- 如果我使用的是非反应性的东西,在本例中为 Elasticsearch 客户端
org.elasticsearch.client.RestClient
,代码的 “反应性” 是否有效?如果有或没有,我该如何测试? (“反应性”,我的意思是它的非阻塞 IO 部分,即如果我产生三个反应性消费者并且一个由于某种原因是潜在的,线程应该被解锁并用于其他反应性消费者)。 - 总的来说,问题是,如果我用反应性客户端包装一些 API,API 是否也应该反应性?
public Disposable consumeRecords() {
long maxAttempts = 3, duration = 10;
RetryBackoffSpec retrySpec = Retry.backoff(maxAttempts, Duration.ofSeconds(duration)).transientErrors(true);
Consumer<ReceiverRecord<K, V>> doOnNextConsumer = x -> {
// use non-reactive elastic search client and index record x
};
return KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext(record -> {
try {
// calling the non-reactive consumer
doOnNextConsumer.accept(record);
} catch (Exception e) {
throw new ReceiverRecordException(record, e);
}
record.receiverOffset().acknowledge();
})
.doOnError(t -> log.error("Error occurred: ", t))
.retryWhen(retrySpec)
.onErrorContinue((e, record) -> {
ReceiverRecordException receiverRecordException = (ReceiverRecordException) e;
log.error("Retries exhausted for: " + receiverRecordException);
receiverRecordException.getRecord().receiverOffset().acknowledge();
})
.repeat()
.subscribe();
}
对它有了一些了解。
Reactive KafkaReceiver 会在内部调用一些API;如果 API 是 阻塞 API 那么即使 KafkaReceiver 是“反应性的”,非阻塞 IO 也不会工作并且接收器线程将被阻塞,因为您正在调用 Blocking API / non-reactive API.
您可以通过创建一个简单的服务器(它会阻止调用一段时间/睡眠)并从此接收器调用该服务器来对此进行测试