服务生成消息时的 Kafka 自动化 Java
Kafka automation Java when service is producing the messages
我想自动化由 API 服务生成的 Kafka 消息。有人可以提供您的见解吗?
- 触发 REST API 请求
- 服务作为生产者发布 Kafka 消息,这些消息会出现在日志中
- 需要从日志中获取这些消息并验证它们
我和团队最终找到了解决办法,发布在这里供后来者参考。
在下面代码的第二行拿到返回的消息之后,需要自行编写断言,验证 Kafka 消息是否有效、是否为格式正确的 JSON。
// Reads the records the service published to the "Charging" topic.
// KafkaPicker is AutoCloseable, so try-with-resources guarantees the underlying
// consumer is closed even when returnAll() throws.
try (KafkaPicker kafkaPicker = new KafkaPicker("Charging", properties)) {
    ConsumerRecords<Object, Object> messages = kafkaPicker.returnAll();
    System.out.println("Charging topic messages are : " + messages);
    // Place your own assertions on `messages` here, before the picker is closed.
}
/**
 * Small helper around a {@link KafkaConsumer} that reads records from a single topic.
 * The topic name is fixed at construction time and the consumer is released by
 * {@link #close()}.
 */
public class KafkaPicker implements AutoCloseable {
// Maximum time a single consumer.poll(...) call waits for records.
private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
// Topic that every read method of this class assigns the consumer to.
private final String topicName;
// Underlying consumer, built from the Properties passed to the constructor.
private final KafkaConsumer<Object, Object> consumer;
/**
 * Creates a picker bound to one topic.
 *
 * @param topicName  topic the read methods will assign the consumer to
 * @param properties configuration passed unchanged to the {@link KafkaConsumer} constructor
 */
public KafkaPicker(String topicName, Properties properties) {
    this.consumer = new KafkaConsumer<>(properties);
    this.topicName = topicName;
}
/**
 * Prints the values of the records returned by one poll that starts at the given
 * offset of the given partition of this picker's topic.
 *
 * <p>Only a single poll (bounded by {@code POLL_TIMEOUT}) is made, so nothing is
 * printed when no record arrives within that window.
 *
 * @param offset    offset to seek to before polling; unboxed, so it must not be {@code null}
 * @param partition partition of the topic to assign; unboxed, so it must not be {@code null}
 */
public void readSpecific(Long offset, Integer partition) {
    final TopicPartition target = new TopicPartition(topicName, partition);
    consumer.assign(Collections.singletonList(target));
    consumer.seek(target, offset);
    consumer.poll(POLL_TIMEOUT).forEach(record -> System.out.println(record.value()));
}
/**
 * Prints the values of the records returned by one poll from the beginning of
 * partition 0 of this picker's topic.
 *
 * <p>Only partition 0 is assigned and only a single poll (bounded by
 * {@code POLL_TIMEOUT}) is made; whatever that one poll returns is what gets printed.
 */
public void readAll() {
    final List<TopicPartition> assignment =
            Collections.singletonList(new TopicPartition(topicName, 0));
    consumer.assign(assignment);
    consumer.seekToBeginning(assignment);
    consumer.poll(POLL_TIMEOUT).forEach(record -> System.out.println(record.value()));
}
/**
 * Returns the first non-empty batch of records read from the beginning of
 * partition 0 of this picker's topic.
 *
 * <p>The consumer is polled repeatedly (each poll bounded by {@code POLL_TIMEOUT})
 * until a poll yields at least one record. There is no overall time limit: the call
 * keeps polling for as long as the partition delivers nothing.
 *
 * @return the records of the first poll whose count is non-zero
 */
public ConsumerRecords<Object, Object> returnAll() {
    final List<TopicPartition> assignment =
            Collections.singletonList(new TopicPartition(topicName, 0));
    consumer.assign(assignment);
    consumer.seekToBeginning(assignment);

    ConsumerRecords<Object, Object> batch;
    do {
        batch = consumer.poll(POLL_TIMEOUT);
    } while (batch.count() == 0);
    return batch;
}
/**
 * Closes the underlying Kafka consumer.
 *
 * <p>Implements {@link AutoCloseable#close()}, which lets a {@code KafkaPicker} be
 * managed by a try-with-resources statement.
 */
@Override
public void close() {
    consumer.close();
}
我想自动化由 API 服务生成的 Kafka 消息。有人可以提供您的见解吗?
- 触发 REST API 请求
- 服务作为生产者发布 Kafka 消息,这些消息会出现在日志中
- 需要从日志中获取这些消息并验证它们
我和团队最终找到了解决办法,发布在这里供后来者参考。在下面代码的第二行拿到返回的消息之后,需要自行编写断言,验证 Kafka 消息是否有效、是否为格式正确的 JSON。
// Reads the records the service published to the "Charging" topic.
// KafkaPicker is AutoCloseable, so try-with-resources guarantees the underlying
// consumer is closed even when returnAll() throws.
try (KafkaPicker kafkaPicker = new KafkaPicker("Charging", properties)) {
    ConsumerRecords<Object, Object> messages = kafkaPicker.returnAll();
    System.out.println("Charging topic messages are : " + messages);
    // Place your own assertions on `messages` here, before the picker is closed.
}
/**
 * Small helper around a {@link KafkaConsumer} that reads records from a single topic.
 * The topic name is fixed at construction time and the consumer is released by
 * {@link #close()}.
 */
public class KafkaPicker implements AutoCloseable {
// Maximum time a single consumer.poll(...) call waits for records.
private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
// Topic that every read method of this class assigns the consumer to.
private final String topicName;
// Underlying consumer, built from the Properties passed to the constructor.
private final KafkaConsumer<Object, Object> consumer;
/**
 * Creates a picker bound to one topic.
 *
 * @param topicName  topic the read methods will assign the consumer to
 * @param properties configuration passed unchanged to the {@link KafkaConsumer} constructor
 */
public KafkaPicker(String topicName, Properties properties) {
    this.consumer = new KafkaConsumer<>(properties);
    this.topicName = topicName;
}
/**
 * Prints the values of the records returned by one poll that starts at the given
 * offset of the given partition of this picker's topic.
 *
 * <p>Only a single poll (bounded by {@code POLL_TIMEOUT}) is made, so nothing is
 * printed when no record arrives within that window.
 *
 * @param offset    offset to seek to before polling; unboxed, so it must not be {@code null}
 * @param partition partition of the topic to assign; unboxed, so it must not be {@code null}
 */
public void readSpecific(Long offset, Integer partition) {
    final TopicPartition target = new TopicPartition(topicName, partition);
    consumer.assign(Collections.singletonList(target));
    consumer.seek(target, offset);
    consumer.poll(POLL_TIMEOUT).forEach(record -> System.out.println(record.value()));
}
/**
 * Prints the values of the records returned by one poll from the beginning of
 * partition 0 of this picker's topic.
 *
 * <p>Only partition 0 is assigned and only a single poll (bounded by
 * {@code POLL_TIMEOUT}) is made; whatever that one poll returns is what gets printed.
 */
public void readAll() {
    final List<TopicPartition> assignment =
            Collections.singletonList(new TopicPartition(topicName, 0));
    consumer.assign(assignment);
    consumer.seekToBeginning(assignment);
    consumer.poll(POLL_TIMEOUT).forEach(record -> System.out.println(record.value()));
}
/**
 * Returns the first non-empty batch of records read from the beginning of
 * partition 0 of this picker's topic.
 *
 * <p>The consumer is polled repeatedly (each poll bounded by {@code POLL_TIMEOUT})
 * until a poll yields at least one record. There is no overall time limit: the call
 * keeps polling for as long as the partition delivers nothing.
 *
 * @return the records of the first poll whose count is non-zero
 */
public ConsumerRecords<Object, Object> returnAll() {
    final List<TopicPartition> assignment =
            Collections.singletonList(new TopicPartition(topicName, 0));
    consumer.assign(assignment);
    consumer.seekToBeginning(assignment);

    ConsumerRecords<Object, Object> batch;
    do {
        batch = consumer.poll(POLL_TIMEOUT);
    } while (batch.count() == 0);
    return batch;
}
/**
 * Closes the underlying Kafka consumer.
 *
 * <p>Implements {@link AutoCloseable#close()}, which lets a {@code KafkaPicker} be
 * managed by a try-with-resources statement.
 */
@Override
public void close() {
    consumer.close();
}