通过 Observable(RxJava) 使用 Kafka
Using Kafka through Observable(RxJava)
我有一个生产者(使用 Kafka)和多个消费者。所以我在主题中发布一条消息,然后我的消费者接收并处理该消息。
我需要在生产者中收到至少一个消费者的响应(最好是第一个)。我正在尝试使用 RxJava 来做到这一点(可观察)。
这样可以吗?有人有例子吗?
你最好先分享你的解决方案...
由于SpringCloud Stream是一个mh流解决方案,而不是request/reply,所以没有示例可以与您分享。
你可以考虑把你的消费者也做成生产者。并且在原始生产者中有一个消费者可以从回复的主题中阅读。最后,您必须将回复数据与请求数据相关联。
RxJava 或任何其他实现细节不相关。
您可以按如下方式使用:
val consumer = new RxConsumer("zookeeper:2181", "consumer-group")
consumer.getRecordStream("cool-topic-(x|y|z)")
.map(deserialize)
.take(42 seconds)
.foreach(println)
consumer.shutdown()
有关详细信息,请参阅:
https://github.com/cjdev/kafka-rx
下面是我如何使用 rxjava '2.2.6' 来处理 Kafka 事件而无需任何额外的依赖:
import io.reactivex.Observable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
...
// Load consumer props
Properties props = new Properties();
props.load(KafkaUtils.class.getClassLoader().getResourceAsStream("kafka-client.properties"));
// Create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to topics
consumer.subscribe(Arrays.asList(props.getProperty("kafkaTopics").split("\s*,\s*")));
// Create an Observable for topic events
Observable<ConsumerRecords<String, String>> observable = Observable.fromCallable(() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSecond(10);
return records;
});
// Process Observable events
observable.subscribe(records -> {
if ((records != null) && (!records.isEmpty())) {
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ": " + record.value());
}
}
});
我有一个生产者(使用 Kafka)和多个消费者。所以我在主题中发布一条消息,然后我的消费者接收并处理该消息。
我需要在生产者中收到至少一个消费者的响应(最好是第一个)。我正在尝试使用 RxJava 来做到这一点(可观察)。
这样可以吗?有人有例子吗?
你最好先分享你的解决方案...
由于SpringCloud Stream是一个mh流解决方案,而不是request/reply,所以没有示例可以与您分享。
你可以考虑把你的消费者也做成生产者。并且在原始生产者中有一个消费者可以从回复的主题中阅读。最后,您必须将回复数据与请求数据相关联。
RxJava 或任何其他实现细节不相关。
您可以按如下方式使用:
val consumer = new RxConsumer("zookeeper:2181", "consumer-group")
consumer.getRecordStream("cool-topic-(x|y|z)")
.map(deserialize)
.take(42 seconds)
.foreach(println)
consumer.shutdown()
有关详细信息,请参阅: https://github.com/cjdev/kafka-rx
下面是我如何使用 rxjava '2.2.6' 来处理 Kafka 事件而无需任何额外的依赖:
import io.reactivex.Observable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
...
// Load consumer props
Properties props = new Properties();
props.load(KafkaUtils.class.getClassLoader().getResourceAsStream("kafka-client.properties"));
// Create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to topics
consumer.subscribe(Arrays.asList(props.getProperty("kafkaTopics").split("\s*,\s*")));
// Create an Observable for topic events
Observable<ConsumerRecords<String, String>> observable = Observable.fromCallable(() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSecond(10);
return records;
});
// Process Observable events
observable.subscribe(records -> {
if ((records != null) && (!records.isEmpty())) {
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ": " + record.value());
}
}
});