在 Pyspark 中使用 RDD 按键(月)排序
Sort by key (Month) using RDDs in Pyspark
我有这个 RDD,想按月份(1 月 --> 12 月)对其进行排序。我怎样才能在 pyspark 中做到这一点?
注意:不想使用 spark.sql 或 Dataframe.
+-----+-----+
|Month|count|
+-----+-----+
| Oct| 1176|
| Sep| 1167|
| Dec| 2084|
| Aug| 1126|
| May| 1176|
| Jun| 1424|
| Feb| 1286|
| Nov| 1078|
| Mar| 1740|
| Jan| 1544|
| Apr| 1080|
| Jul| 1237|
+-----+-----+
您可以将 rdd.sortBy 与 python 的日历模块中可用的辅助字典一起使用,或者创建您自己的月份字典:
import calendar
d = {i:e for e,i in enumerate(calendar.month_abbr[1:],1)}
#{'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7,
#'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}
myrdd.sortBy(keyfunc=lambda x: d.get(x[0])).collect()
[('Jan', 1544),
('Feb', 1286),
('Mar', 1740),
('Apr', 1080),
('May', 1176),
('Jun', 1424),
('Jul', 1237),
('Aug', 1126),
('Sep', 1167),
('Oct', 1176),
('Nov', 1078),
('Dec', 2084)]
myList = myrdd.collect()
my_list_dict = dict(myList)
months = ['Jan', 'Feb', 'Mar', 'Apr', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
newList = []
for m in months:
newList.append((m, my_list_dict[m]))
print(newList)
我有这个 RDD,想按月份(1 月 --> 12 月)对其进行排序。我怎样才能在 pyspark 中做到这一点? 注意:不想使用 spark.sql 或 Dataframe.
+-----+-----+
|Month|count|
+-----+-----+
| Oct| 1176|
| Sep| 1167|
| Dec| 2084|
| Aug| 1126|
| May| 1176|
| Jun| 1424|
| Feb| 1286|
| Nov| 1078|
| Mar| 1740|
| Jan| 1544|
| Apr| 1080|
| Jul| 1237|
+-----+-----+
您可以将 rdd.sortBy 与 python 的日历模块中可用的辅助字典一起使用,或者创建您自己的月份字典:
import calendar
d = {i:e for e,i in enumerate(calendar.month_abbr[1:],1)}
#{'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7,
#'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}
myrdd.sortBy(keyfunc=lambda x: d.get(x[0])).collect()
[('Jan', 1544),
('Feb', 1286),
('Mar', 1740),
('Apr', 1080),
('May', 1176),
('Jun', 1424),
('Jul', 1237),
('Aug', 1126),
('Sep', 1167),
('Oct', 1176),
('Nov', 1078),
('Dec', 2084)]
myList = myrdd.collect()
my_list_dict = dict(myList)
months = ['Jan', 'Feb', 'Mar', 'Apr', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
newList = []
for m in months:
newList.append((m, my_list_dict[m]))
print(newList)