pyspark rdd 采用最小年龄的最大频率

pyspark rdd taking the max frequency with the least age

我有如下的 rdd:

[{'age': 2.18430371791803,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.80033330216659,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {'age': 2.8222365762732,
  'code': u'"315.320000"',
  'id': u'"00008RINR"'},
 {...}]

我正在尝试通过使用以下代码获取最高频率代码来将每个 ID 减少到仅 1 条记录:

rdd.map(lambda x: (x["id"], [(x["age"], x["code"])]))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda x: [i[1] for i in x[1]])\
.map(lambda x: [max(zip((x.count(i) for i in set(x)), set(x)))])

这个实现有一个问题,它没有考虑年龄,所以如果例如一个 id 有多个频率为 2 的代码,它将采用最后一个代码。

为了说明这个问题,请考虑这个缩减的 id:

(u'"000PZ7S2G"',
 [(4.3218651186303, u'"388.400000"'),
  (4.34924421126357, u'"388.400000"'),
  (4.3218651186303, u'"389.900000"'),
  (4.34924421126357, u'"389.900000"'),
  (13.3667102491139, u'"794.310000"'),
  (5.99897016368982, u'"995.300000"'),
  (6.02634923989903, u'"995.300000"'),
  (4.3218651186303, u'"V72.19"'),
  (4.34924421126357, u'"V72.19"'),
  (13.3639723398581, u'"V81.2"'),
  (13.3667102491139, u'"V81.2"')])

我的代码会输出:

[(2, u'"V81.2"')]

当我希望它输出时:

[(2, u'"388.400000"')]

因为虽然这两个代码的频率相同,但代码 388.400000 的年龄较小,出现在最前面。

通过在 .reduceByKey() 之后添加这一行:

.map(lambda x: (x[0], [i for i in x[1] if i[0] == min(x[1])[0]]))

我可以过滤掉那些大于最小年龄的人,但是我只考虑那些有最小年龄的人而不是所有代码来计算他们的频率。我不能在 [max(zip((x.count(i) for i in set(x)), set(x)))] 之后应用相同/相似的逻辑,因为 set(x) 是集合x[1],不考虑年龄。

我应该补充一下,我不想只取频率最高的第一个代码,我想取年龄最小的频率最高的代码,或者最先出现的代码,如果这是可能,仅使用 rdd 操作。

SQL 中我试图获得的等效代码类似于:

SELECT code, count(*) as code_frequency
FROM (SELECT id, code, age
FROM (SELECT id, code, MIN(age) AS age, COUNT(*) as cnt,
             ROW_NUMBER() OVER (PARTITION BY id ORDER BY COUNT(*) DESC, MIN(age)) as seqnum
      FROM tbl
      GROUP BY id, code
     ) t
WHERE seqnum = 1) a
GROUP BY code
ORDER by code_frequency DESC
LIMIT 5;

作为一名 DF(尽管试图避免这种情况):

wc = Window().partitionBy("id", "code").orderBy("age")
wc2 = Window().partitionBy("id")
df = rdd.toDF()
df = df.withColumn("count", F.count("code").over(wc))\
.withColumn("max", F.max("count").over(wc2))\
.filter("count = max")\
.groupBy("id").agg(F.first("age").alias("age"),
                           F.first("code").alias("code"))\
.orderBy("id")\
.groupBy("code")\
.count()\
.orderBy("count", ascending = False)

我非常感谢对此的任何帮助。

如果可以选择将 rdd 转换为数据帧,我认为这种方法可以解决您的问题:

from pyspark.sql.functions import row_number, col
from pyspark.sql import Window
df = rdd.toDF()
w = Window.partitionBy('id').orderBy('age')
df = df.withColumn('row_number', row_number.over(w)).where(col('row_number') == 1).drop('row_number')

基于你的代码的SQL等价,我将逻辑转换为以下rdd1加上一些post-processing(从原来的开始RDD):

rdd = sc.parallelize([{'age': 4.3218651186303, 'code': '"388.400000"', 'id': '"000PZ7S2G"'},
 {'age': 4.34924421126357, 'code': '"388.400000"', 'id': '"000PZ7S2G"'},
 {'age': 4.3218651186303, 'code': '"389.900000"', 'id': '"000PZ7S2G"'},
 {'age': 4.34924421126357, 'code': '"389.900000"', 'id': '"000PZ7S2G"'},
 {'age': 13.3667102491139, 'code': '"794.310000"', 'id': '"000PZ7S2G"'},
 {'age': 5.99897016368982, 'code': '"995.300000"', 'id': '"000PZ7S2G"'},
 {'age': 6.02634923989903, 'code': '"995.300000"', 'id': '"000PZ7S2G"'},
 {'age': 4.3218651186303, 'code': '"V72.19"', 'id': '"000PZ7S2G"'},
 {'age': 4.34924421126357, 'code': '"V72.19"', 'id': '"000PZ7S2G"'},
 {'age': 13.3639723398581, 'code': '"V81.2"', 'id': '"000PZ7S2G"'},
 {'age': 13.3667102491139, 'code': '"V81.2"', 'id': '"000PZ7S2G"'}])

rdd1 = rdd.map(lambda x: ((x['id'], x['code']),(x['age'], 1))) \
    .reduceByKey(lambda x,y: (min(x[0],y[0]), x[1]+y[1])) \
    .map(lambda x: (x[0][0], (-x[1][1] ,x[1][0], x[0][1]))) \
    .reduceByKey(lambda x,y: x if x < y else y) 
# [('"000PZ7S2G"', (-2, 4.3218651186303, '"388.400000"'))]

其中:

  1. 使用map初始化pair-RDD,key=(x['id'], x['code']),value=(x['age'], 1)
  2. 使用reduceByKey计算min_agecount
  3. 使用map重置键=id和值=(-count, min_age, code)
  4. 的pair-RDD
  5. 使用 reduceByKey 求相同 id
  6. 的元组 (-count, min_age, code) 的最小值

以上步骤类似:

  • 步骤 (1) + (2):groupby('id', 'code').agg(min('age'), count())
  • 步骤 (3) + (4):groupby('id').agg(min(struct(negative('count'),'min_age','code')))

然后您可以通过 rdd1.map(lambda x: (x[0], x[1][2], x[1][1])) 在您的 SQL 中获得派生的 table a,但这一步不是必需的。 code可以通过另一个map函数+countByKey()方法直接从上面的rdd1中统计出来,然后对结果进行排序:

sorted(rdd1.map(lambda x: (x[1][2],1)).countByKey().items(), key=lambda y: -y[1])
# [('"388.400000"', 1)]

但是,如果您要查找的是所有 id 的总和(计数),请执行以下操作:

rdd1.map(lambda x: (x[1][2],-x[1][0])).reduceByKey(lambda x,y: x+y).collect()
# [('"388.400000"', 2)]