PySpark Working with Delta tables - For Loop Optimization with Union
PySpark Working with Delta tables - For Loop Optimization with Union
我目前在 Databricks 中工作,有一个包含 20 多列的 Delta 表。我基本上需要从每一行的某一列中取一个值,将它发送给一个 API,该 API 会返回两个值/列,然后再补上其余 26 列,把结果合并回原始的 Delta 表。所以输入是 28 列,输出也是 28 列。目前我的代码如下:
from pyspark.sql.types import *
from pyspark.sql import functions as F
import requests, uuid, json
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col,lit
from functools import reduce
# Session-level tuning flags. All of them are set through spark.conf.set so the
# block reads uniformly (the original mixed in one `spark.sql('set ...')`).
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.databricks.adaptive.autoOptimizeShuffle.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning", "true")
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")
# Configuration keys are case-sensitive: the documented key is
# "inMemoryColumnarStorage" (capital C); the lower-case spelling is accepted
# silently but configures nothing.
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
spark.conf.set("spark.databricks.optimizer.dynamicFilePruning", "true")
# Driver-side version from the question: every row is collected to the driver,
# sent to the API one at a time, turned into its own small DataFrame, and all
# of those DataFrames are unioned at the end.
output = spark.sql("select * from delta.`table`").cache()

# "small mapping fix": code used for each known col1 value.
col1_to_var = {'val1': 'a', 'val2': 'b', 'val3': 'c', 'val4': 'd'}
req_var = {'a', 'b', 'c', 'd'}

SeriesAppend = []
for i in output.collect():
    # .get() yields None for an unmapped col1. The original if/elif chain left
    # var0 unassigned in that case, which raised NameError on the first row or
    # silently reused the previous row's value on later rows.
    var0 = col1_to_var.get(i['col1'])
    var_list = list(req_var - {var0})
    # subscription info
    # `header` is a placeholder from the post; as written this is a set literal,
    # so substitute the real headers dict.
    headers = {header}
    body = [{
        'text': i['col2']
    }]
    # Only texts shorter than 500 characters are sent; longer rows are skipped.
    if len(i['col2']) < 500:
        request = requests.post(constructed_url, params=params, headers=headers, json=body)
        response = request.json()
        dumps = json.dumps(response[0])
        loads = json.loads(dumps)
        json_rdd = sc.parallelize(loads)
        json_df = spark.read.json(json_rdd)
        # Re-attach the original column values to the API result.
        json_df = json_df.withColumn('col1', lit(i['col1']))
        json_df = json_df.withColumn('col2', lit(i['col2']))
        json_df = json_df.withColumn('col3', lit(i['col3']))
        ...  # remaining columns were elided in the original post
        SeriesAppend.append(json_df)

# One union per collected DataFrame; raises TypeError if nothing was appended.
Series_output = reduce(DataFrame.unionAll, SeriesAppend)
只有 3 列的样本 DF:
# Three-column sample frame; keep the value types consistent across rows.
df = spark.createDataFrame(
    data=[
        ("a", "cat", "owner1"),
        ("b", "dog", "owner2"),
        ("c", "fish", "owner3"),
        ("d", "fox", "owner4"),
        ("e", "rat", "owner5"),
    ],
    # Column names, in the same order as the tuple fields above.
    schema=["col1", "col2", "col3"],
)
我真的只需要将响应 + 其他列的值写入 Delta 表,因此不一定需要 DataFrame,但还没有找到比上述方法更快的方法。现在,在不做 unionAll 的情况下,运行 5 个输入(共返回 15 条结果)需要 25.3 秒;加上 union 之后,就变成了 3 分钟。
最终输出如下:
# Desired result: the three input columns plus the two values from the API.
df = spark.createDataFrame(
    data=[
        ("a", "cat", "owner1", "MI", 48003),
        ("b", "dog", "owner2", "MI", 48003),
        ("c", "fish", "owner3", "MI", 48003),
        ("d", "fox", "owner4", "MI", 48003),
        ("e", "rat", "owner5", "MI", 48003),
    ],
    # Column names, in the same order as the tuple fields above.
    schema=["col1", "col2", "col3", "col4", "col5"],
)
我怎样才能使它在 spark 中更快?
正如我在评论中提到的,您应该使用 UDF 把工作量分发到各个工作节点(worker),而不是用 collect
把数据全部拉回来、让一台机器(驱动程序,driver)来运行全部逻辑。后者是完全错误的做法,无法扩展。
# This is the main function: pure Python, so it can be unit-tested without Spark.
# `header`, `constructed_url` and `params` are not defined in this snippet; they
# must be resolvable when the function runs on the executors (define them inside
# the function, or next to it so they are shipped together with the UDF).
def req(col1, col2):
    """Call the API for one row and return the two values it yields.

    Args:
        col1: Category value of the row; 'val1'..'val4' are mapped to the
            codes 'a'..'d', anything else is treated as unmapped.
        col2: Text sent to the API as the request body.

    Returns:
        A ``(col4, col5)`` tuple read from the JSON response, or ``None`` when
        ``col2`` is null or at least 500 characters long (no request is made
        in that case) or when the response is an empty list.
    """
    # Decide whether to call the API before touching anything external; a null
    # column value would otherwise make len() raise inside the executor.
    if col2 is None or len(col2) >= 500:
        return None

    # "small mapping fix": .get() returns None for an unmapped col1 instead of
    # leaving var0 unbound (the original if/elif chain raised on such rows).
    var0 = {'val1': 'a', 'val2': 'b', 'val3': 'c', 'val4': 'd'}.get(col1)
    # Not used by the code visible here; presumably consumed by request
    # construction that was elided from the original post -- TODO confirm.
    var_list = list({'a', 'b', 'c', 'd'} - {var0})

    # subscription info
    # `header` is a placeholder from the post; as written this is a set literal,
    # so substitute the real headers dict.
    headers = {header}
    body = [{
        'text': col2
    }]
    # A timeout keeps one stuck HTTP call from hanging the whole Spark task.
    request = requests.post(
        constructed_url, params=params, headers=headers, json=body, timeout=30
    )
    response = request.json()
    # The decoded JSON is a dict (or a list of dicts), so fields are read by
    # key; attribute access such as `response.col4` raises AttributeError.
    if isinstance(response, list):
        if not response:
            return None
        response = response[0]
    return (response.get('col4'), response.get('col5'))
# Wrap the plain function in a Spark UDF. Two input columns and a two-element
# array are used here; both can be extended to as many items as needed.
# ArrayType/StringType come from the file's `from pyspark.sql.types import *`
# (no `T` alias is imported anywhere in this file).
req_udf = F.udf(req, ArrayType(StringType()))
# withColumn returns a new DataFrame and leaves `df` untouched, so the result
# is kept in a variable for the extraction step below.
df_with_temp = df.withColumn('temp', req_udf('col1', 'col2'))
df_with_temp.show()
# Output
# +----+----+------+------------------+
# |col1|col2|  col3|              temp|
# +----+----+------+------------------+
# |   a| cat|owner1|[foo_cat, bar_cat]|
# |   b| dog|owner2|[foo_dog, bar_dog]|
# |   c|fish|owner3|              null|
# |   d| fox|owner4|              null|
# |   e| rat|owner5|              null|
# +----+----+------+------------------+
# Split the array into separate columns and drop the temporary column.
# NOTE: every action (each show()) re-evaluates the UDF, i.e. calls the API
# again, unless df_with_temp is cached or persisted first.
(
    df_with_temp
    .withColumn('col4', F.col('temp')[0])
    .withColumn('col5', F.col('temp')[1])
    .drop('temp')
    .show()
)
# Output
# +----+----+------+-------+-------+
# |col1|col2| col3| col4| col5|
# +----+----+------+-------+-------+
# | a| cat|owner1|foo_cat|bar_cat|
# | b| dog|owner2|foo_dog|bar_dog|
# | c|fish|owner3| null| null|
# | d| fox|owner4| null| null|
# | e| rat|owner5| null| null|
# +----+----+------+-------+-------+
我目前在 Databricks 中工作,有一个包含 20 多列的 Delta 表。我基本上需要从每一行的某一列中取一个值,将它发送给一个 API,该 API 会返回两个值/列,然后再补上其余 26 列,把结果合并回原始的 Delta 表。所以输入是 28 列,输出也是 28 列。目前我的代码如下:
from pyspark.sql.types import *
from pyspark.sql import functions as F
import requests, uuid, json
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col,lit
from functools import reduce
# Session-level tuning flags. All of them are set through spark.conf.set so the
# block reads uniformly (the original mixed in one `spark.sql('set ...')`).
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.databricks.adaptive.autoOptimizeShuffle.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning", "true")
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")
# Configuration keys are case-sensitive: the documented key is
# "inMemoryColumnarStorage" (capital C); the lower-case spelling is accepted
# silently but configures nothing.
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
spark.conf.set("spark.databricks.optimizer.dynamicFilePruning", "true")
# Driver-side version from the question: every row is collected to the driver,
# sent to the API one at a time, turned into its own small DataFrame, and all
# of those DataFrames are unioned at the end.
output = spark.sql("select * from delta.`table`").cache()

# "small mapping fix": code used for each known col1 value.
col1_to_var = {'val1': 'a', 'val2': 'b', 'val3': 'c', 'val4': 'd'}
req_var = {'a', 'b', 'c', 'd'}

SeriesAppend = []
for i in output.collect():
    # .get() yields None for an unmapped col1. The original if/elif chain left
    # var0 unassigned in that case, which raised NameError on the first row or
    # silently reused the previous row's value on later rows.
    var0 = col1_to_var.get(i['col1'])
    var_list = list(req_var - {var0})
    # subscription info
    # `header` is a placeholder from the post; as written this is a set literal,
    # so substitute the real headers dict.
    headers = {header}
    body = [{
        'text': i['col2']
    }]
    # Only texts shorter than 500 characters are sent; longer rows are skipped.
    if len(i['col2']) < 500:
        request = requests.post(constructed_url, params=params, headers=headers, json=body)
        response = request.json()
        dumps = json.dumps(response[0])
        loads = json.loads(dumps)
        json_rdd = sc.parallelize(loads)
        json_df = spark.read.json(json_rdd)
        # Re-attach the original column values to the API result.
        json_df = json_df.withColumn('col1', lit(i['col1']))
        json_df = json_df.withColumn('col2', lit(i['col2']))
        json_df = json_df.withColumn('col3', lit(i['col3']))
        ...  # remaining columns were elided in the original post
        SeriesAppend.append(json_df)

# One union per collected DataFrame; raises TypeError if nothing was appended.
Series_output = reduce(DataFrame.unionAll, SeriesAppend)
只有 3 列的样本 DF:
# Three-column sample frame; keep the value types consistent across rows.
df = spark.createDataFrame(
    data=[
        ("a", "cat", "owner1"),
        ("b", "dog", "owner2"),
        ("c", "fish", "owner3"),
        ("d", "fox", "owner4"),
        ("e", "rat", "owner5"),
    ],
    # Column names, in the same order as the tuple fields above.
    schema=["col1", "col2", "col3"],
)
我真的只需要将响应 + 其他列的值写入 Delta 表,因此不一定需要 DataFrame,但还没有找到比上述方法更快的方法。现在,在不做 unionAll 的情况下,运行 5 个输入(共返回 15 条结果)需要 25.3 秒;加上 union 之后,就变成了 3 分钟。
最终输出如下:
# Desired result: the three input columns plus the two values from the API.
df = spark.createDataFrame(
    data=[
        ("a", "cat", "owner1", "MI", 48003),
        ("b", "dog", "owner2", "MI", 48003),
        ("c", "fish", "owner3", "MI", 48003),
        ("d", "fox", "owner4", "MI", 48003),
        ("e", "rat", "owner5", "MI", 48003),
    ],
    # Column names, in the same order as the tuple fields above.
    schema=["col1", "col2", "col3", "col4", "col5"],
)
我怎样才能使它在 spark 中更快?
正如我在评论中提到的,您应该使用 UDF 把工作量分发到各个工作节点(worker),而不是用 collect
把数据全部拉回来、让一台机器(驱动程序,driver)来运行全部逻辑。后者是完全错误的做法,无法扩展。
# This is the main function: pure Python, so it can be unit-tested without Spark.
# `header`, `constructed_url` and `params` are not defined in this snippet; they
# must be resolvable when the function runs on the executors (define them inside
# the function, or next to it so they are shipped together with the UDF).
def req(col1, col2):
    """Call the API for one row and return the two values it yields.

    Args:
        col1: Category value of the row; 'val1'..'val4' are mapped to the
            codes 'a'..'d', anything else is treated as unmapped.
        col2: Text sent to the API as the request body.

    Returns:
        A ``(col4, col5)`` tuple read from the JSON response, or ``None`` when
        ``col2`` is null or at least 500 characters long (no request is made
        in that case) or when the response is an empty list.
    """
    # Decide whether to call the API before touching anything external; a null
    # column value would otherwise make len() raise inside the executor.
    if col2 is None or len(col2) >= 500:
        return None

    # "small mapping fix": .get() returns None for an unmapped col1 instead of
    # leaving var0 unbound (the original if/elif chain raised on such rows).
    var0 = {'val1': 'a', 'val2': 'b', 'val3': 'c', 'val4': 'd'}.get(col1)
    # Not used by the code visible here; presumably consumed by request
    # construction that was elided from the original post -- TODO confirm.
    var_list = list({'a', 'b', 'c', 'd'} - {var0})

    # subscription info
    # `header` is a placeholder from the post; as written this is a set literal,
    # so substitute the real headers dict.
    headers = {header}
    body = [{
        'text': col2
    }]
    # A timeout keeps one stuck HTTP call from hanging the whole Spark task.
    request = requests.post(
        constructed_url, params=params, headers=headers, json=body, timeout=30
    )
    response = request.json()
    # The decoded JSON is a dict (or a list of dicts), so fields are read by
    # key; attribute access such as `response.col4` raises AttributeError.
    if isinstance(response, list):
        if not response:
            return None
        response = response[0]
    return (response.get('col4'), response.get('col5'))
# Wrap the plain function in a Spark UDF. Two input columns and a two-element
# array are used here; both can be extended to as many items as needed.
# ArrayType/StringType come from the file's `from pyspark.sql.types import *`
# (no `T` alias is imported anywhere in this file).
req_udf = F.udf(req, ArrayType(StringType()))
# withColumn returns a new DataFrame and leaves `df` untouched, so the result
# is kept in a variable for the extraction step below.
df_with_temp = df.withColumn('temp', req_udf('col1', 'col2'))
df_with_temp.show()
# Output
# +----+----+------+------------------+
# |col1|col2|  col3|              temp|
# +----+----+------+------------------+
# |   a| cat|owner1|[foo_cat, bar_cat]|
# |   b| dog|owner2|[foo_dog, bar_dog]|
# |   c|fish|owner3|              null|
# |   d| fox|owner4|              null|
# |   e| rat|owner5|              null|
# +----+----+------+------------------+
# Split the array into separate columns and drop the temporary column.
# NOTE: every action (each show()) re-evaluates the UDF, i.e. calls the API
# again, unless df_with_temp is cached or persisted first.
(
    df_with_temp
    .withColumn('col4', F.col('temp')[0])
    .withColumn('col5', F.col('temp')[1])
    .drop('temp')
    .show()
)
# Output
# +----+----+------+-------+-------+
# |col1|col2| col3| col4| col5|
# +----+----+------+-------+-------+
# | a| cat|owner1|foo_cat|bar_cat|
# | b| dog|owner2|foo_dog|bar_dog|
# | c|fish|owner3| null| null|
# | d| fox|owner4| null| null|
# | e| rat|owner5| null| null|
# +----+----+------+-------+-------+