Akka Stream 动态 Sink 取决于来自 Kafka 主题的消息

Akka Stream dynamic Sink depending on Message from Kafka topic

我有一个读取 Message 的 Kafka 消费者。每条消息都有一个 ID 和内容。

case class Message(id: String, content: String)

根据ID,我想将消息写入单独的接收器。具体到 MongoDB 集合。 Mongo 提供一个 Sink 将其写入 DB 到指定的集合中。

val sink: Sink[Document, Future[Done]] = MongoSink.insertOne(collection(id))

问题是,我需要在连接 Kafka 消费者源时指定接收器,但每个元素都定义了它应该进入哪个接收器。 有没有一种方法可以在元素到达时动态使用特定的接收器。或者这是不可能的,例如,我应该为每个 ID 使用不同的 Kafka 主题并将每个源连接到一个单独的接收器?

您的示例中的类型排列方式并不完全清楚(例如 DocumentMessage 之间的关系),但您可以采用以下几种方法:

  • 如果有很多可能的集合并且无法提前知道它们,那么 Akka Streams 中最不坏的选择就是
Sink.foreachAsync[Message](parallelism) { msg =>
  val document = documentFromMessage(msg)
  val collection = collection(msg.id)
  Source.single(document).runWith(MongoSink.insertOne(collection))
}

请注意,这将为每条消息使用一个新的 Mongo 接收器,这可能会影响效率。请注意,如果有一种更轻量级的方式(例如在 reactivemongo 驱动程序中?)在插入单个文档后 returns a Future,但使用连接池之类的东西来减少单个文档插入的开销,那可能会更可取。

  • 如果集合事先已知,您可以为每个集合预构建接收器并使用 PartitionGraphDSL 定义包含预构建接收器的接收器
// collection0, etc. are predefined and encompass all of the collections which might be returned by collection(id)
val collections: Map[MongoCollection[Document], (Int, Sink[Document, Future[Done]])] = Map(
  collection0 -> (0 -> MongoSink.insertOne(collection0)),
  collection1 -> (1 -> MongoSink.insertOne(collection1)),
  collection2 -> (2 -> MongoSink.insertOne(collection2)),
  collection3 -> (3 -> MongoSink.insertOne(collection3))
)

val combinedSink = Sink.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val partition = builder.add(
    Partition[Message](
      collections.size,
      { msg => collections(collection(msg.id))._1 }
    )
  )

  val toDocument = Flow[Message].map(documentFromMessage)

  collections.foreach {
    case (_, (n, sink)) =>
      partition.out(n) ~> toDocument ~> sink
  }

  SinkShape.of(partition.in)
}