每个微批次 Spark Streaming 中处理的总记录数

Total records processed in each micro batch spark streaming

有没有办法找到每个微批次有多少记录被处理到下游增量 table 中。我有流媒体作业,使用 trigger.once() 附加模式 ,每小时 运行 一次。出于审计目的,我想知道每个微批处理了多少条记录。我试过下面的代码来打印处理的记录数(显示在第二行)。

ss_count=0 

def write_to_managed_table(micro_batch_df, batchId):
#print(f"inside foreachBatch for batch_id:{batchId}, rows in passed dataframe: {micro_batch_df.count()}")

ss_count = micro_batch_df.count()

saveloc = "TABLE_PATH"
df_final.writeStream.trigger(once=True).foreachBatch(write_to_managed_table).option('checkpointLocation', f"{saveloc}/_checkpoint").start(saveloc)

print(ss_count)

流式作业 运行 没有任何问题,但 micro_batch_df.count() 不会打印任何计数。

任何指点将不胜感激。

这是您正在寻找的工作示例 (structured_steaming_example.py):

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("StructuredStreamTesting") \
    .getOrCreate()

# Create DataFrame representing the stream of input
df = spark.read.parquet("data/")
lines = spark.readStream.schema(df.schema).parquet("data/")


def batch_write(output_df, batch_id):
    print("inside foreachBatch for batch_id:{0}, rows in passed dataframe: {1}".format(batch_id, output_df.count()))


save_loc = "/tmp/example"
query = (lines.writeStream.trigger(once=True)
         .foreachBatch(batch_write)
         .option('checkpointLocation', save_loc + "/_checkpoint")
         .start(save_loc)
         )
query.awaitTermination()

已附加示例镶木地板文件。请将其放入数据文件夹并使用 spark-submit

执行代码
spark-submit --master local structured_steaming_example.py

请将任何示例 parquet 文件放在数据文件夹下进行测试。