Structured Streaming - not writing records to console when using writeStream (batch seems to be working)

I have a simple Structured Streaming program that reads data from Kafka and then writes it to the console. This works in batch mode (i.e. spark.read / df.write), but not in streaming mode.
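For reference, the batch version that works looks roughly like this (a sketch; the kafka.ssl.* options are the same as in the streaming code below and are elided here):

# Batch equivalent (this works): read the topic once and print the records.
# Sketch only - the kafka.ssl.* options from the streaming job below would
# be passed here as well.
df_batch = spark.read.format('kafka') \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("kafka.security.protocol", "SSL") \
    .option("subscribe", topic) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

df_batch.select("value", "topic", "partition", "timestamp").show(10, truncate=False)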

Command used to run the file:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 /Users/karanalang/PycharmProjects/Kafka/StructuredStreaming_GCP_Versa_Sase.py

Here is the code:

import sys, datetime, time, os
from pyspark.sql.functions import col, rank, dense_rank, to_date, to_timestamp, format_number, row_number, lead, lag, monotonically_increasing_id
from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.appName('StructuredStreaming_Kafka_Streaming').getOrCreate()
# os.environ["SPARK_HOME"] = "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2"
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0'

kafkaBrokers='<IP>:9094'
topic = "versa-sase"
security_protocol="SSL"
ssl_truststore_location="/Users/karanalang/Documents/Technology/strimzi/Install/versa_ssl_certs/dataproc-versa-sase/ca.p12"
ssl_truststore_password="<pwd>"
ssl_keystore_location="/Users/karanalang/Documents/Technology/strimzi/Install/versa_ssl_certs/dataproc-versa-sase/dataproc-versa-sase.p12"
ssl_keystore_password="<pwd>"
consumerGroupId = "versa-sase-grp"

print(" SPARK.SPARKCONTEXT -> ", spark.sparkContext)

spark.sparkContext.setLogLevel("ERROR")

df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 10) \
    .load()

print("df_stream -> ", df_stream, type(df_stream))
# df_stream -> DataFrame[key: binary, value: binary, topic: string, partition: int,
#              offset: bigint, timestamp: timestamp, timestampType: int]
#              <class 'pyspark.sql.dataframe.DataFrame'>

query = df_stream.select("value", "topic", "partition", "timestamp") \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime='30 seconds') \
    .option("numRows", 10) \
    .option("checkpointLocation", "checkpoint") \
    .start()

query.awaitTermination()
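As a side note: to check whether micro-batches are firing at all, one option is to poll the StreamingQuery instead of blocking right away. A small debugging sketch (assumption: this loop replaces the awaitTermination() call above):

# Debugging sketch: poll the running query instead of blocking.
# status and lastProgress are standard StreamingQuery attributes;
# lastProgress stays None until the first micro-batch completes.
while query.isActive:
    print("status -> ", query.status)
    print("lastProgress -> ", query.lastProgress)
    time.sleep(30)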

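For reference, with a working console sink each 30-second trigger normally prints a small table per micro-batch, along these lines (illustrative output, not from my run):

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+----------+---------+--------------------+
|value|     topic|partition|           timestamp|
+-----+----------+---------+--------------------+
|[...]|versa-sase|        1|2022-02-01 15:30:...|
+-----+----------+---------+--------------------+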
I expect the data to be written to the console, but nothing actually happens. When I enable debugging (log level DEBUG), this is what I see:

34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-1
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=1561146, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:26 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to offset 47160 for partition versa-sase-1
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-1
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-1
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=1561150, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:26 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to offset 47660 for partition versa-sase-1
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-1
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:26 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=1561152, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:27 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to offset 48160 for partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=1561154, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:27 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to offset 48660 for partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Resetting offset for partition versa-sase-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.75.249.190:9094 (id: 2 rack: null)], epoch=17}}.
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-0
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Resetting offset for partition versa-sase-1 to position FetchPosition{offset=1561161, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.138.122.164:9094 (id: 1 rack: null)], epoch=20}}.
22/02/01 15:30:27 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to offset 49160 for partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-1
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Resetting offset for partition versa-sase-0 to position FetchPosition{offset=1560872, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.75.249.190:9094 (id: 2 rack: null)], epoch=17}}.
22/02/01 15:30:27 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to offset 49385 for partition versa-sase-0
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-0
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Resetting offset for partition versa-sase-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.75.249.190:9094 (id: 2 rack: null)], epoch=17}}.
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-0
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Resetting offset for partition versa-sase-0 to position FetchPosition{offset=1560877, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.75.249.190:9094 (id: 2 rack: null)], epoch=17}}.
22/02/01 15:30:27 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to offset 49885 for partition versa-sase-0
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-0
^C22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Resetting offset for partition versa-sase-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.75.249.190:9094 (id: 2 rack: null)], epoch=17}}.
22/02/01 15:30:27 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Seeking to LATEST offset of partition versa-sase-0

ADDITIONAL LOGS:

22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20778 requested 20778
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20779 requested 20779
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20780 requested 20780
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20781 requested 20781
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20782 requested 20782
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20783 requested 20783
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20784 requested 20784
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20785 requested 20785
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20786 requested 20786
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-2 nextOffset 20787 requested 20787
22/02/01 15:39:21 DEBUG InternalKafkaConsumer: Seeking to versa-sase-grp versa-sase-2 20787
22/02/01 15:39:21 INFO KafkaConsumer: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to offset 20787 for partition versa-sase-2
22/02/01 15:39:21 DEBUG InternalKafkaConsumer: Polled versa-sase-grp [versa-sase-2]  500
22/02/01 15:39:21 DEBUG InternalKafkaConsumer: Offset changed from 20787 to 21287 after polling
22/02/01 15:39:21 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Seeking to EARLIEST offset of partition versa-sase-2
22/02/01 15:39:21 DEBUG Fetcher: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Sending ListOffsetRequest ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='versa-sase', partitions=[ListOffsetsPartition(partitionIndex=2, currentLeaderEpoch=15, timestamp=-2, maxNumOffsets=1)])]) to broker 34.138.248.133:9094 (id: 0 rack: null)
22/02/01 15:39:21 DEBUG NetworkClient: [Consumer clientId=consumer-versa-sase-grp-2, groupId=versa-sase-grp] Sending LIST_OFFSETS request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=6, clientId=consumer-versa-sase-grp-2, correlationId=109) and timeout 30000 to node 0: ListOffsetsRequestData(replicaId=-1, isolationLevel=0, topics=[ListOffsetsTopic(name='versa-sase', partitions=[ListOffsetsPartition(partitionIndex=2, currentLeaderEpoch=15, timestamp=-2, maxNumOffsets=1)])])
22/02/01 15:39:21 INFO SparkContext: Successfully stopped SparkContext
22/02/01 15:39:21 INFO ShutdownHookManager: Shutdown hook called
22/02/01 15:39:21 INFO ShutdownHookManager: Deleting directory /private/var/folders/yp/p_9783hn1fg_dyzbf4sgnxqh0000gn/T/spark-6b854d12-55ad-48d5-bca1-e1c4a6992797
22/02/01 15:39:21 INFO ShutdownHookManager: Deleting directory /private/var/folders/yp/p_9783hn1fg_dyzbf4sgnxqh0000gn/T/spark-6b854d12-55ad-48d5-bca1-e1c4a6992797/pyspark-df44cb1a-c18c-4357-992c-47eff25f7eb0
22/02/01 15:39:21 INFO ShutdownHookManager: Deleting directory /private/var/folders/yp/p_9783hn1fg_dyzbf4sgnxqh0000gn/T/spark-edac7493-5da7-4a41-88a5-1203aa1b7ac1
22/02/01 15:39:21 DEBUG FileSystem: FileSystem.close() by method: org.apache.hadoop.fs.FilterFileSystem.close(FilterFileSystem.java:529)); Key: (karanalang (auth:SIMPLE))@file://; URI: file:///; Object Identity Hash: 3411b055
22/02/01 15:39:21 DEBUG FileSystem: FileSystem.close() by method: org.apache.hadoop.fs.RawLocalFileSystem.close(RawLocalFileSystem.java:759)); Key: null; URI: file:///; Object Identity Hash: f0683e4
22/02/01 15:39:21 DEBUG ShutdownHookManager: Completed shutdown in 0.238 seconds; Timeouts: 0
22/02/01 15:39:21 DEBUG NetworkClient: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Received LIST_OFFSETS response from node 2 for request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=6, clientId=consumer-versa-sase-grp-3, correlationId=106): ListOffsetsResponseData(throttleTimeMs=0, topics=[ListOffsetsTopicResponse(name='versa-sase', partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0, oldStyleOffsets=[], timestamp=-1, offset=1570867, leaderEpoch=17)])])
22/02/01 15:39:21 DEBUG Fetcher: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Handling ListOffsetResponse response for versa-sase-0. Fetched offset 1570867, timestamp -1
22/02/01 15:39:21 DEBUG Metadata: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Not replacing existing epoch 17 with new epoch 17 for partition versa-sase-0
22/02/01 15:39:21 INFO SubscriptionState: [Consumer clientId=consumer-versa-sase-grp-3, groupId=versa-sase-grp] Resetting offset for partition versa-sase-0 to position FetchPosition{offset=1570867, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[34.75.249.190:9094 (id: 2 rack: null)], epoch=17}}.
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19800 requested 19800
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19801 requested 19801
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19802 requested 19802
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19803 requested 19803
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19804 requested 19804
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19805 requested 19805
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19806 requested 19806
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19807 requested 19807
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19808 requested 19808
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19809 requested 19809
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19810 requested 19810
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19811 requested 19811
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19812 requested 19812
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19813 requested 19813
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19814 requested 19814
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19815 requested 19815
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19816 requested 19816
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19817 requested 19817
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19818 requested 19818
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19819 requested 19819
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19820 requested 19820
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19821 requested 19821
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19822 requested 19822
22/02/01 15:39:21 DEBUG KafkaDataConsumer: Get versa-sase-grp versa-sase-0 nextOffset 19823 requested 19823

What needs to be done to debug/fix this? As mentioned above, the batch code is able to read the data from Kafka and print the records to the console!

TIA!

Update: in debug mode (for streaming), I see the error below. I'm not sure this is actually the problem, since right after it the logs indicate that Spark is able to get the offsets from Kafka. Also, surprisingly, I do not see this error in batch mode:

22/02/01 16:07:36 DEBUG UserGroupInformation: PrivilegedAction [as: karanalang (auth:SIMPLE)][action: org.apache.hadoop.fs.FileContext@6a01a2c]
java.lang.Exception
   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
   at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
   at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
   at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:312)
   at org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:202)
   at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:61)
   at org.apache.spark.sql.kafka010.KafkaSourceInitialOffsetWriter.<init>(KafkaSourceInitialOffsetWriter.scala:32)
   at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:227)
   at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:90)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:512)
   at scala.Option.getOrElse(Option.scala:189)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:512)
   at scala.collection.TraversableLike.$anonfun$flatMap(TraversableLike.scala:293)
   at scala.collection.Iterator.foreach(Iterator.scala:943)
   at scala.collection.Iterator.foreach$(Iterator.scala:943)
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
   at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
   at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
   at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
   at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:492)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:492)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:228)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:193)
   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
   at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream(StreamExecution.scala:303)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:209)
22/02/01 16:07:36 DEBUG MicroBatchExecution: Retrieving data from KafkaV2[Subscribe[versa-sase]]: None -> {"versa-sase":{"2":1373669,"1":1373507,"0":1372412}}
22/02/01 16:07:36 DEBUG MicroBatchExecution: getBatch took 17 ms
22/02/01 16:07:36 INFO KafkaOffsetReaderConsumer: Partitions added: Map()
22/02/01 16:07:36 DEBUG KafkaOffsetReaderConsumer: TopicPartitions: versa-sase-2, versa-sase-1, versa-sase-0
22/02/01 16:07:36 INFO KafkaOffsetReaderConsumer: Partitions added: Map()

Structured Streaming is trying to checkpoint into the configured "checkpoint" directory, but it seems the job itself does not have permission to do so. If checkpointing does not work, it is entirely expected that the workload stalls and makes no progress.

Please double-check why the Hadoop FS cannot be written to, and re-run the workload. Just a hint: it may help to use a full path when setting "checkpointLocation", to make it more explicit which directory is being used.
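For example, a minimal sketch (the path below is only an illustration; substitute a directory that the user running the job can actually create and write to):

# Sketch: fully qualified checkpoint location instead of the relative "checkpoint".
# file:///tmp/... is just an example path - pick a directory the job user
# has permission to write to.
query = df_stream.select("value", "topic", "partition", "timestamp") \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime='30 seconds') \
    .option("numRows", 10) \
    .option("checkpointLocation", "file:///tmp/spark-checkpoints/versa-sase-console") \
    .start()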