在 akka-stream-kafka 中是否可以进行基于哈希的排序?
Is hash based ordering possible in akka-stream-kafka?
我正在探索 akka-stream-kafka
我的一个用例并完成这个 documentation。根据文档,生产者接收器将有效负载即数据记录平均分配到所有 Kafka 分区中,这是合乎逻辑的。但是我想控制消息所在的分区。我的用例是,我将获得数百万行,键为 record_id
,现在我想发送相同 record_id
的所有记录让我们假设 1234
到同一个分区让我们假设分区号10
。所以总而言之,可以说我有 1000 条记录和 10 个分区。在这 1000 条记录中,有 3700 条带有 record_id
1234
。假设 kafka 将 record_id
发送到分区号 1。所以我希望所有这些 3700 条记录都通过分区 1,因为我想维护这些记录的顺序。其他 record_id
也是如此。文档中的 plainsink
实现将记录平均分配到所有分区。
有没有一种方法可以根据键的散列来控制记录流?
当您创建 ProducerRecord
时,您有机会提供一个您希望它结束的分区索引。
要计算分区索引,您可以简单地使用 recordId % numberOfPartitions
,并且您将确保具有相同 recordId
的所有邮件最终都在同一分区中。
示例如下:
val source: Source[Record, NotUsed] = ???
source
.map { record =>
val partition = record.recordId % 10
new ProducerRecord[Array[Byte], Record]("topic1", partition, null, record)
}
.runWith(Producer.plainSink(producerSettings))
我正在探索 akka-stream-kafka
我的一个用例并完成这个 documentation。根据文档,生产者接收器将有效负载即数据记录平均分配到所有 Kafka 分区中,这是合乎逻辑的。但是我想控制消息所在的分区。我的用例是,我将获得数百万行,键为 record_id
,现在我想发送相同 record_id
的所有记录让我们假设 1234
到同一个分区让我们假设分区号10
。所以总而言之,可以说我有 1000 条记录和 10 个分区。在这 1000 条记录中,有 3700 条带有 record_id
1234
。假设 kafka 将 record_id
发送到分区号 1。所以我希望所有这些 3700 条记录都通过分区 1,因为我想维护这些记录的顺序。其他 record_id
也是如此。文档中的 plainsink
实现将记录平均分配到所有分区。
有没有一种方法可以根据键的散列来控制记录流?
当您创建 ProducerRecord
时,您有机会提供一个您希望它结束的分区索引。
要计算分区索引,您可以简单地使用 recordId % numberOfPartitions
,并且您将确保具有相同 recordId
的所有邮件最终都在同一分区中。
示例如下:
val source: Source[Record, NotUsed] = ???
source
.map { record =>
val partition = record.recordId % 10
new ProducerRecord[Array[Byte], Record]("topic1", partition, null, record)
}
.runWith(Producer.plainSink(producerSettings))