使用 pyspark 查找连接模式/adv 分区以获取最新值
finding connecting pattern / adv partition to get latest value using pyspark
我是 pyspark 的新手。请哪位大侠帮忙解答一下。
问题:
我的数据框包含两列,一列 var_x 和另一列 var_y。
在上面的数据中,首先我需要进行分区和排序以将所有连接的模式变量放在一个地方,我想用一个例子来解释更多:这里如果你观察者 var_x 中的值更新 var_y z2146 变为 -> y3498,y3498 变为 -> B5216,B5216 变为 -> B6849,最后变为 B6849 -> C9965。
因此,此连接变量的最新值为 C9965,与其他变量类似。预期输出如下所示。
这里的主要问题是我无法根据连接模式变量对数据进行分区。请哪位大侠帮我解答一下。
我没有在 pyspark 中得到确切的答案,但我用 python 解决了上面的问题。
步骤:我将 Var x 和 Var y 变量转换为字典。然后我使用下面的代码获取最新值。
d = {'Z2146':'y3498', 'y3498':'B5216', 'B5216':'B6849', 'B6849':'C9965', 'A2286':'C4955', 'C4955':'Q7658', 'D7777':'E8849'}
keys = list(d.keys())
values = list(d.values())
n = len(d.keys())
print(n)
while (n>0):
for i,j in d.items():
if(j in keys):
d[i]=d[j]
n = n-1
print(d)
将其保存为另一个字典输出,最后在 pyspark 中转换回数据框列。
如果有人想在 python 或 pyspark 中改进我的代码。随意做 post 吧。
我通过使用 RDD
:
设法得到了你的解决方案
import pyspark.sql.functions as f
from pyspark.sql import Window
df = spark.createDataFrame([
('Z2146', 'y3498'),
('A2286', 'C4955'),
('y3498', 'B5216'),
('B5216', 'B6849'),
('D7777', 'E8849'),
('C4955', 'Q7658'),
('B6849', 'C9965'),
], ('var_x', 'var_y'))
# Creating an incremental id to use later
df = df.withColumn('increasing_id', f.monotonically_increasing_id())
# Set to this column a value that would partition your data. If you don't have,
# leave as 1, but may have performance issues depending your dataset size
df = df.withColumn('partition', f.lit(1))
def transform(rows):
mapper, group_id = {}, 0
for row in rows:
increasing_id, var_x, var_y = row
if var_x in mapper:
group = mapper.get(var_x)
yield increasing_id, var_x, var_y, group
mapper.pop(var_x)
mapper[var_y] = group
else:
mapper[var_y] = group_id
yield increasing_id, var_x, var_y, group_id
group_id += 1
# Creating a new column with `RDD` that contains which group your row belongs to
df_linked = (df
.rdd
.map(lambda row: (row.partition, (row.increasing_id, row.var_x, row.var_y)))
.groupByKey()
.flatMapValues(transform)
.map(lambda row: row[1])
.toDF('increasing_id long, var_x string, var_y string, group int'))
# +-------------+-----+-----+-----+
# |increasing_id|var_x|var_y|group|
# +-------------+-----+-----+-----+
# | 8589934592|Z2146|y3498| 0|
# | 25769803776|y3498|B5216| 0|
# | 34359738368|B5216|B6849| 0|
# | 60129542144|B6849|C9965| 0|
# | 17179869184|A2286|C4955| 1|
# | 51539607552|C4955|Q7658| 1|
# | 42949672960|D7777|E8849| 2|
# +-------------+-----+-----+-----+
# Getting the last `var_y` from each group
w = (Window.partitionBy('group')
.orderBy(f.col('increasing_id').asc())
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df_latest = df_linked.select('var_x', 'var_y', f.last('var_y').over(w).alias('Latest'))
df_latest.orderBy('group', 'increasing_id').show()
# +-----+-----+------+
# |var_x|var_y|Latest|
# +-----+-----+------+
# |Z2146|y3498| C9965|
# |y3498|B5216| C9965|
# |B5216|B6849| C9965|
# |B6849|C9965| C9965|
# |A2286|C4955| Q7658|
# |C4955|Q7658| Q7658|
# |D7777|E8849| E8849|
# +-----+-----+------+
我是 pyspark 的新手。请哪位大侠帮忙解答一下。
问题:
我的数据框包含两列,一列 var_x 和另一列 var_y。
在上面的数据中,首先我需要进行分区和排序以将所有连接的模式变量放在一个地方,我想用一个例子来解释更多:这里如果你观察者 var_x 中的值更新 var_y z2146 变为 -> y3498,y3498 变为 -> B5216,B5216 变为 -> B6849,最后变为 B6849 -> C9965。
因此,此连接变量的最新值为 C9965,与其他变量类似。预期输出如下所示。
这里的主要问题是我无法根据连接模式变量对数据进行分区。请哪位大侠帮我解答一下。
我没有在 pyspark 中得到确切的答案,但我用 python 解决了上面的问题。
步骤:我将 Var x 和 Var y 变量转换为字典。然后我使用下面的代码获取最新值。
d = {'Z2146':'y3498', 'y3498':'B5216', 'B5216':'B6849', 'B6849':'C9965', 'A2286':'C4955', 'C4955':'Q7658', 'D7777':'E8849'}
keys = list(d.keys())
values = list(d.values())
n = len(d.keys())
print(n)
while (n>0):
for i,j in d.items():
if(j in keys):
d[i]=d[j]
n = n-1
print(d)
将其保存为另一个字典输出,最后在 pyspark 中转换回数据框列。
如果有人想在 python 或 pyspark 中改进我的代码。随意做 post 吧。
我通过使用 RDD
:
import pyspark.sql.functions as f
from pyspark.sql import Window
df = spark.createDataFrame([
('Z2146', 'y3498'),
('A2286', 'C4955'),
('y3498', 'B5216'),
('B5216', 'B6849'),
('D7777', 'E8849'),
('C4955', 'Q7658'),
('B6849', 'C9965'),
], ('var_x', 'var_y'))
# Creating an incremental id to use later
df = df.withColumn('increasing_id', f.monotonically_increasing_id())
# Set to this column a value that would partition your data. If you don't have,
# leave as 1, but may have performance issues depending your dataset size
df = df.withColumn('partition', f.lit(1))
def transform(rows):
mapper, group_id = {}, 0
for row in rows:
increasing_id, var_x, var_y = row
if var_x in mapper:
group = mapper.get(var_x)
yield increasing_id, var_x, var_y, group
mapper.pop(var_x)
mapper[var_y] = group
else:
mapper[var_y] = group_id
yield increasing_id, var_x, var_y, group_id
group_id += 1
# Creating a new column with `RDD` that contains which group your row belongs to
df_linked = (df
.rdd
.map(lambda row: (row.partition, (row.increasing_id, row.var_x, row.var_y)))
.groupByKey()
.flatMapValues(transform)
.map(lambda row: row[1])
.toDF('increasing_id long, var_x string, var_y string, group int'))
# +-------------+-----+-----+-----+
# |increasing_id|var_x|var_y|group|
# +-------------+-----+-----+-----+
# | 8589934592|Z2146|y3498| 0|
# | 25769803776|y3498|B5216| 0|
# | 34359738368|B5216|B6849| 0|
# | 60129542144|B6849|C9965| 0|
# | 17179869184|A2286|C4955| 1|
# | 51539607552|C4955|Q7658| 1|
# | 42949672960|D7777|E8849| 2|
# +-------------+-----+-----+-----+
# Getting the last `var_y` from each group
w = (Window.partitionBy('group')
.orderBy(f.col('increasing_id').asc())
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df_latest = df_linked.select('var_x', 'var_y', f.last('var_y').over(w).alias('Latest'))
df_latest.orderBy('group', 'increasing_id').show()
# +-----+-----+------+
# |var_x|var_y|Latest|
# +-----+-----+------+
# |Z2146|y3498| C9965|
# |y3498|B5216| C9965|
# |B5216|B6849| C9965|
# |B6849|C9965| C9965|
# |A2286|C4955| Q7658|
# |C4955|Q7658| Q7658|
# |D7777|E8849| E8849|
# +-----+-----+------+