基于spark中的列组合数据
Combining data based on column in spark
我在配置单元中有以下格式的数据 table。
user | purchase | time_of_purchase
我想在
中获取数据
user | list of purchases ordered by time
如何在 pyspark 或 hiveQL 中执行此操作?
我试过在配置单元中使用 collect_list,但它没有按时间戳正确保留顺序。
编辑:
按照 KartikKannapur 的要求添加样本数据。
这是一个样本数据
94438fef-c503-4326-9562-230e78796f16 | Bread | Jul 7 20:48
94438fef-c503-4326-9562-230e78796f16 | Shaving Cream | July 10 14:20
a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Milk | July 7 3:48
a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Bread | July 7 3:49
a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Lotion | July 7 15:30
我想要的输出是
94438fef-c503-4326-9562-230e78796f16 | Bread , Shaving Cream
a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Milk , Bread , Lotion
一种方法是
首先创建一个hive context,然后读取table到一个RDD。
from pyspark import HiveContext
purchaseList = HiveContext(sc).sql('from purchaseList select *')
然后处理RDD
from datetime import datetime as dt
purchaseList = purchaseList.map(lambda x:(x[0],[x[1],dt.strptime(x[2],"%b %d %H:%M")]))
purchaseByUser = purchaseList.groupByKey()
purchaseByUser = purchaseByUser.map(lambda x:(x[0],[y[0] for y in sorted(x[1], key=lambda z:z[1])]))
print(purchaseByUser.take(2))
输出
[('94438fef-c503-4326-9562-230e78796f16', ['Bread', 'Shaving Cream']), ('a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad', ['Milk', 'Bread', 'Lotion'])]
将 RDD 保存为新的配置单元 table
schema_rdd = HiveContext(sc).inferSchema(purchaseByUser)
schema_rdd.saveAsTable('purchaseByUser')
对于读写配置单元 table 看这个 Whosebug question and spark docs
我在配置单元中有以下格式的数据 table。
user | purchase | time_of_purchase
我想在
中获取数据user | list of purchases ordered by time
如何在 pyspark 或 hiveQL 中执行此操作?
我试过在配置单元中使用 collect_list,但它没有按时间戳正确保留顺序。
编辑: 按照 KartikKannapur 的要求添加样本数据。 这是一个样本数据
94438fef-c503-4326-9562-230e78796f16 | Bread | Jul 7 20:48
94438fef-c503-4326-9562-230e78796f16 | Shaving Cream | July 10 14:20
a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Milk | July 7 3:48
a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Bread | July 7 3:49
a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Lotion | July 7 15:30
我想要的输出是
94438fef-c503-4326-9562-230e78796f16 | Bread , Shaving Cream
a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad | Milk , Bread , Lotion
一种方法是
首先创建一个hive context,然后读取table到一个RDD。
from pyspark import HiveContext
purchaseList = HiveContext(sc).sql('from purchaseList select *')
然后处理RDD
from datetime import datetime as dt
purchaseList = purchaseList.map(lambda x:(x[0],[x[1],dt.strptime(x[2],"%b %d %H:%M")]))
purchaseByUser = purchaseList.groupByKey()
purchaseByUser = purchaseByUser.map(lambda x:(x[0],[y[0] for y in sorted(x[1], key=lambda z:z[1])]))
print(purchaseByUser.take(2))
输出
[('94438fef-c503-4326-9562-230e78796f16', ['Bread', 'Shaving Cream']), ('a0dcbb3b-d1dd-43aa-91d7-e92f48cee0ad', ['Milk', 'Bread', 'Lotion'])]
将 RDD 保存为新的配置单元 table
schema_rdd = HiveContext(sc).inferSchema(purchaseByUser)
schema_rdd.saveAsTable('purchaseByUser')
对于读写配置单元 table 看这个 Whosebug question and spark docs