使用 Delta Lake 上的结构化流计算 Spark Dataframe 中分组数据的唯一值

Counting unique values on grouped data in a Spark Dataframe with Structured Streaming on Delta Lake

大家。

我在 Delta Lake 中有一个结构化流。 我的最后一个 table 应该计算每周有多少唯一 ID 访问一个平台。

我在流式传输中按周对数据进行分组,但是,我无法计算另一列上 ID 的唯一值,即使重复进行,我也会继续计算整组数据。

我试过两次对数据进行分组,一次是按周,然后是 device_id。 我试过 dropDuplicate()。 到目前为止没有任何结果。

谁能解释一下我错过了什么?

我的代码:

from pyspark.sql.functions import weekofyear, col

def silverToGold(silverPath, goldPath, queryName):
    (spark.readStream
    .format("delta")
    .load(silverPath)
    .withColumn("week", weekofyear("client_event_time"))
    .groupBy(col("week"))
    .count()
    .select(col("week"),col("count").alias("WAU"))
    .writeStream 
    .format("delta")
    .option("checkpointLocation", goldPath + "/_checkpoint")
    .queryName(queryName)
    .outputMode("complete")
    .start(goldPath))

下面的代码成功了。

使用 approx_count_distinct 和 rsd=0.01。

from pyspark.sql.functions import weekofyear, approx_count_distinct

def silverToGold(silverPath, goldPath, queryName):
  (spark.readStream
   .format("delta")
   .load(silverPath)
   .withColumn("week", weekofyear(col("eventDate")))
   .groupBy(col("week"))
   .agg(approx_count_distinct("device_id",rsd=0.01).alias("WAU"))
   .select("week","WAU")
   .writeStream
   .format("delta")
   .option("checkpointLocation", goldPath + "/_checkpoint")
   .queryName(queryName)
   .outputMode("complete") 
   .start(goldPath))