如何将 Doobie 生成的 FS2 Stream 发布到 Kafka
How to publish an FS2 Stream generated by Doobie to Kafka
我想将一长串事件发布到 Kafka 中,消耗一个 fs2.Stream,对应于一个非常大的数据库行列表,如果编译到列表,最终会导致内存不足错误。
所以,假设我有一个非常大的 UUID 键列表,其中包含数百万条记录:
def getKeyStream(timeRangeEnd: LocalDateTime): fs2.Stream[doobie.ConnectionIO, UUID]
并且我想使用此发布者将事件发布到 Kafka 中,该事件对应于包含 500 个键的 chunk:
trait KeyPublisher {
def publish(event: ChunkOfKeys): IO[Long]
}
我想创建一个函数来 enqueue/publish 此流进入 Kafka:
def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime): IO[Unit] = {
getKeyStream(endDateTime)
.chunkN(500)
.evalMap(myChunk => ?????)
...
}
如何使用数据库产生的流,将其拆分为固定大小的块,然后将每个块发布到 Kafka 中?
显然很难找到关于这个主题的好的文档或示例。你能给我指明正确的方向吗?
因为你没有说 ChunkOfKeys
是什么类型,我假设它类似于 Chunk[UUID]
def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime)(
xa: Transactor[IO],
publisher: KeyPublisher
): IO[Unit] =
getKeyStream(endDateTime)
.transact(xa) // Convert the ConnectionIO stream to Stream[IO, UUID]
.chunkN(500) // into Stream[IO, Chunk[UUID]]
.evalMap(publisher.publish) // Into Stream[IO, Long]
.compile
.drain // An IO[Unit] that describes the whole process
我想将一长串事件发布到 Kafka 中,消耗一个 fs2.Stream,对应于一个非常大的数据库行列表,如果编译到列表,最终会导致内存不足错误。
所以,假设我有一个非常大的 UUID 键列表,其中包含数百万条记录:
def getKeyStream(timeRangeEnd: LocalDateTime): fs2.Stream[doobie.ConnectionIO, UUID]
并且我想使用此发布者将事件发布到 Kafka 中,该事件对应于包含 500 个键的 chunk:
trait KeyPublisher {
def publish(event: ChunkOfKeys): IO[Long]
}
我想创建一个函数来 enqueue/publish 此流进入 Kafka:
def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime): IO[Unit] = {
getKeyStream(endDateTime)
.chunkN(500)
.evalMap(myChunk => ?????)
...
}
如何使用数据库产生的流,将其拆分为固定大小的块,然后将每个块发布到 Kafka 中?
显然很难找到关于这个主题的好的文档或示例。你能给我指明正确的方向吗?
因为你没有说 ChunkOfKeys
是什么类型,我假设它类似于 Chunk[UUID]
def enqueueKeyStreamIntoKafka(endDateTime: LocalDateTime)(
xa: Transactor[IO],
publisher: KeyPublisher
): IO[Unit] =
getKeyStream(endDateTime)
.transact(xa) // Convert the ConnectionIO stream to Stream[IO, UUID]
.chunkN(500) // into Stream[IO, Chunk[UUID]]
.evalMap(publisher.publish) // Into Stream[IO, Long]
.compile
.drain // An IO[Unit] that describes the whole process