服务生成消息时的 Kafka 自动化 Java

Kafka automation Java when service is producing the messages

我想自动化由 API 服务生成的 Kafka 消息。有人可以提供您的见解吗?

  1. 触发 REST API 请求
  2. kafka 消息正在日志中作为生产者发布
  3. 需要从日志中获取这些消息并验证它们

我和团队终于得到了答案,并张贴在这里供任何后代使用, 从下面的第二行开始,一旦返回消息,就需要编写自己的断言来验证 kafka 消息是否有效且格式是否正确 JSON。

KafkaPicker kafkaPicker = new KafkaPicker("Charging", properties);
ConsumerRecords<Object, Object> messages = kafkaPicker.returnAll();
System.out.println("Charging topic messages are : " + messages);
kafkaPicker.close();


public class KafkaPicker implements AutoCloseable {
private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
private final String topicName;
private final KafkaConsumer<Object, Object> consumer;

public KafkaPicker(String topicName, Properties properties) {
    this.topicName = topicName;
    consumer = new KafkaConsumer<>(properties);
}

public void readSpecific(Long offset, Integer partition) {
    TopicPartition topicPartition = new TopicPartition(topicName, partition);
    consumer.assign(Collections.singletonList(topicPartition));
    consumer.seek(topicPartition, offset);
    ConsumerRecords<Object, Object> records = consumer.poll(POLL_TIMEOUT);
    records.forEach(a -> System.out.println(a.value()));
}

public void readAll() {
    TopicPartition topicPartition = new TopicPartition(topicName, 0);
    List<TopicPartition> partitions = Collections.singletonList(topicPartition);
    consumer.assign(partitions);
    consumer.seekToBeginning(partitions);
    ConsumerRecords<Object, Object> records = consumer.poll(POLL_TIMEOUT);
    records.forEach(a -> System.out.println(a.value()));
}

public ConsumerRecords<Object, Object> returnAll() {
    TopicPartition topicPartition = new TopicPartition(topicName, 0);
    List<TopicPartition> partitions = Collections.singletonList(topicPartition);
    consumer.assign(partitions);
    consumer.seekToBeginning(partitions);
    boolean size = true;
    ConsumerRecords<Object, Object> records = null;
    while (size) {
        records = consumer.poll(POLL_TIMEOUT);
        if (records.count() != 0)
            size = false;
    }
    return records;
}

public void close() {
    consumer.close();
}