如何只获取Kafka主题的最新消息?
How to get only the latest message of a Kafka topic?
有没有办法分解消息,只消费最新的消息?
我尝试将消息保存在列表中,但没有成功
var consumer = new Consumer(new ConsumerOptions(topic, router));
foreach (var message in consumer.Consume())
{
Console.WriteLine(Encoding.UTF8.GetString(message.Value));
}
输出应为:1、2、3、4
输出为:1, 1, 2, 1, 2, 3, 1, 2, 3, 4
事实上,做你想做的唯一方法是一个只有一个分区的主题并设置max.poll.records 到一。
不然也没办法了,因为最后一条消息没有意义。任何输入消息都可以被推送到不同的分区中,并且您有一些与主题所具有的分区数相关的最后消息。
您可以使用名为 Log Compaction 的东西,它将 "truncate" 条消息使用相同的密钥。因此,当您使用相同的密钥发送消息时,您只会获得该密钥的最后一个 message/value。默认情况下启用此功能。当您发送消息 1, 1, 2, 1, 2, 3, 1, 2, 3, 4 并且您想将它们读为 1, 2, 3、4 你给消息相同的密钥,它们应该被彼此覆盖(所以所有的 1 消息得到相同的密钥,所有的 2 条消息获得相同的密钥,...)。
有没有办法分解消息,只消费最新的消息?
我尝试将消息保存在列表中,但没有成功
var consumer = new Consumer(new ConsumerOptions(topic, router));
foreach (var message in consumer.Consume())
{
Console.WriteLine(Encoding.UTF8.GetString(message.Value));
}
输出应为:1、2、3、4
输出为:1, 1, 2, 1, 2, 3, 1, 2, 3, 4
事实上,做你想做的唯一方法是一个只有一个分区的主题并设置max.poll.records 到一。
不然也没办法了,因为最后一条消息没有意义。任何输入消息都可以被推送到不同的分区中,并且您有一些与主题所具有的分区数相关的最后消息。
您可以使用名为 Log Compaction 的东西,它将 "truncate" 条消息使用相同的密钥。因此,当您使用相同的密钥发送消息时,您只会获得该密钥的最后一个 message/value。默认情况下启用此功能。当您发送消息 1, 1, 2, 1, 2, 3, 1, 2, 3, 4 并且您想将它们读为 1, 2, 3、4 你给消息相同的密钥,它们应该被彼此覆盖(所以所有的 1 消息得到相同的密钥,所有的 2 条消息获得相同的密钥,...)。