StructuredStreaming withWatermark - TypeError: 'module' object is not callable


I have a Structured Streaming pyspark program running on GCP Dataproc, which reads data from Kafka and does some data processing and aggregation. I'm trying to use withWatermark(), and it is giving an error.

Here is the code:

df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers",kafkaBrokers)\
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId)\
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 10) \
    .load()

# writeStream calls foreachBatch(convertToDictForEachBatch)
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").writeStream \
    .outputMode("append") \
    .trigger(processingTime='10 seconds') \
    .option("numRows",10)\
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()

convertToDictForEachBatch - contains the code that does the data processing and aggregation

def convertToDictForEachBatch(df, batchId):
    # d = df_stream.rdd.collect()
    print(" IN CONVERT TO DICT ", batchId, " currentTime ", datetime.datetime.now(), " df -> ", df)
    ll = df.rdd.map(lambda x: x[0])
    res = []
    # each row is parsed, and finally converted to an RDD (i.e. tdict)
    tdict = ll.map(convertToDict)
    # converting the tdict to DF, which is passed to Alarm class, where the data massaging & aggregation is done
    dfnew = tdict.toDF()
    ap = Alarm(tdict, spark)


    # Aggregation code in the Alarm class, which uses withWatermark
    def computeCount(df_processedAlarm, df_totalAlarm):
        processedAlarmCnt = None
        if df_processedAlarm.count() > 0:
            processedAlarmCnt = df_processedAlarm.withWatermark("timestamp", "10 seconds") \
                .groupBy(
                    window(col("timestamp"), "1 minutes").alias("window")
                ).count()

Objective: the code above computes the count of processedAlarms over a 1-minute window, with a watermark of 10 seconds.

Error:

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in call
    raise e
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 193, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 444, in convertToDictForEachBatch
    ap = Alarm(tdict, spark)
  File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 356, in __init__
    computeCount(l_alarm_df, l_alarm1_df)
  File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 262, in computeCount
    window(col("timestamp"), "10 minutes").alias("window")
TypeError: 'module' object is not callable

    at py4j.Protocol.getReturnValue(Protocol.java:476)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
    at com.sun.proxy.$Proxy33.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch(ForeachBatchSink.scala:55)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$adapted(ForeachBatchSink.scala:55)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:586)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:226)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:194)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream(StreamExecution.scala:334)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:244)

What needs to be done to debug/fix this issue? TIA!

As @ewertonvsilva mentioned, this was related to an import error. Specifically ->

from pyspark.sql.functions import window

After correcting the import, the issue was resolved.
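
The "TypeError: 'module' object is not callable" appears when the name window is bound to a module (for example via from pyspark.sql import window, which imports the pyspark.sql.window module) instead of the window() function from pyspark.sql.functions. A minimal sketch of the corrected imports and the windowed count, assuming a DataFrame df_processedAlarm with a "timestamp" column as in the question:

# Sketch only; df_processedAlarm and its "timestamp" column are taken from the question
from pyspark.sql.functions import col, window   # the window() function, not the pyspark.sql.window module

processedAlarmCnt = df_processedAlarm \
    .withWatermark("timestamp", "10 seconds") \
    .groupBy(window(col("timestamp"), "1 minute").alias("window")) \
    .count()

With window imported from pyspark.sql.functions, the call returns a grouping column for the 1-minute time buckets instead of raising the TypeError.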