如何将 Doobie 生成的 FS2 Stream 发布到 Kafka

How to publish an FS2 Stream generated by Doobie to Kafka

我想将一长串事件发布到 Kafka 中,消耗一个 fs2.Stream,对应于一个非常大的数据库行列表,如果编译到列表,最终会导致内存不足错误。

所以,假设我有一个非常大的 UUID 键列表,其中包含数百万条记录:

def getKeyStream(timeRangeEnd: LocalDateTime): fs2.Stream[doobie.ConnectionIO, UUID]

并且我想使用此发布者将事件发布到 Kafka 中,该事件对应于包含 500 个键的 chunk

trait KeyPublisher {
  def publish(event: ChunkOfKeys): IO[Long]
}

我想创建一个函数来 enqueue/publish 此流进入 Kafka:

def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime): IO[Unit] = {
  getKeyStream(endDateTime)
     .chunkN(500)
     .evalMap(myChunk => ?????)
     ...
}

如何使用数据库产生的流,将其拆分为固定大小的块,然后将每个块发布到 Kafka 中?

显然很难找到关于这个主题的好的文档或示例。你能给我指明正确的方向吗?

因为你没有说 ChunkOfKeys 是什么类型,我假设它类似于 Chunk[UUID]

def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime)(
    xa: Transactor[IO],
    publisher: KeyPublisher
): IO[Unit] =
  getKeyStream(endDateTime)
    .transact(xa) // Convert the ConnectionIO stream to Stream[IO, UUID]
    .chunkN(500)  // into Stream[IO, Chunk[UUID]]
    .evalMap(publisher.publish)  // Into Stream[IO, Long]
    .compile
    .drain // An IO[Unit] that describes the whole process