Append List to a Spark Data frame column based on matching Keys from a second data frame
I have 2 Spark DataFrames with identical column names. When the key columns match, I want to extend some of the list columns in the first df with the lists from the same columns in df2.
df1:
+---+----+------+--------+-----+-------+-------+-------+
|k1 |k2  |list1 |list2   |list3|list4  |list5  |list6  |
+---+----+------+--------+-----+-------+-------+-------+
|a  |121 |[car1]|[price1]|[1]  |[False]|[0.000]|[vfdvf]|
|b  |11  |[car3]|[price3]|[2]  |[False]|[1.000]|[00000]|
|c  |23  |[car3]|[price3]|[4]  |[False]|[2.500]|[fdabh]|
|d  |250 |[car6]|[price6]|[6]  |[True] |[0.450]|[00000]|
+---+----+------+--------+-----+-------+-------+-------+
df2:
+---+----+------+--------+-----+-------+-------+-------+
|k1 |k2  |list1 |list2   |list3|list4  |list5  |list6  |
+---+----+------+--------+-----+-------+-------+-------+
|m  |121 |[car5]|[price5]|[5]  |[False]|[3.000]|[vfdvf]|
|b  |11  |[car8]|[price8]|[8]  |[False]|[2.000]|[mnfaf]|
|c  |23  |[car7]|[price7]|[7]  |[False]|[1.500]|[00000]|
|n  |250 |[car9]|[price9]|[9]  |[False]|[0.450]|[00000]|
+---+----+------+--------+-----+-------+-------+-------+
Since the list columns relate to one another item by item, the order must be preserved. Is there a way to append the whole lists from df2 to df1, but only when both key1 and key2 match between the two DataFrames?
The result should look like this (I couldn't fit the list6 column in, but I'd want it in the result following the same pattern as the other list columns):
+--+--+-----------+---------------+-----+-------------+-------------+
|k1|k2|list1      |list2          |list3|list4        |list5        |
+--+--+-----------+---------------+-----+-------------+-------------+
|b |11|[car3,car8]|[price3,price8]|[2,8]|[False,False]|[1.000,2.000]|
|c |23|[car3,car7]|[price3,price7]|[4,7]|[False,False]|[2.500,1.500]|
+--+--+-----------+---------------+-----+-------------+-------------+
I'm still fairly new to UDFs and couldn't find a similar question on Stack Overflow; the only similar ones I found use pandas, which is too slow for my use case. Any insight would be greatly appreciated.
First, you need to create your own schema, as I have done below, for your code to work; please use my updated code.
Try this: you don't need a UDF. Do an inner join first, then concatenate:
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, FloatType, ArrayType)

spark = SparkSession.builder.getOrCreate()
table_schema = StructType([
    StructField('key1', StringType(), True),
    StructField('key2', IntegerType(), True),
    StructField('list1', ArrayType(StringType()), False),
    StructField('list2', ArrayType(StringType()), False),
    StructField('list3', ArrayType(IntegerType()), False),
    StructField('list4', ArrayType(StringType()), False),  # array, not a plain string
    StructField('list5', ArrayType(FloatType()), False),
    StructField('list6', ArrayType(StringType()), False)
])
df = spark.createDataFrame(
    [
        ("a", 121, ["car1"], ["price1"], [1], ["False"], [0.000], ["vfdvf"]),
        ("b", 11,  ["car3"], ["price3"], [2], ["False"], [1.000], ["00000"]),
        ("c", 23,  ["car3"], ["price3"], [4], ["False"], [2.500], ["fdabh"]),
        ("d", 250, ["car6"], ["price6"], [6], ["True"],  [0.450], ["00000"])
    ], table_schema
)
df2 = spark.createDataFrame(
    [
        ("m", 121, ["car5"], ["price5"], [5], ["False"], [3.000], ["vfdvf"]),
        ("b", 11,  ["car8"], ["price8"], [8], ["False"], [2.000], ["mnfaf"]),
        ("c", 23,  ["car7"], ["price7"], [7], ["False"], [1.500], ["00000"]),
        ("n", 250, ["car9"], ["price9"], [9], ["False"], [0.450], ["00000"])
    ], table_schema
)
df.createOrReplaceTempView("A")
df2.createOrReplaceTempView("B")
spark.sql("select a.key1,a.key2,concat(a.list1,b.list1)List1 ,concat(a.list2,b.list2)List2, \
concat(a.list3,b.list3)List3 ,concat(a.list4,b.list4)List4,\
concat(a.list5,b.list5)List5 ,\
concat(a.list6,b.list6)List6 \
from A a inner join B b on a.key1=b.key1 order by a.key1").show(truncate=False)
+----+----+------------+----------------+------+--------------+----------+--------------+
|key1|key2|List1       |List2           |List3 |List4         |List5     |List6         |
+----+----+------------+----------------+------+--------------+----------+--------------+
|b   |11  |[car3, car8]|[price3, price8]|[2, 8]|[False, False]|[1.0, 2.0]|[00000, mnfaf]|
|c   |23  |[car3, car7]|[price3, price7]|[4, 7]|[False, False]|[2.5, 1.5]|[fdabh, 00000]|
+----+----+------------+----------------+------+--------------+----------+--------------+
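The same join-and-concat can also be expressed with the DataFrame API instead of a temp view. Below is a minimal sketch, assuming the df and df2 defined above; note that the built-in concat accepts array columns from Spark 2.4 onward:
from pyspark.sql import functions as F

list_cols = ['list1', 'list2', 'list3', 'list4', 'list5', 'list6']

# Join on both keys; the aliases disambiguate the list columns
# that exist on both sides of the join.
joined = df.alias('a').join(df2.alias('b'), on=['key1', 'key2'], how='inner')
result = joined.select(
    'key1', 'key2',
    *[F.concat(F.col('a.' + c), F.col('b.' + c)).alias(c) for c in list_cols]
)
result.orderBy('key1').show(truncate=False)
Joining with on=['key1', 'key2'] keeps a single copy of each key column, so the result can select them unqualified.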
I found the answer to my question and want to share it here for others who may face the same problem, and for my own future reference.
from pyspark.sql.types import (ArrayType, BooleanType, StringType, DoubleType,
                               IntegerType, LongType, FloatType)

def concatTypesFunc(array1, array2):
    # Concatenate two lists, preserving element order.
    return array1 + array2

# Registering the same name repeatedly would just overwrite the previous
# registration, so each element type gets its own UDF name.
spark.udf.register("concat_types_bool", concatTypesFunc, ArrayType(BooleanType()))
spark.udf.register("concat_types_str", concatTypesFunc, ArrayType(StringType()))
spark.udf.register("concat_types_double", concatTypesFunc, ArrayType(DoubleType()))
spark.udf.register("concat_types_int", concatTypesFunc, ArrayType(IntegerType()))
spark.udf.register("concat_types_long", concatTypesFunc, ArrayType(LongType()))
spark.udf.register("concat_types_float", concatTypesFunc, ArrayType(FloatType()))
# Same data and table_schema as in the answer above.
df = spark.createDataFrame(
    [
        ("a", 121, ["car1"], ["price1"], [1], ["False"], [0.000], ["vfdvf"]),
        ("b", 11,  ["car3"], ["price3"], [2], ["False"], [1.000], ["00000"]),
        ("c", 23,  ["car3"], ["price3"], [4], ["False"], [2.500], ["fdabh"]),
        ("d", 250, ["car6"], ["price6"], [6], ["True"],  [0.450], ["00000"])
    ], table_schema
)
df2 = spark.createDataFrame(
    [
        ("m", 121, ["car5"], ["price5"], [5], ["False"], [3.000], ["vfdvf"]),
        ("b", 11,  ["car8"], ["price8"], [8], ["False"], [2.000], ["mnfaf"]),
        ("c", 23,  ["car7"], ["price7"], [7], ["False"], [1.500], ["00000"]),
        ("n", 250, ["car9"], ["price9"], [9], ["False"], [0.450], ["00000"])
    ], table_schema
)
df.createOrReplaceTempView("a")
df2.createOrReplaceTempView("b")
spark.sql("select a.key1, a.key2, concat_types(a.list1,b.list1)List1 ,concat_types(a.list2,b.list2)List2, \
concat_types(a.list3,b.list3)List3 ,concat_types(a.list4,b.list4)List4,\
concat_types(a.list5,b.list5)List5 ,\
concat_types(a.list6,b.list6)List6 \
from a inner join b on a.key1=b.key1 order by a.key1").show(truncate=False)
+----+----+------------+----------------+------+--------------+----------+--------------+
|key1|key2|List1       |List2           |List3 |List4         |List5     |List6         |
+----+----+------------+----------------+------+--------------+----------+--------------+
|b   |11  |[car3, car8]|[price3, price8]|[2, 8]|[False, False]|[1.0, 2.0]|[00000, mnfaf]|
|c   |23  |[car3, car7]|[price3, price7]|[4, 7]|[False, False]|[2.5, 1.5]|[fdabh, 00000]|
+----+----+------------+----------------+------+--------------+----------+--------------+
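Note that the built-in concat only supports array columns from Spark 2.4 onward, so a UDF like the one above is mainly useful on older versions. For completeness, here is a minimal sketch of the same typed-UDF idea via the DataFrame API, assuming df, df2, and concatTypesFunc as defined above; typed_udfs is a hypothetical helper mapping, not part of the original answer:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, IntegerType, FloatType

# One DataFrame-API UDF per element type, mirroring the SQL registrations above.
concat_str = F.udf(concatTypesFunc, ArrayType(StringType()))
concat_int = F.udf(concatTypesFunc, ArrayType(IntegerType()))
concat_float = F.udf(concatTypesFunc, ArrayType(FloatType()))

typed_udfs = {'list1': concat_str, 'list2': concat_str, 'list3': concat_int,
              'list4': concat_str, 'list5': concat_float, 'list6': concat_str}

joined = df.alias('x').join(df2.alias('y'), on=['key1', 'key2'], how='inner')
result = joined.select(
    'key1', 'key2',
    *[fn(F.col('x.' + c), F.col('y.' + c)).alias(c) for c, fn in typed_udfs.items()]
)
result.orderBy('key1').show(truncate=False)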