Spark 2.4 上带有字典的 UDF
UDFs with Dictionaries on Spark 2.4
我正在使用 Pyspark 2.4.4,我需要使用 UDF 来创建我想要的输出。此 UDF 使用广播字典。首先,看起来我需要修改 UDF 的代码以接受字典。其次,我不确定我正在做的是进入 Spark 2.4 的最有效方式。我的代码如下:
# This is a sample of the original Spark dataframe, which I will use to create the dictionary
df = spark.createDataFrame([(220, 2, '2012-11-22 22:03:42'), (2382556,3, '2012-11-23 22:03:42'), (7854140,3,'2012-11-28 22:03:42')], ["user", "preacher", "time"])
# I am converting the above dataframe to pandas dataframe in order to create my dictionary
Dict = df.toPandas().groupby('preacher')['user','time'].apply(lambda g: list(map(tuple, g.values.tolist()))).to_dict()
#Broadcast the dictionary
pcDict = sc.broadcast(Dict)
## Function that calls the dictionary
def example(n):
nodes = []
children = [i[0] for i in pcD.value[n]]
for child in children:
nodes.append(child)
return Row('Out1', 'Out2')(nodes, [(n, n+2), (n, n+4)])
## Convert the Python function to UDF
schema = StructType([
StructField("Out1", ArrayType(IntegerType()), False),
StructField("Out2", ArrayType(StructType([StructField("_1", IntegerType(), False), StructField("_2", IntegerType(), False)])))])
example_udf = F.udf(example, schema)
# Create sample dataframe to test the UDF function
testDf = spark.createDataFrame([(3, 4), (220,5)], ["user", "Number"])
### Final output
newDf = testDf.withColumn("Output", F.explode(F.array(example_udf(testDf["user"]))))
newDf = newDf.select("user", "Output.*")
我的第一个问题是关于字典的。我应该使用它还是有其他更有效的方法?我在考虑 collectAsMap(),但考虑到它可用于 rdds,我不确定这是否是 Spark 2.4 的方式。
第二个问题,dictionary就是这样,udf函数应该怎么修改?
提前致谢!
关于第一个问题,我认为 pandas 提供了一种将数据转换为字典的优雅方式。尽管由于 pandas 将在一个节点中执行,您可能需要利用集群的力量,因此决定使用 Spark 版本。另一个因素是字典本身的大小。如果您确定字典可以很容易地放入一个节点中,您可以安全地保留 pandas 版本,否则请尝试下一个 Spark 代码:
from pyspark.sql import functions as F
# This is a sample of the original Spark dataframe, which I will use to create the dictionary
df = spark.createDataFrame([(220, 2, '2012-11-22 22:03:42'), (2382556,3, '2012-11-23 22:03:42'), (7854140,3,'2012-11-28 22:03:42')], ["user", "preacher", "time"])
df = df.rdd.map(lambda r: (r[1], (r[0], r[2]))) \
.toDF(["preacher", "tuple"]) \
.groupBy("preacher") \
.agg(F.collect_list("tuple").alias("tuple"))
dict = {}
for k,v in df.rdd.collectAsMap().items():
dict[k] = list(map(lambda row: (row[0], row[1]), v))
dict
# {3: [(2382556, '2012-11-23 22:03:42'), (7854140, '2012-11-28 22:03:42')],
# 2: [(220, '2012-11-22 22:03:42')]}
另外值得一提的是,Spark 会将程序中使用的所有局部变量打包并与每个任务一起发送。因此 broadcast
适用于应该存储在执行程序上以便任何任务都可以轻松访问的大变量。
我正在使用 Pyspark 2.4.4,我需要使用 UDF 来创建我想要的输出。此 UDF 使用广播字典。首先,看起来我需要修改 UDF 的代码以接受字典。其次,我不确定我正在做的是进入 Spark 2.4 的最有效方式。我的代码如下:
# This is a sample of the original Spark dataframe, which I will use to create the dictionary
df = spark.createDataFrame([(220, 2, '2012-11-22 22:03:42'), (2382556,3, '2012-11-23 22:03:42'), (7854140,3,'2012-11-28 22:03:42')], ["user", "preacher", "time"])
# I am converting the above dataframe to pandas dataframe in order to create my dictionary
Dict = df.toPandas().groupby('preacher')['user','time'].apply(lambda g: list(map(tuple, g.values.tolist()))).to_dict()
#Broadcast the dictionary
pcDict = sc.broadcast(Dict)
## Function that calls the dictionary
def example(n):
nodes = []
children = [i[0] for i in pcD.value[n]]
for child in children:
nodes.append(child)
return Row('Out1', 'Out2')(nodes, [(n, n+2), (n, n+4)])
## Convert the Python function to UDF
schema = StructType([
StructField("Out1", ArrayType(IntegerType()), False),
StructField("Out2", ArrayType(StructType([StructField("_1", IntegerType(), False), StructField("_2", IntegerType(), False)])))])
example_udf = F.udf(example, schema)
# Create sample dataframe to test the UDF function
testDf = spark.createDataFrame([(3, 4), (220,5)], ["user", "Number"])
### Final output
newDf = testDf.withColumn("Output", F.explode(F.array(example_udf(testDf["user"]))))
newDf = newDf.select("user", "Output.*")
我的第一个问题是关于字典的。我应该使用它还是有其他更有效的方法?我在考虑 collectAsMap(),但考虑到它可用于 rdds,我不确定这是否是 Spark 2.4 的方式。
第二个问题,dictionary就是这样,udf函数应该怎么修改?
提前致谢!
关于第一个问题,我认为 pandas 提供了一种将数据转换为字典的优雅方式。尽管由于 pandas 将在一个节点中执行,您可能需要利用集群的力量,因此决定使用 Spark 版本。另一个因素是字典本身的大小。如果您确定字典可以很容易地放入一个节点中,您可以安全地保留 pandas 版本,否则请尝试下一个 Spark 代码:
from pyspark.sql import functions as F
# This is a sample of the original Spark dataframe, which I will use to create the dictionary
df = spark.createDataFrame([(220, 2, '2012-11-22 22:03:42'), (2382556,3, '2012-11-23 22:03:42'), (7854140,3,'2012-11-28 22:03:42')], ["user", "preacher", "time"])
df = df.rdd.map(lambda r: (r[1], (r[0], r[2]))) \
.toDF(["preacher", "tuple"]) \
.groupBy("preacher") \
.agg(F.collect_list("tuple").alias("tuple"))
dict = {}
for k,v in df.rdd.collectAsMap().items():
dict[k] = list(map(lambda row: (row[0], row[1]), v))
dict
# {3: [(2382556, '2012-11-23 22:03:42'), (7854140, '2012-11-28 22:03:42')],
# 2: [(220, '2012-11-22 22:03:42')]}
另外值得一提的是,Spark 会将程序中使用的所有局部变量打包并与每个任务一起发送。因此 broadcast
适用于应该存储在执行程序上以便任何任务都可以轻松访问的大变量。