Is message order not guaranteed when using KafkaEmbedded?
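I wrote a unit test using KafkaEmbedded (and KafkaTemplate), but the messages arrive in random order. Does anyone know whether this is expected, and whether the order can be guaranteed?
Here is my code: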
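import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;

public class KafkaTest {
    private static String TOPIC = "test.topic";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TOPIC);

    @Test
    public void testEmbeddedKafkaSendOrder() throws Exception {
        Map<String, Object> producerConfig = new HashMap<>();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        KafkaTemplate<String, byte[]> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig));
        // Send five records without a key, waiting for each send to complete
        kafkaTemplate.send(TOPIC, "TEST1".getBytes()).get();
        kafkaTemplate.send(TOPIC, "TEST2".getBytes()).get();
        kafkaTemplate.send(TOPIC, "TEST3".getBytes()).get();
        kafkaTemplate.send(TOPIC, "TEST4".getBytes()).get();
        kafkaTemplate.send(TOPIC, "TEST5".getBytes()).get();
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-test-group");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerConfig.put("auto.offset.reset", "earliest");
        final Consumer<String, byte[]> consumer = new KafkaConsumer<>(consumerConfig);
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, TOPIC);
        ConsumerRecords<String, byte[]> records = consumer.poll(100L);
        // Print the records in the order the consumer returned them
        final Iterator<ConsumerRecord<String, byte[]>> recordIterator = records.iterator();
        while (recordIterator.hasNext()) {
            System.out.println("received:" + new String(recordIterator.next().value()));
        }
    }
}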
For example, this code prints the following (the order can vary):
received:TEST2
received:TEST4
received:TEST1
received:TEST3
received:TEST5
In Kafka, message order is guaranteed within a single partition, but not across the partitions of a topic.
Note that as a topic typically has multiple partitions, there is
no guarantee of message time-ordering across the entire topic, just within a single
partition
(Quoted from the book Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale.)
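To make the partition effect concrete, here is a minimal simulation in plain Java with no Kafka dependency. It assumes the topic has two partitions and that keyless records are spread round-robin; both are simplifications for illustration (the real distribution strategy depends on the producer client version), and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionOrderSketch {

    // Spread keyless records over the partitions round-robin.
    // Assumption: a simplified stand-in for the producer's real strategy.
    static List<List<String>> distribute(List<String> records, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            partitions.get(i % numPartitions).add(records.get(i));
        }
        return partitions;
    }

    // Read the partitions one after another; order is kept only inside each partition.
    static List<String> readAll(List<List<String>> partitions) {
        List<String> result = new ArrayList<>();
        for (List<String> partition : partitions) {
            result.addAll(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("TEST1", "TEST2", "TEST3", "TEST4", "TEST5");
        // Two partitions: prints [TEST1, TEST3, TEST5, TEST2, TEST4]
        System.out.println(readAll(distribute(sent, 2)));
        // One partition: prints [TEST1, TEST2, TEST3, TEST4, TEST5]
        System.out.println(readAll(distribute(sent, 1)));
    }
}
```

The output in the question (TEST2, TEST4, then TEST1, TEST3, TEST5) has the same shape: two runs that are each in order, which is what you would expect if the topic had two partitions.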
What can you do about it, and how can you receive the messages in order?
Option 1:
kafkaTemplate.send(TOPIC, "1", "TEST1".getBytes()).get();
kafkaTemplate.send(TOPIC, "1", "TEST2".getBytes()).get();
kafkaTemplate.send(TOPIC, "1", "TEST3".getBytes()).get();
kafkaTemplate.send(TOPIC, "1", "TEST4".getBytes()).get();
kafkaTemplate.send(TOPIC, "1", "TEST5".getBytes()).get();
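This way, you send the same key "1" with every value. Kafka chooses the partition based on the key. Because all the keys are identical, all messages go to the same partition, and you receive the records in order.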
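As a rough illustration of why a shared key pins records to one partition, the sketch below maps a key to a partition by hashing it modulo the partition count. This is an assumption-laden stand-in: Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas this sketch uses String.hashCode() to stay dependency-free, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class KeyPartitionSketch {

    // Simplified key-to-partition mapping (assumption: String.hashCode() replaces
    // the murmur2 hash that Kafka applies to the serialized key bytes).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Append every value to the partition selected by the shared key.
    static List<List<String>> route(String key, String[] values, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String value : values) {
            partitions.get(partitionFor(key, numPartitions)).add(value);
        }
        return partitions;
    }

    public static void main(String[] args) {
        String[] values = {"TEST1", "TEST2", "TEST3", "TEST4", "TEST5"};
        // Same key for every record, so a single partition receives all of them in send order.
        // Prints [[], [TEST1, TEST2, TEST3, TEST4, TEST5]]
        System.out.println(route("1", values, 2));
    }
}
```

The point is only that the mapping is deterministic: the same key always yields the same partition, so the per-partition ordering guarantee covers all five records.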
Option 2:
Initialize KafkaEmbedded like this:
new KafkaEmbedded(1, true, 1, TOPIC);
This tells Kafka that you want only one partition for this topic, so every record goes to that partition.