StructuredStreaming withWatermark - TypeError: 'module' object is not callable
I have a Structured Streaming PySpark program running on GCP Dataproc that reads data from Kafka and does some data processing and aggregation.
I'm trying to use withWatermark(), and I'm getting an error.
The code is below:
df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 10) \
    .load()
# writeStream calls foreachBatch(convertToDictForEachBatch)
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").writeStream \
    .outputMode("append") \
    .trigger(processingTime='10 seconds') \
    .option("numRows", 10) \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()
convertToDictForEachBatch - contains the code that does the data processing and aggregation:
def convertToDictForEachBatch(df, batchId):
    # d = df_stream.rdd.collect()
    print(" IN CONVERT TO DICT ", batchId, " currentTime ", datetime.datetime.now(), " df -> ", df)
    ll = df.rdd.map(lambda x: x[0])
    res = []
    # each row is parsed, and finally converted to an rdd (i.e. tdict)
    tdict = ll.map(convertToDict)
    # converting tdict to a DataFrame, which is passed to the Alarm class, where the data massaging & aggregation is done
    dfnew = tdict.toDF()
    ap = Alarm(tdict, spark)
# Aggregation code in the Alarm class, which uses withWatermark
def computeCount(df_processedAlarm, df_totalAlarm):
    processedAlarmCnt = None
    if df_processedAlarm.count() > 0:
        processedAlarmCnt = df_processedAlarm.withWatermark("timestamp", "10 seconds") \
            .groupBy(
                window(col("timestamp"), "1 minutes").alias("window")
            ).count()
Objective: the code above should compute the count of processedAlarms per 1-minute window, with a 10-second watermark.
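For reference, the expected output of this aggregation is one count per 1-minute window, roughly like this (illustrative values only, not actual data):
window                                        count
{2022-03-01 10:00:00, 2022-03-01 10:01:00}    12
{2022-03-01 10:01:00, 2022-03-01 10:02:00}    8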
Error:
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in call
raise e
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 193, in call
self.func(DataFrame(jdf, self.sql_ctx), batch_id)
File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 444, in convertToDictForEachBatch
ap = Alarm(tdict, spark)
File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 356, in __init__
computeCount(l_alarm_df, l_alarm1_df)
File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 262, in computeCount
window(col("timestamp"), "10 minutes").alias("window")
TypeError: 'module' object is not callable
at py4j.Protocol.getReturnValue(Protocol.java:476)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
at com.sun.proxy.$Proxy33.call(Unknown Source)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch(ForeachBatchSink.scala:55)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$adapted(ForeachBatchSink.scala:55)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:586)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:584)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:226)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:194)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream(StreamExecution.scala:334)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:244)
Traceback (most recent call last):
What do I need to do to debug/fix this issue?
TIA!
As @ewertonvsilva noted, this was related to an import error.
Specifically ->
from pyspark.sql.functions import window
Once the import was corrected, the issue was resolved.
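For anyone hitting the same error: TypeError: 'module' object is not callable means the name window was bound to a module (for example via from pyspark.sql import window) instead of the function pyspark.sql.functions.window. A minimal sketch of the corrected computeCount, assuming the rest of the Alarm class is unchanged:

from pyspark.sql.functions import col, window

def computeCount(df_processedAlarm, df_totalAlarm):
    processedAlarmCnt = None
    if df_processedAlarm.count() > 0:
        # 10-second watermark on the event-time column, then counts per 1-minute window
        processedAlarmCnt = df_processedAlarm.withWatermark("timestamp", "10 seconds") \
            .groupBy(
                window(col("timestamp"), "1 minutes").alias("window")
            ).count()
    return processedAlarmCnt  # returning the result is an assumption; the original snippet does not show it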