pyspark 数据框中的聚合和一年中的一周
aggregation and week of the year in pyspark dataframe
我在数据框中有以下架构
root
|-- device_id: string (nullable = true)
|-- eventName: string (nullable = true)
|-- client_event_time: timestamp (nullable = true)
|-- eventDate: date (nullable = true)
|-- deviceType: string (nullable = true)
我想在此数据框中添加以下两列:
WAU:每周活跃用户数(按周分组的不同设备 ID)
week:一年中的第几周(需要使用适当的SQL函数)
我想使用 approx_count_distinct。
可选关键字 rsd 也需要设置为 .01。
我试着开始写类似下面的东西,但出现错误。
spark.readStream
.format("delta")
.load(inputpath)
.groupBy(weekofyear('eventDate'))
.count()
.distinct()
.writeStream
.format("delta")
.option("checkpointLocation", outputpath)
.outputMode("complete")
.start(outputpath)
根据讨论,下面的代码有效。
spark.readStream
.format("delta")
.load(inputdata)
.groupBy(weekofyear('eventDate').alias('week'))
.agg(F.approx_count_distinct('device_id', rsd = .01)).alias('WAU')
.writeStream
.format("delta")
.option("checkpointLocation", outputdata)
.outputMode("complete")
.start(outputdata)
谢谢,我也遇到了这个问题。
对我来说,您分享的代码有效,但稍作修改。
spark.readStream
.format("delta")
.load(inputdata)
.groupBy(weekofyear('eventDate').alias('week'))
.agg(approx_count_distinct('device_id', rsd = .01).alias('WAU'))
.writeStream
.format("delta")
.queryName(queryName)
.option("checkpointLocation", f"{outputdata}_checkpoint")
.outputMode("complete")
.start(outputdata)
请注意我在何处为 approx_count_distinct 添加了别名。 :)
我在数据框中有以下架构
root
|-- device_id: string (nullable = true)
|-- eventName: string (nullable = true)
|-- client_event_time: timestamp (nullable = true)
|-- eventDate: date (nullable = true)
|-- deviceType: string (nullable = true)
我想在此数据框中添加以下两列:
WAU:每周活跃用户数(按周分组的不同设备 ID)
week:一年中的第几周(需要使用适当的SQL函数)
我想使用 approx_count_distinct。 可选关键字 rsd 也需要设置为 .01。
我试着开始写类似下面的东西,但出现错误。
spark.readStream
.format("delta")
.load(inputpath)
.groupBy(weekofyear('eventDate'))
.count()
.distinct()
.writeStream
.format("delta")
.option("checkpointLocation", outputpath)
.outputMode("complete")
.start(outputpath)
根据讨论,下面的代码有效。
spark.readStream
.format("delta")
.load(inputdata)
.groupBy(weekofyear('eventDate').alias('week'))
.agg(F.approx_count_distinct('device_id', rsd = .01)).alias('WAU')
.writeStream
.format("delta")
.option("checkpointLocation", outputdata)
.outputMode("complete")
.start(outputdata)
谢谢,我也遇到了这个问题。 对我来说,您分享的代码有效,但稍作修改。
spark.readStream
.format("delta")
.load(inputdata)
.groupBy(weekofyear('eventDate').alias('week'))
.agg(approx_count_distinct('device_id', rsd = .01).alias('WAU'))
.writeStream
.format("delta")
.queryName(queryName)
.option("checkpointLocation", f"{outputdata}_checkpoint")
.outputMode("complete")
.start(outputdata)
请注意我在何处为 approx_count_distinct 添加了别名。 :)