在 akka-stream-kafka 中是否可以进行基于哈希的排序?

Is hash based ordering possible in akka-stream-kafka?

我正在探索 akka-stream-kafka 我的一个用例并完成这个 documentation。根据文档,生产者接收器将有效负载即数据记录平均分配到所有 Kafka 分区中,这是合乎逻辑的。但是我想控制消息所在的分区。我的用例是,我将获得数百万行,键为 record_id,现在我想发送相同 record_id 的所有记录让我们假设 1234 到同一个分区让我们假设分区号10。所以总而言之,可以说我有 1000 条记录和 10 个分区。在这 1000 条记录中,有 3700 条带有 record_id 1234。假设 kafka 将 record_id 发送到分区号 1。所以我希望所有这些 3700 条记录都通过分区 1,因为我想维护这些记录的顺序。其他 record_id 也是如此。文档中的 plainsink 实现将记录平均分配到所有分区。

有没有一种方法可以根据键的散列来控制记录流?

当您创建 ProducerRecord 时,您有机会提供一个您希望它结束​​的分区索引。

要计算分区索引,您可以简单地使用 recordId % numberOfPartitions,并且您将确保具有相同 recordId 的所有邮件最终都在同一分区中。

示例如下:

  val source: Source[Record, NotUsed] = ???

  source
    .map { record =>
      val partition = record.recordId % 10
      new ProducerRecord[Array[Byte], Record]("topic1", partition, null, record)
    }
    .runWith(Producer.plainSink(producerSettings))