使用 Delta Lake 上的结构化流计算 Spark Dataframe 中分组数据的唯一值
Counting unique values on grouped data in a Spark Dataframe with Structured Streaming on Delta Lake
大家。
我在 Delta Lake 中有一个结构化流。
我的最后一个 table 应该计算每周有多少唯一 ID 访问一个平台。
我在流式传输中按周对数据进行分组,但是,我无法计算另一列上 ID 的唯一值,即使重复进行,我也会继续计算整组数据。
我试过两次对数据进行分组,一次是按周,然后是 device_id。
我试过 dropDuplicate()。
到目前为止没有任何结果。
谁能解释一下我错过了什么?
我的代码:
from pyspark.sql.functions import weekofyear, col
def silverToGold(silverPath, goldPath, queryName):
(spark.readStream
.format("delta")
.load(silverPath)
.withColumn("week", weekofyear("client_event_time"))
.groupBy(col("week"))
.count()
.select(col("week"),col("count").alias("WAU"))
.writeStream
.format("delta")
.option("checkpointLocation", goldPath + "/_checkpoint")
.queryName(queryName)
.outputMode("complete")
.start(goldPath))
下面的代码成功了。
使用 approx_count_distinct 和 rsd=0.01。
from pyspark.sql.functions import weekofyear, approx_count_distinct
def silverToGold(silverPath, goldPath, queryName):
(spark.readStream
.format("delta")
.load(silverPath)
.withColumn("week", weekofyear(col("eventDate")))
.groupBy(col("week"))
.agg(approx_count_distinct("device_id",rsd=0.01).alias("WAU"))
.select("week","WAU")
.writeStream
.format("delta")
.option("checkpointLocation", goldPath + "/_checkpoint")
.queryName(queryName)
.outputMode("complete")
.start(goldPath))
大家。
我在 Delta Lake 中有一个结构化流。 我的最后一个 table 应该计算每周有多少唯一 ID 访问一个平台。
我在流式传输中按周对数据进行分组,但是,我无法计算另一列上 ID 的唯一值,即使重复进行,我也会继续计算整组数据。
我试过两次对数据进行分组,一次是按周,然后是 device_id。 我试过 dropDuplicate()。 到目前为止没有任何结果。
谁能解释一下我错过了什么?
我的代码:
from pyspark.sql.functions import weekofyear, col
def silverToGold(silverPath, goldPath, queryName):
(spark.readStream
.format("delta")
.load(silverPath)
.withColumn("week", weekofyear("client_event_time"))
.groupBy(col("week"))
.count()
.select(col("week"),col("count").alias("WAU"))
.writeStream
.format("delta")
.option("checkpointLocation", goldPath + "/_checkpoint")
.queryName(queryName)
.outputMode("complete")
.start(goldPath))
下面的代码成功了。
使用 approx_count_distinct 和 rsd=0.01。
from pyspark.sql.functions import weekofyear, approx_count_distinct
def silverToGold(silverPath, goldPath, queryName):
(spark.readStream
.format("delta")
.load(silverPath)
.withColumn("week", weekofyear(col("eventDate")))
.groupBy(col("week"))
.agg(approx_count_distinct("device_id",rsd=0.01).alias("WAU"))
.select("week","WAU")
.writeStream
.format("delta")
.option("checkpointLocation", goldPath + "/_checkpoint")
.queryName(queryName)
.outputMode("complete")
.start(goldPath))