如何在 spring 启动时向 EmbeddedKafka 发送具有特定偏移量的消息?

How to send a message with specific offset to EmbeddedKafka in spring boot?

我正在为连接到 Kafka 以使用和发布数据的应用程序编写集成测试,为此我正在使用 EmbeddedKafka。部分逻辑是使用具有特定偏移量的消息。我想模拟这个,因此我的目标是:

  1. 向 EmbeddedKafka 发送一些消息,但具有特定的偏移量
  2. 以相同的偏移量消耗它们

这现在不起作用,即我正在使用 KafkaHeaders.OFFSET 发送消息,但它被忽略了,我之后使用的消息具有不同的偏移量。事实上,偏移量只是从 0 开始,然后递增。

MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(payload)
        .setHeader(KafkaHeaders.TOPIC, topic)
        .setHeader(KafkaHeaders.MESSAGE_KEY, key)
        .setHeader(KafkaHeaders.PARTITION_ID, partition);
        .setHeader(KafkaHeaders.OFFSET, val);
kafkaTemplate.send(messageBuilder.build());

KafkaTemplate<String, String> 以标准方式初始化。另一方面,我以标准方式消费:

private class MessagesWithOffsetsConsumer implements BatchMessageListener<String, String>, ConsumerSeekAware {

    MessagesWithOffsetsConsumer() {
    }

    @Override
    public void onMessage(List<ConsumerRecord<String, String>> records) {

        records.forEach(record -> {
            String id = record.key();
            String dataAssetPayload = record.value();
            int partitionId = record.partition();

            LOGGER.info("Received record: {} offset: {}", id, record.offset());
    }
}

简而言之,在 onMessage 中收到的消息的偏移量与在消息构建期间设置的不同。

有什么办法可以实现吗?

不,ProducerRecord 上没有类似显式偏移值的东西。 您只能设置这些道具:

public class ProducerRecord<K, V> {

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;

而这正是 KafkaTemplate 逻辑所做的。 KafkaHeaders.OFFSET 是消费端 属性:

public class ConsumerRecord<K, V> {

    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final Headers headers;
    private final K key;
    private final V value;

换句话说:Apache Kafka 不是为显式偏移设置而设计的。