PySpark array preserving order
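I have a structure like this: an invoice table and an invoice lines table. I want to output the lines as an ordered JSON array within a mandated schema, ordered by line number, but the line number is not part of the schema (it is assumed to be implicit in the array position). As I understand it, both PySpark and JSON preserve array order once the array has been created. See the rough example below. How can I ensure the invoice lines keep their line number order? I could do this with a list comprehension, but that would mean leaving Spark, which I think would be inefficient.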
from pyspark.sql.functions import collect_list, struct
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

invColumns = StructType([
    StructField("invoiceNo", StringType(), True),
    StructField("invoiceStuff", StringType(), True)
])
invData = [("1", "stuff"), ("2", "other stuff"), ("3", "more stuff")]

invLines = StructType([
    StructField("lineNo", IntegerType(), True),
    StructField("invoiceNo", StringType(), True),
    StructField("detail", StringType(), True),
    StructField("quantity", IntegerType(), True)
])
lineData = [(1, "1", "item stuff", 3), (2, "1", "new item stuff", 2), (3, "1", "old item stuff", 5),
            (1, "2", "item stuff", 3), (1, "3", "item stuff", 3), (2, "3", "more item stuff", 7)]

# in reality both DataFrames are read from Spark tables
invoice_df = spark.createDataFrame(data=invData, schema=invColumns)
invLine_df = spark.createDataFrame(data=lineData, schema=invLines)

invoicesTemp_df = (invoice_df.select('invoiceNo', 'invoiceStuff')
                   .join(invLine_df.select('lineNo', 'invoiceNo', 'detail', 'quantity'),
                         on='invoiceNo'))

# collect each invoice's lines into an array of (detail, quantity) structs
invoicesOut_df = (invoicesTemp_df
                  .withColumn('invoiceLines', struct('detail', 'quantity'))
                  .groupBy('invoiceNo', 'invoiceStuff')
                  .agg(collect_list('invoiceLines').alias('invoiceLines'))
                  .select('invoiceNo', 'invoiceStuff', 'invoiceLines'))

# display() is the Databricks notebook helper; in the result below,
# the lines of invoice 1 are not in lineNo order
display(invoicesOut_df)
3 -- more stuff -- array -- 0: -- {"detail": "item stuff", "quantity": 3}
-- 1: -- {"detail": "more item stuff", "quantity": 7}
1 -- stuff -- array -- 0: -- {"detail": "new item stuff", "quantity": 2}
-- 1: -- {"detail": "old item stuff", "quantity": 5}
-- 2: -- {"detail": "item stuff", "quantity": 3}
2 -- other stuff -- array -- 0: -- {"detail": "item stuff", "quantity": 3}
The input data is as follows:
Invoice Table
"InvoiceNo", "InvoiceStuff",
"1","stuff",
"2","other stuff",
"3","more stuff"
Invoice Lines Table
"LineNo","InvoiceNo","Detail","Quantity",
1,"1","item stuff",3,
2,"1","new item stuff",2,
3,"1","old item stuff",5,
1,"2","item stuff",3,
1,"3","item stuff",3,
2,"3","more item stuff",7
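The output should look like the following, with each array ordered by the line number from the invoice lines table, even though the line number itself is not in the output.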
Output
"1","stuff","[{"detail": "item stuff", "quantity": 3},{"detail": "new item stuff", "quantity": 2},{"detail": "old item stuff", "quantity": 5}]",
"2","other stuff","[{"detail": "item stuff", "quantity": 3}]"
"3","more stuff","[{"detail": "item stuff", "quantity": 3},{"detail": "more item stuff", "quantity": 7}]"
collect_list does not respect the order of the data. From the documentation:
Note: The function is non-deterministic because the order of collected results depends on the order of the rows which may be non-deterministic after a shuffle.
One possible approach is to apply collect_list over a window function, where you can control the ordering.
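from pyspark.sql import functions as F
from pyspark.sql import Window as W

(invoice_df
    .join(invLine_df, on='invoiceNo')
    .withColumn('invoiceLines', F.struct('lineNo', 'detail', 'quantity'))
    # With orderBy, the default window frame runs from the start of the partition
    # to the current row, so each row holds a growing prefix of the ordered list.
    .withColumn('a', F.collect_list('invoiceLines').over(W.partitionBy('invoiceNo').orderBy('lineNo')))
    .groupBy('invoiceNo')
    # max over the prefixes keeps the longest one, i.e. the complete ordered array
    .agg(F.max('a').alias('invoiceLines'))
    .show(10, False)
)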
+---------+--------------------------------------------------------------------+
|invoiceNo|invoiceLines |
+---------+--------------------------------------------------------------------+
|1 |[{1, item stuff, 3}, {2, new item stuff, 2}, {3, old item stuff, 5}]|
|2 |[{1, item stuff, 3}] |
|3 |[{1, item stuff, 3}, {2, more item stuff, 7}] |
+---------+--------------------------------------------------------------------+