Python Spark 将事务分组到嵌套模式中
Python Spark group transactions into nested schema
我想将存储在 pyspark.sql.dataframe.DataFrame
“ddf
”中的交易分组为“key
”列,该列指示交易来源(在本例中为客户 ID) .
分组是一个非常昂贵的过程,所以我想以嵌套模式将组写入磁盘:
(key, [[c1, c2, c3,...], ...])
这将使我能够快速加载密钥上的所有交易,并开发复杂的自定义聚合器而无需重新运行分组。
如何创建嵌套架构并将其写入磁盘?
虽然答案很简单,但我花了很长时间才弄明白,所以我想我 post 我的解决方案在这里。
首先将所有交易减少key
(客户ID):
from operators import add
# ddf is a dataframe with a transaction in each row. Key is the column
# we want to group the transactions by.
txnrdd = ddf.rdd.map(lambda row: (row['key'], [row],) ).reduceByKey(add)
这给出了一个看起来像 (key, [list of Rows])
的 rdd
。要将其写回到 dataframe
中,您需要构建架构。交易列表可以通过 ArrayType
.
建模
from pyspark.sql import types as sqxt
txn_schema = sqxt.StructType([
sqxt.StructField('Key', sqxt.StringType()),
sqxt.StructField('Transactions', sqxt.ArrayType(ddf.schema))
])
那么直接把数据写入磁盘就是这个结构:
txnddf = txnrdd.toDF(schema=txn_schema)
txnddf.write.parquet('customer-transactions.parquet')
性能似乎还可以。如果不通过 RDD,就无法找到执行此操作的方法。
我想将存储在 pyspark.sql.dataframe.DataFrame
“ddf
”中的交易分组为“key
”列,该列指示交易来源(在本例中为客户 ID) .
分组是一个非常昂贵的过程,所以我想以嵌套模式将组写入磁盘:
(key, [[c1, c2, c3,...], ...])
这将使我能够快速加载密钥上的所有交易,并开发复杂的自定义聚合器而无需重新运行分组。
如何创建嵌套架构并将其写入磁盘?
虽然答案很简单,但我花了很长时间才弄明白,所以我想我 post 我的解决方案在这里。
首先将所有交易减少key
(客户ID):
from operators import add
# ddf is a dataframe with a transaction in each row. Key is the column
# we want to group the transactions by.
txnrdd = ddf.rdd.map(lambda row: (row['key'], [row],) ).reduceByKey(add)
这给出了一个看起来像 (key, [list of Rows])
的 rdd
。要将其写回到 dataframe
中,您需要构建架构。交易列表可以通过 ArrayType
.
from pyspark.sql import types as sqxt
txn_schema = sqxt.StructType([
sqxt.StructField('Key', sqxt.StringType()),
sqxt.StructField('Transactions', sqxt.ArrayType(ddf.schema))
])
那么直接把数据写入磁盘就是这个结构:
txnddf = txnrdd.toDF(schema=txn_schema)
txnddf.write.parquet('customer-transactions.parquet')
性能似乎还可以。如果不通过 RDD,就无法找到执行此操作的方法。