随机分区程序不会在 Kafka 主题分区之间分发消息
Random partitioner does not distribute messages between Kafka topic partitions
我在 Kafka 中创建了一个有 9 个分区的主题,并恰当地命名了它 'test',并使用 Confluent.Kafka
客户端库在 C# (.NET Core) 中组合了两个简单的应用程序:生产者和消费者。我只是调整了一下 examples from the documentation.
我是 运行 两个消费者应用程序实例和一个生产者应用程序实例。我认为在此处粘贴消费者代码没有多大意义,这是一个微不足道的 'get a message, print it on screen' 应用程序,但是,它也会打印消息来自的分区编号。
这是生产者应用程序:
static async Task Main(string[] args)
{
var random = new Random();
var config = new ProducerConfig {
BootstrapServers = "10.0.0.5:9092",
Partitioner = Partitioner.ConsistentRandom
};
int counter = 0;
while (true)
{
using (var p = new ProducerBuilder<string, string>(config).Build())
{
try
{
p.BeginProduce(
"test",
new Message<string, string>
{
//Key = random.Next().ToString(),
Value = $"test {++counter}"
});
if (counter % 10 == 0)
p.Flush();
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}
问题:如果消息的Key
属性没有设置,所有消息都会发送到分区号7,这意味着其中之一我的消费者实例处于空闲状态。我不得不手动随机化密钥以便在分区之间分配它们(请参阅注释掉的行)。 (从文档中复制的原始代码使用 Null
作为密钥类型,这也将所有消息发送到第 7 个分区。)
这是为什么?根据 ProducerConfig.Partitioner
属性 的文档,如果未指定密钥,consistent_random
选项应确保随机分发。我尝试使用 Partioner.Random
选项,它应该使用随机分布而不考虑密钥,但这没有帮助。
这是预期的行为吗,我做错了什么,还是遇到了错误?
我正在使用 Confluent.Kafka NuGet 的 1.0.0-RC2 版本。
分区程序配置的完整文档:
// Summary:
// Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key
// (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32
// hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java
// Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition),
// `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are
// randomly partitioned. This is functionally equivalent to the default partitioner
// in the Java Producer.). default: consistent_random importance: high
我遇到了同样的问题。
似乎在启动客户端时,第一条消息将始终进入同一分区。
如果您对所有消息使用相同的客户端,Partioner.Random 将起作用
我在 Kafka 中创建了一个有 9 个分区的主题,并恰当地命名了它 'test',并使用 Confluent.Kafka
客户端库在 C# (.NET Core) 中组合了两个简单的应用程序:生产者和消费者。我只是调整了一下 examples from the documentation.
我是 运行 两个消费者应用程序实例和一个生产者应用程序实例。我认为在此处粘贴消费者代码没有多大意义,这是一个微不足道的 'get a message, print it on screen' 应用程序,但是,它也会打印消息来自的分区编号。
这是生产者应用程序:
static async Task Main(string[] args)
{
var random = new Random();
var config = new ProducerConfig {
BootstrapServers = "10.0.0.5:9092",
Partitioner = Partitioner.ConsistentRandom
};
int counter = 0;
while (true)
{
using (var p = new ProducerBuilder<string, string>(config).Build())
{
try
{
p.BeginProduce(
"test",
new Message<string, string>
{
//Key = random.Next().ToString(),
Value = $"test {++counter}"
});
if (counter % 10 == 0)
p.Flush();
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}
问题:如果消息的Key
属性没有设置,所有消息都会发送到分区号7,这意味着其中之一我的消费者实例处于空闲状态。我不得不手动随机化密钥以便在分区之间分配它们(请参阅注释掉的行)。 (从文档中复制的原始代码使用 Null
作为密钥类型,这也将所有消息发送到第 7 个分区。)
这是为什么?根据 ProducerConfig.Partitioner
属性 的文档,如果未指定密钥,consistent_random
选项应确保随机分发。我尝试使用 Partioner.Random
选项,它应该使用随机分布而不考虑密钥,但这没有帮助。
这是预期的行为吗,我做错了什么,还是遇到了错误?
我正在使用 Confluent.Kafka NuGet 的 1.0.0-RC2 版本。
分区程序配置的完整文档:
// Summary:
// Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key
// (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32
// hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java
// Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition),
// `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are
// randomly partitioned. This is functionally equivalent to the default partitioner
// in the Java Producer.). default: consistent_random importance: high
我遇到了同样的问题。 似乎在启动客户端时,第一条消息将始终进入同一分区。 如果您对所有消息使用相同的客户端,Partioner.Random 将起作用