如何在 spring 启动时向 EmbeddedKafka 发送具有特定偏移量的消息?
How to send a message with specific offset to EmbeddedKafka in spring boot?
我正在为连接到 Kafka 以使用和发布数据的应用程序编写集成测试,为此我正在使用 EmbeddedKafka。部分逻辑是使用具有特定偏移量的消息。我想模拟这个,因此我的目标是:
- 向 EmbeddedKafka 发送一些消息,但具有特定的偏移量
- 以相同的偏移量消耗它们
这现在不起作用,即我正在使用 KafkaHeaders.OFFSET
发送消息,但它被忽略了,我之后使用的消息具有不同的偏移量。事实上,偏移量只是从 0 开始,然后递增。
MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(payload)
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.setHeader(KafkaHeaders.PARTITION_ID, partition);
.setHeader(KafkaHeaders.OFFSET, val);
kafkaTemplate.send(messageBuilder.build());
KafkaTemplate<String, String>
以标准方式初始化。另一方面,我以标准方式消费:
private class MessagesWithOffsetsConsumer implements BatchMessageListener<String, String>, ConsumerSeekAware {
MessagesWithOffsetsConsumer() {
}
@Override
public void onMessage(List<ConsumerRecord<String, String>> records) {
records.forEach(record -> {
String id = record.key();
String dataAssetPayload = record.value();
int partitionId = record.partition();
LOGGER.info("Received record: {} offset: {}", id, record.offset());
}
}
简而言之,在 onMessage
中收到的消息的偏移量与在消息构建期间设置的不同。
有什么办法可以实现吗?
不,ProducerRecord
上没有类似显式偏移值的东西。
您只能设置这些道具:
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
而这正是 KafkaTemplate
逻辑所做的。 KafkaHeaders.OFFSET
是消费端 属性:
public class ConsumerRecord<K, V> {
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final Headers headers;
private final K key;
private final V value;
换句话说:Apache Kafka 不是为显式偏移设置而设计的。
我正在为连接到 Kafka 以使用和发布数据的应用程序编写集成测试,为此我正在使用 EmbeddedKafka。部分逻辑是使用具有特定偏移量的消息。我想模拟这个,因此我的目标是:
- 向 EmbeddedKafka 发送一些消息,但具有特定的偏移量
- 以相同的偏移量消耗它们
这现在不起作用,即我正在使用 KafkaHeaders.OFFSET
发送消息,但它被忽略了,我之后使用的消息具有不同的偏移量。事实上,偏移量只是从 0 开始,然后递增。
MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(payload)
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.setHeader(KafkaHeaders.PARTITION_ID, partition);
.setHeader(KafkaHeaders.OFFSET, val);
kafkaTemplate.send(messageBuilder.build());
KafkaTemplate<String, String>
以标准方式初始化。另一方面,我以标准方式消费:
private class MessagesWithOffsetsConsumer implements BatchMessageListener<String, String>, ConsumerSeekAware {
MessagesWithOffsetsConsumer() {
}
@Override
public void onMessage(List<ConsumerRecord<String, String>> records) {
records.forEach(record -> {
String id = record.key();
String dataAssetPayload = record.value();
int partitionId = record.partition();
LOGGER.info("Received record: {} offset: {}", id, record.offset());
}
}
简而言之,在 onMessage
中收到的消息的偏移量与在消息构建期间设置的不同。
有什么办法可以实现吗?
不,ProducerRecord
上没有类似显式偏移值的东西。
您只能设置这些道具:
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
而这正是 KafkaTemplate
逻辑所做的。 KafkaHeaders.OFFSET
是消费端 属性:
public class ConsumerRecord<K, V> {
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final Headers headers;
private final K key;
private final V value;
换句话说:Apache Kafka 不是为显式偏移设置而设计的。