pyspark 结构化流不使用 query.lastProgress 或其他标准指标更新查询指标
pyspark structured streaming not updating query metrics with query.lastProgress or other standard metrics
我正在尝试将日志记录添加到我的 pyspark 结构化流应用程序,以便查看有关每个微批处理的进度和统计信息。 writestream 方法使用 foreach 编写器将数据帧中的行写入 postgres 数据库。我正在使用 .lastProgress
和 pyspark 提供的其他标准指标进行日志记录。 writestream 方法和我尝试记录如下所示。
query_1 = eventsDF \
.writeStream \
.foreach(writer) \
.outputMode("append") \
.option("checkpointLocation", "/tmp/checkpoint_a/") \
.trigger(processingTime="5 seconds") \
.start()
query_progress = query_1.lastProgress
print("progress ", query_progress)
print("status ", query_1.status)
print("active ", query_1.isActive)
query_1.awaitTermination()
我的第一个循环的结果是:
progress None
status {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
active True
但是,当事件数据到达时处理更多的批次不会产生更多的日志消息。我希望在流作业中处理每个微批处理后都会发出日志消息。
感谢任何建议或指导。谢谢
start
和 awaitTermination
之间的所有代码只执行一次。只有 load
和 start
之间的代码会在每个查询触发器上连续执行。
根据《Spark - 权威指南》一书,这种监视方式旨在 运行 您的应用程序内部。但是,对于独立应用程序,您通常不会将 shell 附加到 运行 任意代码。在书中,他们建议“通过实施监控服务器来公开 [查询] 状态,例如侦听端口的小型 HTTP 服务器,并在收到请求时 returns query.status
。
因此,需要创建一个专用的运行able线程,频繁调用查询的监控API。我真的不熟悉 Python,但它基本上如下所示:
# import the threading module
import threading
class thread(threading.Thread):
def __init__(self, query):
threading.Thread.__init__(self)
self.query = query
# helper function to execute the threads
def run(self):
print("progress ", query.lastProgress);
完成后,您需要将其放在 start
和 awaitTermination
之间:
query_1 = eventsDF \
[...]
.start()
monitoring = thread(query_1)
query_1.awaitTermination()
除了专用线程,您还可以使用 while(query_1.isActive)
.
循环查询状态
对于 Scala 用户:
How to get progress of streaming query after awaitTermination?
我正在尝试将日志记录添加到我的 pyspark 结构化流应用程序,以便查看有关每个微批处理的进度和统计信息。 writestream 方法使用 foreach 编写器将数据帧中的行写入 postgres 数据库。我正在使用 .lastProgress
和 pyspark 提供的其他标准指标进行日志记录。 writestream 方法和我尝试记录如下所示。
query_1 = eventsDF \
.writeStream \
.foreach(writer) \
.outputMode("append") \
.option("checkpointLocation", "/tmp/checkpoint_a/") \
.trigger(processingTime="5 seconds") \
.start()
query_progress = query_1.lastProgress
print("progress ", query_progress)
print("status ", query_1.status)
print("active ", query_1.isActive)
query_1.awaitTermination()
我的第一个循环的结果是:
progress None
status {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
active True
但是,当事件数据到达时处理更多的批次不会产生更多的日志消息。我希望在流作业中处理每个微批处理后都会发出日志消息。
感谢任何建议或指导。谢谢
start
和 awaitTermination
之间的所有代码只执行一次。只有 load
和 start
之间的代码会在每个查询触发器上连续执行。
根据《Spark - 权威指南》一书,这种监视方式旨在 运行 您的应用程序内部。但是,对于独立应用程序,您通常不会将 shell 附加到 运行 任意代码。在书中,他们建议“通过实施监控服务器来公开 [查询] 状态,例如侦听端口的小型 HTTP 服务器,并在收到请求时 returns query.status
。
因此,需要创建一个专用的运行able线程,频繁调用查询的监控API。我真的不熟悉 Python,但它基本上如下所示:
# import the threading module
import threading
class thread(threading.Thread):
def __init__(self, query):
threading.Thread.__init__(self)
self.query = query
# helper function to execute the threads
def run(self):
print("progress ", query.lastProgress);
完成后,您需要将其放在 start
和 awaitTermination
之间:
query_1 = eventsDF \
[...]
.start()
monitoring = thread(query_1)
query_1.awaitTermination()
除了专用线程,您还可以使用 while(query_1.isActive)
.
对于 Scala 用户:
How to get progress of streaming query after awaitTermination?