Pyspark Kafka 偏移范围单位

Pyspark Kafka offset range units

我正在使用 Spark 作为批处理来处理来自 kafka 的日志。 在每个周期中,我的代码应该得到到达 kafka 消费者的任何东西。但是,我想限制每个周期从 kafka 获取的数据量。假设 5 GB 或 500000 行日志..

offsetRanges = []
def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    WRITE OFFSETS TO DISK
    return rdd

while True:
    host = "localhost:9092"
    offset = OffsetRange(topic, 0, fromOffset, untilOffset)
    kafka_content = KafkaUtils.createRDD(sc, {"metadata.broker.list": host}, [offset])
    kafka_content.transform(storeOffsetRanges)
    RDD TRANSFORMATIONS..

我会将偏移量存储在内存和磁盘中,以防驱动程序出现故障。但是我怎样才能强加这些卡夫卡偏移量来限制每个周期的最大数据量呢? kafka偏移范围的单位是什么??

提前致谢!

Kafka 偏移量单元是消息。在每个周期中,您最多会收到来自 Kafka 的 untilOffest - fromOffset 条消息。但是数据只会从一个主题分区中读取,因此如果您的主题有更多分区,那么应用程序将丢失一些日志行。

作为替代方案,您可以尝试 spark streaming with kafka direct approach. Using this method you will get rid of while True, you will work with log lines in microbatches based on time (not fixed offsets) with optional backpressure mechanism. Then you can omit saving offsets in memory (streaming will handle it), but saving them to disk is still necessary in case of driver restart (see fromOffsets in KafkaUtils.createDirectStream)。