Infinite loop of Resetting offset and seeking for LATEST offset

I am trying to run a simple Spark Structured Streaming application that, for now, does nothing more than pull data from a local Kafka cluster and write it to the local file system. The code looks like this:

    private static final String TARGET_PATH = "orchestration/target/myfolder/";

    private static final String BOOTSTRAP_SERVER = "localhost:9092";
    private static final String KAFKA_TOPIC = "twitter_aapl2";

    public static void main(String[] args) throws TimeoutException, StreamingQueryException {

        SparkSession spark = SparkSession.builder().master("local[*]").appName("spark app").getOrCreate();

        Dataset<Row> df = spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", BOOTSTRAP_SERVER)
                .option("subscribe", KAFKA_TOPIC)
                .load();

        StreamingQuery query = df.writeStream()
                .outputMode("append")
                .format("parquet")
                .option("path", TARGET_PATH + "data/")
                .option("checkpointLocation", TARGET_PATH + "checkpoints/")
                .start();
        query.awaitTermination();
    }


However, when I run it I get the following output, and my data never actually gets processed:

21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Resetting offset for partition twitter_aapl2-0 to offset 128.
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Seeking to LATEST offset of partition twitter_aapl2-0
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Resetting offset for partition twitter_aapl2-0 to offset 128.
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Seeking to LATEST offset of partition twitter_aapl2-0
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Resetting offset for partition twitter_aapl2-0 to offset 128.
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Seeking to LATEST offset of partition twitter_aapl2-0
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Resetting offset for partition twitter_aapl2-0 to offset 128.
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Seeking to LATEST offset of partition twitter_aapl2-0
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Resetting offset for partition twitter_aapl2-0 to offset 128.
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Seeking to LATEST offset of partition twitter_aapl2-0
21/01/20 16:54:08 INFO SubscriptionState: [Consumer clientId=consumer-spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0-1, groupId=spark-kafka-source-13ec7de0-97da-499c-a507-d8e4baa764dd-1008481407-driver-0] Resetting offset for partition twitter_aapl2-0 to offset 128.

How can I solve this problem?

It turns out that this seek-and-reset behavior is actually desirable when reading a topic not from the beginning but from the latest offset. In that case the pipeline only reads new data sent to the Kafka topic while it is running, and since no new data was being sent, it ends up in an endless loop of seeking (for new data) and resetting (to the latest offset).

Bottom line: reading the topic from the beginning, or sending new data to it, solves the problem.

I had to work around the log noise by setting the following log configuration:

    log4j.logger.org.apache.kafka.clients.consumer.internals.SubscriptionState=WARN
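For context, this line goes into Spark's Log4j configuration file. A minimal sketch of the relevant part of `conf/log4j.properties` (assuming a Spark version that still ships with Log4j 1.x; the root logger line is illustrative, the `SubscriptionState` line is the actual fix):

    # keep Spark's default log level
    log4j.rootCategory=INFO, console
    # silence the per-poll "Seeking to LATEST offset" / "Resetting offset" messages
    log4j.logger.org.apache.kafka.clients.consumer.internals.SubscriptionState=WARN
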

While this is expected behavior, it hardly seems necessary to log the 'Seeking to LATEST offset' message every few milliseconds; it drowns out every other application log in the log file. The problem gets worse when consuming from a topic that is not very active. It would be better if this were logged at DEBUG level rather than INFO.