向 Kafka 发送消息时是否需要密钥?
Is key required as part of sending messages to Kafka?
KeyedMessage<String, byte[]> keyedMessage = new KeyedMessage<String, byte[]>(request.getRequestTopicName(), SerializationUtils.serialize(message));
producer.send(keyedMessage);
目前,作为密钥消息的一部分,我正在发送没有任何密钥的消息,它仍然适用于 delete.retention.ms
吗?我需要将密钥作为消息的一部分发送吗?将密钥作为消息的一部分好吗?
如果您需要密钥的严格顺序并且正在开发状态机之类的东西,那么密钥主要是 useful/necessary。如果您需要始终以正确的顺序查看具有相同键(例如,唯一 ID)的消息,则将键附加到消息将确保具有相同键的消息始终进入主题中的相同分区。 Kafka 保证分区内的顺序,但不保证主题中跨分区的顺序,因此或者不提供键(这将导致跨分区的循环分配)将不会维持这样的顺序。
在状态机的情况下,密钥可以与 log.cleaner.enable 一起使用,以删除具有相同密钥的重复项。在这种情况下,Kafka 假定您的应用程序只关心给定键的最新实例,并且只有当键不为空时,日志清理器才会删除给定键的旧副本。这种形式的日志压缩由 log.cleaner.delete.retention 属性 控制并且需要密钥。
或者,更常见的 属性 log.retention.hours,默认情况下启用,通过删除过时的日志的完整片段来工作.在这种情况下,不必提供密钥。 Kafka 将简单地删除早于给定保留期的日志块。
就是说,如果您启用了 log compaction 或要求对具有相同密钥的消息进行严格排序,那么您绝对应该使用密钥。否则,在某些键可能比其他键出现得更多的情况下,空键可以提供更好的分布并防止潜在的热点问题。
tl;博士
不,向 Kafka 发送消息不需要密钥。但是...
除了非常有用的已接受答案外,我还想添加更多详细信息
分区
默认情况下,Kafka 使用消息的键 select 它写入的主题分区。这是由
在 DefaultPartitioner
中完成的
kafka.common.utils.Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
如果没有提供密钥,Kafka 将以循环方式对数据进行分区。
在 Kafka 中,可以通过扩展 Partitioner
class 创建自己的分区器。为此,您需要覆盖具有签名的 partition
方法:
int partition(String topic,
Object key,
byte[] keyBytes,
Object value,
byte[] valueBytes,
Cluster cluster)
通常,Kafka消息的key用于select分区和return值(int
类型)是分区号。没有密钥,您需要依赖处理起来可能复杂得多的值。
正在订购
如给定答案中所述,Kafka 仅在分区级别保证消息的排序。
假设您要在具有两个分区的 Kafka 主题中为您的客户存储金融交易。消息可能看起来像 (key:value)
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": -1337}
null:{"customerId": 1, "changeInBankAccount": +200}
由于我们没有定义键,因此这两个分区可能看起来像
// partition 0
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}
// partition 1
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": -1337}
您的消费者阅读该主题可能最终会告诉您帐户余额在特定时间为 600,但事实并非如此!只是因为它先于分区 1 中的消息读取分区 0 中的所有消息。
使用有意义的键(如 customerId)可以避免这种情况,因为分区会像这样:
// partition 0
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": -1337}
1:{"customerId": 1, "changeInBankAccount": +200}
// partition 1
2:{"customerId": 2, "changeInBankAccount": +100}
请记住,只有生产者配置 max.in.flight.requests.per.connection
设置为 1
才能保证分区内的顺序。然而,该配置的默认值是 5
,它被描述为:
"The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled)."
您可以在 上的另一个 Whosebug post 中找到更多详细信息。
日志压缩
如果消息中没有密钥,您将无法将主题配置 cleanup.policy
设置为 compacted
。根据 documentation“日志压缩确保 Kafka 始终至少保留单个主题分区数据日志中每个消息键的最后已知值。”。
如果没有任何密钥,将无法使用此漂亮且有用的设置。
键的使用
在实际使用案例中,Kafka 消息的键会对您的性能和业务逻辑的清晰度产生巨大影响。
例如,密钥可以自然地用于对数据进行分区。由于您可以控制您的消费者从特定分区读取数据,因此这可以作为一个高效的过滤器。此外,密钥可以包含一些关于消息实际值的元数据,帮助您控制后续处理。键通常比值小,因此解析键而不是整个值更方便。同时,您也可以使用密钥应用所有序列化和模式注册,就像您的值一样。
作为说明,还有Header的概念可以用来存储信息,见documentation.
带消息的键基本上是为了获取特定字段的消息排序。
- 如果 key=null,则数据被发送 round-robin(到分布式环境中的不同分区和不同代理,当然还有相同的主题。)。
- 如果发送密钥,则该密钥的所有消息将始终转到同一分区。
解释和举例
- key可以是任意字符串或整数等。以整数employee_id为例作为key。
- 所以emplyee_id123总是去partition 0,employee_id345总是去partition 1,这是由key hashing算法决定的,取决于partitions的数量
- 如果您不发送任何密钥,则消息可以使用 round-robin 技术发送到任何分区。
KeyedMessage<String, byte[]> keyedMessage = new KeyedMessage<String, byte[]>(request.getRequestTopicName(), SerializationUtils.serialize(message));
producer.send(keyedMessage);
目前,作为密钥消息的一部分,我正在发送没有任何密钥的消息,它仍然适用于 delete.retention.ms
吗?我需要将密钥作为消息的一部分发送吗?将密钥作为消息的一部分好吗?
如果您需要密钥的严格顺序并且正在开发状态机之类的东西,那么密钥主要是 useful/necessary。如果您需要始终以正确的顺序查看具有相同键(例如,唯一 ID)的消息,则将键附加到消息将确保具有相同键的消息始终进入主题中的相同分区。 Kafka 保证分区内的顺序,但不保证主题中跨分区的顺序,因此或者不提供键(这将导致跨分区的循环分配)将不会维持这样的顺序。
在状态机的情况下,密钥可以与 log.cleaner.enable 一起使用,以删除具有相同密钥的重复项。在这种情况下,Kafka 假定您的应用程序只关心给定键的最新实例,并且只有当键不为空时,日志清理器才会删除给定键的旧副本。这种形式的日志压缩由 log.cleaner.delete.retention 属性 控制并且需要密钥。
或者,更常见的 属性 log.retention.hours,默认情况下启用,通过删除过时的日志的完整片段来工作.在这种情况下,不必提供密钥。 Kafka 将简单地删除早于给定保留期的日志块。
就是说,如果您启用了 log compaction 或要求对具有相同密钥的消息进行严格排序,那么您绝对应该使用密钥。否则,在某些键可能比其他键出现得更多的情况下,空键可以提供更好的分布并防止潜在的热点问题。
tl;博士 不,向 Kafka 发送消息不需要密钥。但是...
除了非常有用的已接受答案外,我还想添加更多详细信息
分区
默认情况下,Kafka 使用消息的键 select 它写入的主题分区。这是由
在DefaultPartitioner
中完成的
kafka.common.utils.Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
如果没有提供密钥,Kafka 将以循环方式对数据进行分区。
在 Kafka 中,可以通过扩展 Partitioner
class 创建自己的分区器。为此,您需要覆盖具有签名的 partition
方法:
int partition(String topic,
Object key,
byte[] keyBytes,
Object value,
byte[] valueBytes,
Cluster cluster)
通常,Kafka消息的key用于select分区和return值(int
类型)是分区号。没有密钥,您需要依赖处理起来可能复杂得多的值。
正在订购
如给定答案中所述,Kafka 仅在分区级别保证消息的排序。
假设您要在具有两个分区的 Kafka 主题中为您的客户存储金融交易。消息可能看起来像 (key:value)
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": -1337}
null:{"customerId": 1, "changeInBankAccount": +200}
由于我们没有定义键,因此这两个分区可能看起来像
// partition 0
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}
// partition 1
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": -1337}
您的消费者阅读该主题可能最终会告诉您帐户余额在特定时间为 600,但事实并非如此!只是因为它先于分区 1 中的消息读取分区 0 中的所有消息。
使用有意义的键(如 customerId)可以避免这种情况,因为分区会像这样:
// partition 0
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": -1337}
1:{"customerId": 1, "changeInBankAccount": +200}
// partition 1
2:{"customerId": 2, "changeInBankAccount": +100}
请记住,只有生产者配置 max.in.flight.requests.per.connection
设置为 1
才能保证分区内的顺序。然而,该配置的默认值是 5
,它被描述为:
"The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled)."
您可以在
日志压缩
如果消息中没有密钥,您将无法将主题配置 cleanup.policy
设置为 compacted
。根据 documentation“日志压缩确保 Kafka 始终至少保留单个主题分区数据日志中每个消息键的最后已知值。”。
如果没有任何密钥,将无法使用此漂亮且有用的设置。
键的使用
在实际使用案例中,Kafka 消息的键会对您的性能和业务逻辑的清晰度产生巨大影响。
例如,密钥可以自然地用于对数据进行分区。由于您可以控制您的消费者从特定分区读取数据,因此这可以作为一个高效的过滤器。此外,密钥可以包含一些关于消息实际值的元数据,帮助您控制后续处理。键通常比值小,因此解析键而不是整个值更方便。同时,您也可以使用密钥应用所有序列化和模式注册,就像您的值一样。
作为说明,还有Header的概念可以用来存储信息,见documentation.
带消息的键基本上是为了获取特定字段的消息排序。
- 如果 key=null,则数据被发送 round-robin(到分布式环境中的不同分区和不同代理,当然还有相同的主题。)。
- 如果发送密钥,则该密钥的所有消息将始终转到同一分区。
解释和举例
- key可以是任意字符串或整数等。以整数employee_id为例作为key。
- 所以emplyee_id123总是去partition 0,employee_id345总是去partition 1,这是由key hashing算法决定的,取决于partitions的数量
- 如果您不发送任何密钥,则消息可以使用 round-robin 技术发送到任何分区。