使用 pyspark 查找连接模式/adv 分区以获取最新值

finding connecting pattern / adv partition to get latest value using pyspark

我是 pyspark 的新手。请哪位大侠帮忙解答一下。

问题:

我的数据框包含两列,一列 var_x 和另一列 var_y。

在上面的数据中,首先我需要进行分区和排序以将所有连接的模式变量放在一个地方,我想用一个例子来解释更多:这里如果你观察者 var_x 中的值更新 var_y z2146 变为 -> y3498,y3498 变为 -> B5216,B5216 变为 -> B6849,最后变为 B6849 -> C9965。

因此,此连接变量的最新值为 C9965,与其他变量类似。预期输出如下所示。

这里的主要问题是我无法根据连接模式变量对数据进行分区。请哪位大侠帮我解答一下。

我没有在 pyspark 中得到确切的答案,但我用 python 解决了上面的问题。

步骤:我将 Var x 和 Var y 变量转换为字典。然后我使用下面的代码获取最新值。

d = {'Z2146':'y3498', 'y3498':'B5216', 'B5216':'B6849', 'B6849':'C9965', 'A2286':'C4955', 'C4955':'Q7658', 'D7777':'E8849'}

keys = list(d.keys())
values = list(d.values())

n = len(d.keys())
print(n)
while (n>0):
    for i,j in d.items():
        if(j in keys):
            d[i]=d[j]
    n = n-1
print(d)

将其保存为另一个字典输出,最后在 pyspark 中转换回数据框列。

如果有人想在 python 或 pyspark 中改进我的代码。随意做 post 吧。

我通过使用 RDD:

设法得到了你的解决方案
import pyspark.sql.functions as f
from pyspark.sql import Window

df = spark.createDataFrame([
    ('Z2146', 'y3498'),
    ('A2286', 'C4955'),
    ('y3498', 'B5216'),
    ('B5216', 'B6849'),
    ('D7777', 'E8849'),
    ('C4955', 'Q7658'),
    ('B6849', 'C9965'),
], ('var_x', 'var_y'))

# Creating an incremental id to use later
df = df.withColumn('increasing_id', f.monotonically_increasing_id())

# Set to this column a value that would partition your data. If you don't have,
# leave as 1, but may have performance issues depending your dataset size
df = df.withColumn('partition', f.lit(1))


def transform(rows):
    mapper, group_id = {}, 0
    for row in rows:
        increasing_id, var_x, var_y = row

        if var_x in mapper:
            group = mapper.get(var_x)
            yield increasing_id, var_x, var_y, group
            mapper.pop(var_x)
            mapper[var_y] = group
        else:
            mapper[var_y] = group_id
            yield increasing_id, var_x, var_y, group_id
            group_id += 1


# Creating a new column with `RDD` that contains which group your row belongs to
df_linked = (df
             .rdd
             .map(lambda row: (row.partition, (row.increasing_id, row.var_x, row.var_y)))
             .groupByKey()
             .flatMapValues(transform)
             .map(lambda row: row[1])
             .toDF('increasing_id long, var_x string, var_y string, group int'))
# +-------------+-----+-----+-----+
# |increasing_id|var_x|var_y|group|
# +-------------+-----+-----+-----+
# |   8589934592|Z2146|y3498|    0|
# |  25769803776|y3498|B5216|    0|
# |  34359738368|B5216|B6849|    0|
# |  60129542144|B6849|C9965|    0|
# |  17179869184|A2286|C4955|    1|
# |  51539607552|C4955|Q7658|    1|
# |  42949672960|D7777|E8849|    2|
# +-------------+-----+-----+-----+

# Getting the last `var_y` from each group
w = (Window.partitionBy('group')
     .orderBy(f.col('increasing_id').asc())
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df_latest = df_linked.select('var_x', 'var_y', f.last('var_y').over(w).alias('Latest'))
df_latest.orderBy('group', 'increasing_id').show()
# +-----+-----+------+
# |var_x|var_y|Latest|
# +-----+-----+------+
# |Z2146|y3498| C9965|
# |y3498|B5216| C9965|
# |B5216|B6849| C9965|
# |B6849|C9965| C9965|
# |A2286|C4955| Q7658|
# |C4955|Q7658| Q7658|
# |D7777|E8849| E8849|
# +-----+-----+------+