Samza 发送消息时会自动创建分区吗?
Does Samza create partitions automatically when sending messages?
如果您使用 Samza 的 OutgoingMessageEnvelope 以这种格式发送消息:
public OutgoingMessageEnvelope(SystemStream systemStream,
java.lang.Object partitionKey,
java.lang.Object key,
java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.
并且您在流任务的 process() 方法中调用此方法并希望将传入消息路由到适当的分区,Samza 会在您调用该方法时为您创建分区吗?
例如
MessageA = {"id": "idA", "key": "keyA", "body":"some details"}
MessageB = {"id": "idB", "key": "keyB", "body":"some more details"}
如果我在流任务中调用 process()
,其中 msg
是一个消息实例:
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
// ...
String partition = msg["id"]
String key = msg["key"]
collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "PartitionedMessages"), id, key, msg));
// ...
这会自动为我创建分区 idA 和 idB(即我是否需要在向它们发送消息之前创建这些分区)?我希望能够将消息路由到适当的分区,并且能够使用单独的消息密钥记录压缩。
创建主题时必须指定分区数。您不能动态添加新分区(好吧,您 可以 但这并不容易,而且 Samza 不会自动执行)。如果主题不存在但具有默认分区数,Samza 应该为您创建新主题。这取决于设置。你可以测试一下。
但是值msg["id"]
没有指定分区的名称。该值仅用于计算目标分区的数量。该值被散列为一个数字,然后使用模数进行修整。像这样(算法比较多,这是最基本的):
partitionID = hash(msg["id"]) % total_number_of_partitions
而partitionID
总是一个非负整数。这意味着您实际拥有多少个分区并不重要。它总是以一些结束。主要思想是,如果您有两条具有相同 msg["id"]
的消息,那么消息将最终位于相同的分区中。这通常就是你想要的。
日志压缩将如您预期的那样工作——它将从特定分区中删除具有相同密钥的消息(但如果您有两条具有相同密钥且位于两个不同分区的消息,它们将不会被删除)。
仅供参考,您可以使用 kafkacat 找出分区数和其他有用的东西。
如果您使用 Samza 的 OutgoingMessageEnvelope 以这种格式发送消息:
public OutgoingMessageEnvelope(SystemStream systemStream,
java.lang.Object partitionKey,
java.lang.Object key,
java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.
并且您在流任务的 process() 方法中调用此方法并希望将传入消息路由到适当的分区,Samza 会在您调用该方法时为您创建分区吗?
例如
MessageA = {"id": "idA", "key": "keyA", "body":"some details"}
MessageB = {"id": "idB", "key": "keyB", "body":"some more details"}
如果我在流任务中调用 process()
,其中 msg
是一个消息实例:
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
// ...
String partition = msg["id"]
String key = msg["key"]
collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "PartitionedMessages"), id, key, msg));
// ...
这会自动为我创建分区 idA 和 idB(即我是否需要在向它们发送消息之前创建这些分区)?我希望能够将消息路由到适当的分区,并且能够使用单独的消息密钥记录压缩。
创建主题时必须指定分区数。您不能动态添加新分区(好吧,您 可以 但这并不容易,而且 Samza 不会自动执行)。如果主题不存在但具有默认分区数,Samza 应该为您创建新主题。这取决于设置。你可以测试一下。
但是值msg["id"]
没有指定分区的名称。该值仅用于计算目标分区的数量。该值被散列为一个数字,然后使用模数进行修整。像这样(算法比较多,这是最基本的):
partitionID = hash(msg["id"]) % total_number_of_partitions
而partitionID
总是一个非负整数。这意味着您实际拥有多少个分区并不重要。它总是以一些结束。主要思想是,如果您有两条具有相同 msg["id"]
的消息,那么消息将最终位于相同的分区中。这通常就是你想要的。
日志压缩将如您预期的那样工作——它将从特定分区中删除具有相同密钥的消息(但如果您有两条具有相同密钥且位于两个不同分区的消息,它们将不会被删除)。
仅供参考,您可以使用 kafkacat 找出分区数和其他有用的东西。