如何使用 Samza 在 Kafka 主题上创建分区?
How can you create a partition on a Kafka topic using Samza?
我有一些 Samza 作业 运行 所有读取 Kafka 主题的消息并向新主题写入新消息。要发送新消息,我使用 Samza 的内置 OutgoingMessageEnvelope。还使用 MessageCollector 发送新消息。它看起来像这样:
collector.send(new OutgoingMessageEnvelope(SystemStream, newMessage))
有什么方法可以使用它向 Kafka 主题添加分区吗?比如按用户ID分区之类的。
或者如果有更好的方法我很想听听!
您应该可以使用 partitioning key、
发送消息
public OutgoingMessageEnvelope(SystemStream systemStream,
java.lang.Object partitionKey,
java.lang.Object key,
java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.
使用此方法将对您的数据进行分区。但是我认为,如果您正在考虑以编程方式控制分区数量,则应该使用 kafka API 到 create/alter 提到的主题 here
我有一些 Samza 作业 运行 所有读取 Kafka 主题的消息并向新主题写入新消息。要发送新消息,我使用 Samza 的内置 OutgoingMessageEnvelope。还使用 MessageCollector 发送新消息。它看起来像这样:
collector.send(new OutgoingMessageEnvelope(SystemStream, newMessage))
有什么方法可以使用它向 Kafka 主题添加分区吗?比如按用户ID分区之类的。
或者如果有更好的方法我很想听听!
您应该可以使用 partitioning key、
发送消息 public OutgoingMessageEnvelope(SystemStream systemStream,
java.lang.Object partitionKey,
java.lang.Object key,
java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.
使用此方法将对您的数据进行分区。但是我认为,如果您正在考虑以编程方式控制分区数量,则应该使用 kafka API 到 create/alter 提到的主题 here