How to do this transformation in SQL/Spark/GraphFrames
I have a table with the following two columns:
Device-Id Account-Id
d1 a1
d2 a1
d1 a2
d2 a3
d3 a4
d3 a5
d4 a6
d1 a4
Device-Id is the unique id of a device on which my application is installed, and Account-Id is the id of a user account. A user can have multiple devices, and multiple accounts can be created on the same device (for example, accounts a1, a2 and a4 are set up on device d1).
I want to find the unique actual users (to be represented as a new column with some unique UUID in the resulting table), and the transformation I'm looking for should generate the following table:
Unique-User-Id Devices-Used Accounts-Used
uuid1 [d1, d2, d3] [a1, a2, a3, a4, a5]
uuid2 [d4] [a6]
The idea behind the generated table above is that the actual user uuid1 set up account a1 on their devices d1 and d2, which basically means that both devices belong to uuid1 and that every other account set up on d1 and d2 also maps to the same user uuid1. Similarly, d1 also has account a4, which was also set up on d3, so d3 is uuid1's device as well and every account on it should map to uuid1.
How can I achieve the above transformation in SQL/Spark/GraphFrames (via Databricks), where both the device ids and the account ids can number in the millions?
I'm not really proud of this solution, since I think there is probably a more efficient one, but I'll leave it here anyway. Hope it helps.
import org.apache.spark.sql.functions._
val flatten_distinct = (array_distinct _) compose (flatten _)
val df = Seq(
("d1","a1"),
("d2","a1"),
("d1","a2"),
("d2","a3"),
("d3","a4"),
("d3","a5"),
("d4","a6")
).toDF("d_id","u_id")
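// devices each account was set up on
// (note: this sample omits the (d1, a4) row from the question's data, so d3 stays a separate user below)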
val userDevices = df
.groupBy("u_id")
.agg(collect_list("d_id").alias("d_id_list"))
//+----+---------+
//|u_id|d_id_list|
//+----+---------+
//| a5| [d3]|
//| a3| [d2]|
//| a4| [d3]|
//| a2| [d1]|
//| a1| [d1, d2]|
//| a6| [d4]|
//+----+---------+
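// accounts set up on each device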
val accountsByDevice = df
.groupBy("d_id")
.agg(collect_list("u_id").alias("u_id_list"))
//+----+---------+
//|d_id|u_id_list|
//+----+---------+
//| d2| [a3, a1]|
//| d3| [a4, a5]|
//| d1| [a1, a2]|
//| d4| [a6]|
//+----+---------+
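// for every account's device list, merge all accounts seen on those devices,
// then explode back to one row per device with the merged account set and its size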
val ungroupedDf = userDevices
.join(accountsByDevice, expr("array_contains(d_id_list,d_id)"))
.groupBy("d_id_list")
.agg(collect_set("u_id_list") as "set")
.select(col("d_id_list") as "d_id", flatten_distinct(col("set")) as "u_id")
.select(explode(col("d_id")) as "d_id", col("u_id"), size(col("u_id")) as "size")
//+----+------------+----+
//|d_id| u_id|size|
//+----+------------+----+
//| d2| [a1, a3]| 2|
//| d1|[a1, a3, a2]| 3|
//| d2|[a1, a3, a2]| 3|
//| d3| [a4, a5]| 2|
//| d1| [a1, a2]| 2|
//| d4| [a6]| 1|
//+----+------------+----+
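// keep, for each device, only its largest merged account set, then group devices
// sharing the same account set and tag each group with monotonically_increasing_id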
val finalDf = ungroupedDf
.join(ungroupedDf.groupBy("d_id").agg(max(col("size")) as "size"), Seq("size","d_id"))
.groupBy("u_id")
.agg(collect_set("d_id") as "d_id")
.withColumn("unique_id", monotonically_increasing_id())
//+------------+--------+-------------+
//| u_id| d_id| unique_id|
//+------------+--------+-------------+
//|[a1, a2, a3]|[d1, d2]|1228360646656|
//| [a4, a5]| [d3]|1297080123392|
//| [a6]| [d4]|1520418422784|
//+------------+--------+-------------+
You can try GraphFrame.connectedComponents, adding a prefix to all the Device-Ids so that they can be split from the Account-Ids in a post-processing step:
from graphframes import GraphFrame
from pyspark.sql.functions import collect_set, expr
df = spark.createDataFrame([
("d1","a1"), ("d2","a1"), ("d1","a2"), ("d1","a4"),
("d2","a3"), ("d3","a4"), ("d3","a5"), ("d4","a6")
], ["Device-Id","Account-Id"])
# set a checkpoint directory, which is required by GraphFrames
spark.sparkContext.setCheckpointDir("/tmp/111")
# for testing purposes, use a small number of shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 2)
# set up edges and vertices, add an underscore as prefix of Device-ID
edges = df.withColumn('Device-Id', expr('concat("_", `Device-Id`)')).toDF('src', 'dst')
vertices = edges.selectExpr('src as id').distinct().union(edges.select('dst').distinct())
# set up the graph
g = GraphFrame(vertices, edges)
# compute the connected components and group resultset by component
# and collect corresponding ids using collect_set(id)
df1 = g.connectedComponents().groupby('component').agg(collect_set('id').alias('ids'))
df1.show(truncate=False)
+------------+-----------------------------------+
|component |ids |
+------------+-----------------------------------+
|309237645312|[a6, _d4] |
|85899345920 |[_d1, a4, a1, _d3, a3, a5, a2, _d2]|
+------------+-----------------------------------+
# split the ids based on the prefix we predefined when creating edges.
df1.selectExpr(
'transform(filter(ids, x -> left(x,1) = "_"), y -> substr(y,2)) AS `Devices-Used`'
, 'filter(ids, x -> left(x,1) != "_") AS `Accounts-Used`'
, 'component AS `Unique-User-Id`'
).show()
+------------+--------------------+--------------+
|Devices-Used| Accounts-Used|Unique-User-Id|
+------------+--------------------+--------------+
|[d1, d3, d2]|[a4, a1, a3, a5, a2]| 85899345920|
| [d4]| [a6]| 309237645312|
+------------+--------------------+--------------+
Edit: the above approach is less efficient in that it creates a large list of unneeded edges/vertices; building the edge list with a self-join should be a better option (inspired by this):
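# one edge per pair of devices sharing an account; the filter avoids self-loops and duplicate pairs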
edges = df.alias('d1').join(df.alias('d2'), ["Account-Id"]) \
.filter("d1.`Device-Id` > d2.`Device-Id`") \
.toDF("account", "src", "dst")
+-------+---+---+
|account|src|dst|
+-------+---+---+
| a1| d2| d1|
| a4| d3| d1|
+-------+---+---+
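# keep the account on each vertex so it can be collected per component later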
vertices = df.selectExpr('`Device-Id` as id', "`Account-Id` as acct_id")
g = GraphFrame(vertices, edges)
df1 = g.connectedComponents() \
.groupby('component') \
.agg(
collect_set('id').alias('Device-Ids'),
collect_set('acct_id').alias('Account-Ids')
)
+---------+------------+--------------------+
|component| Device-Ids| Account-Ids|
+---------+------------+--------------------+
| 0|[d1, d2, d3]|[a4, a1, a3, a5, a2]|
| 1| [d4]| [a6]|
+---------+------------+--------------------+
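If an actual UUID string is preferred over the numeric component id for Unique-User-Id, a minimal sketch (assuming Spark 2.3+, where the SQL uuid() function is available, and the df1 produced above) could be:
# assign a random UUID string per connected component and rename to the requested columns
result = df1.selectExpr(
    'uuid() AS `Unique-User-Id`',
    '`Device-Ids` AS `Devices-Used`',
    '`Account-Ids` AS `Accounts-Used`'
)
result.show(truncate=False)
Note that uuid() is non-deterministic, so the generated ids change if the result is recomputed; persist or write out the result if the ids need to be stable.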