每个微批次 Spark Streaming 中处理的总记录数
Total records processed in each micro batch spark streaming
有没有办法找到每个微批次有多少记录被处理到下游增量 table 中。我有流媒体作业,使用 trigger.once() 和 附加模式 ,每小时 运行 一次。出于审计目的,我想知道每个微批处理了多少条记录。我试过下面的代码来打印处理的记录数(显示在第二行)。
ss_count=0
def write_to_managed_table(micro_batch_df, batchId):
#print(f"inside foreachBatch for batch_id:{batchId}, rows in passed dataframe: {micro_batch_df.count()}")
ss_count = micro_batch_df.count()
saveloc = "TABLE_PATH"
df_final.writeStream.trigger(once=True).foreachBatch(write_to_managed_table).option('checkpointLocation', f"{saveloc}/_checkpoint").start(saveloc)
print(ss_count)
流式作业 运行 没有任何问题,但 micro_batch_df.count() 不会打印任何计数。
任何指点将不胜感激。
这是您正在寻找的工作示例 (structured_steaming_example.py):
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("StructuredStreamTesting") \
.getOrCreate()
# Create DataFrame representing the stream of input
df = spark.read.parquet("data/")
lines = spark.readStream.schema(df.schema).parquet("data/")
def batch_write(output_df, batch_id):
print("inside foreachBatch for batch_id:{0}, rows in passed dataframe: {1}".format(batch_id, output_df.count()))
save_loc = "/tmp/example"
query = (lines.writeStream.trigger(once=True)
.foreachBatch(batch_write)
.option('checkpointLocation', save_loc + "/_checkpoint")
.start(save_loc)
)
query.awaitTermination()
已附加示例镶木地板文件。请将其放入数据文件夹并使用 spark-submit
执行代码
spark-submit --master local structured_steaming_example.py
请将任何示例 parquet 文件放在数据文件夹下进行测试。
有没有办法找到每个微批次有多少记录被处理到下游增量 table 中。我有流媒体作业,使用 trigger.once() 和 附加模式 ,每小时 运行 一次。出于审计目的,我想知道每个微批处理了多少条记录。我试过下面的代码来打印处理的记录数(显示在第二行)。
ss_count=0
def write_to_managed_table(micro_batch_df, batchId):
#print(f"inside foreachBatch for batch_id:{batchId}, rows in passed dataframe: {micro_batch_df.count()}")
ss_count = micro_batch_df.count()
saveloc = "TABLE_PATH"
df_final.writeStream.trigger(once=True).foreachBatch(write_to_managed_table).option('checkpointLocation', f"{saveloc}/_checkpoint").start(saveloc)
print(ss_count)
流式作业 运行 没有任何问题,但 micro_batch_df.count() 不会打印任何计数。
任何指点将不胜感激。
这是您正在寻找的工作示例 (structured_steaming_example.py):
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("StructuredStreamTesting") \
.getOrCreate()
# Create DataFrame representing the stream of input
df = spark.read.parquet("data/")
lines = spark.readStream.schema(df.schema).parquet("data/")
def batch_write(output_df, batch_id):
print("inside foreachBatch for batch_id:{0}, rows in passed dataframe: {1}".format(batch_id, output_df.count()))
save_loc = "/tmp/example"
query = (lines.writeStream.trigger(once=True)
.foreachBatch(batch_write)
.option('checkpointLocation', save_loc + "/_checkpoint")
.start(save_loc)
)
query.awaitTermination()
已附加示例镶木地板文件。请将其放入数据文件夹并使用 spark-submit
执行代码spark-submit --master local structured_steaming_example.py
请将任何示例 parquet 文件放在数据文件夹下进行测试。