Iterate over pyspark dataframe and send each value to the UDF
I have a dataframe that looks like this:
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+
|firstname|middlename|lastname|id |gender|salary|meta |
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+
|James | |Smith |36636|M |3000 |{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000} |
|Michael |Rose | |40288|M |4000 |{"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000} |
|Robert | |Williams|42114|M |4000 |{"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}|
|Maria |Anne |Jones |39192|F |4000 |{"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}|
|Jen |Mary |Brown | |F |-1 |{"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1} |
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+
Now, there is a UDF. I need to iterate over the meta column and pass each row to the UDF. However, only the first row gets passed. The code is as follows:
def parse_and_post(col):
    for i in col.collect():
        print(i)
        result = json.loads(i)
        print(result["firstname"])
        # Below is a sample check
        if result["firstname"] == "James":
            return 200
        else:
            return -1
        # Actual check is as follows
        # Format the record in the form of list
        # get token
        # response = send the record to the API
        # return the response

new_df_status = new_df.withColumn("status_of_each_record", lit(parse_and_post(new_df.rdd.map(lambda x: x["meta"]))))
When I execute this code, I get the output below. However, only the first record's status should be 200; the rest should be -1:
{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}
James
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+---------------------+
|firstname|middlename|lastname|id |gender|salary|meta |status_of_each_record|
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+---------------------+
|James | |Smith |36636|M |3000 |{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000} |200 |
|Michael |Rose | |40288|M |4000 |{"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000} |200 |
|Robert | |Williams|42114|M |4000 |{"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}|200 |
|Maria |Anne |Jones |39192|F |4000 |{"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}|200 |
|Jen |Mary |Brown | |F |-1 |{"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1} |200 |
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+---------------------+
How do I loop through each row of the meta column? What exactly am I doing wrong?
I think the main problem here is that a user-defined function is expected to be called once per row, rather than being passed the whole dataframe. So for me, the following works:
new_df = ctx.spark.createDataFrame((
    ["James", "", "Smith", 36636, "M", 3000, '{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}'],
    ["Michael", "Rose", "", 40288, "M", 4000, '{"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000}'],
    ["Robert", "", "Williams", 42114, "M", 4000, '{"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}'],
    ["Maria", "Anne", "Jones", 39192, "F", 4000, '{"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}'],
    ["Jen", "Mary", "Brown", None, "F", -1, '{"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1}']
)).toDF("firstname", "middlename", "lastname", "id", "gender", "salary", "meta")
import json

from pyspark.sql.functions import udf


@udf()
def parse_and_post(meta):
    result = json.loads(meta)
    print(result["firstname"])
    if result["firstname"] == "James":
        return 200
    else:
        return -1
new_df_status = new_df.withColumn(
    "status_of_each_record", parse_and_post(new_df.meta))
In your example you're expecting the whole dataframe as the input to parse_and_post, whereas here we only expect one row at a time. This also simplifies how we create the new column.
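One side note (not part of the original answer): @udf() without an explicit return type defaults to StringType, so the 200 / -1 values come back as strings. If you want a proper integer column, a minimal sketch, assuming the same check as above:

import json

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType


# Same per-row UDF, but with an explicit integer return type so that
# status_of_each_record is an integer column rather than a string.
@udf(returnType=IntegerType())
def parse_and_post(meta):
    result = json.loads(meta)
    return 200 if result["firstname"] == "James" else -1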
Do you really need a UDF?
The second thing you might want to consider is whether you can get away without using a UDF at all. UDFs tend to be performance killers, and a lot of the time you can do without them. For example:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import functions as f

# Let spark know what shape of json data to expect. We can ignore
# fields we don't care about without it being a problem
schema = StructType([StructField("firstname", StringType())])

new_df_status = new_df.withColumn(
    "status_of_each_record",
    f.when(f.from_json(new_df.meta, schema).firstname == "James", 200)
     .otherwise(-1)
)

new_df_status.show()
Even assuming you've given a toy example, it's worth letting Spark do as much of the heavy lifting as possible (such as the json parsing), because that part presumably happens at scale.
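Along the same lines, if you later need more fields than firstname from the payload, you can describe the whole meta object once and let from_json expose every field as a typed column. This is only a sketch, assuming the field names and types shown in the question:

from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Full schema for the meta JSON; id stays a string because the payload
# stores it as "36636".
full_schema = StructType([
    StructField("firstname", StringType()),
    StructField("middlename", StringType()),
    StructField("lastname", StringType()),
    StructField("id", StringType()),
    StructField("gender", StringType()),
    StructField("salary", IntegerType()),
])

parsed = new_df.withColumn("meta_parsed", f.from_json(new_df.meta, full_schema))
parsed.select("meta_parsed.firstname", "meta_parsed.salary").show()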
For row-by-row ingestion, refer to @Jon Betts's approach.
If you are after bulk POSTs to the API, and the API accepts an array of meta data, you can do the following. This should reduce the number of API calls, which is usually more efficient.
You can first collect the meta JSON strings into lists.
If id is well distributed:
from pyspark.sql import functions as F

num_split = 10  # depends on how big your data is and how much the API can handle.

df = (df.groupBy(F.col('id') % num_split)
      .agg(F.collect_list('meta').alias('meta')))


@F.udf
def _call_bulk_api(meta_list):
    # call bulk API (PATCH)
    # The returned status is highly dependent on the API.
    return 200


df = df.withColumn('status', _call_bulk_api(F.col('meta')))
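The _call_bulk_api stub above just returns 200. Purely as an illustration (the requests library, the endpoint URL and the payload shape are assumptions, not something from the question), its body might look roughly like this:

import json

import requests  # assumed to be available on the executors
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType


@F.udf(returnType=IntegerType())
def _call_bulk_api(meta_list):
    # meta_list is the collected list of meta JSON strings for one bucket.
    # The URL and payload shape below are hypothetical placeholders.
    payload = [json.loads(m) for m in meta_list]
    response = requests.patch("https://example.com/api/records", json=payload)
    return response.status_code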
If id is not well distributed, create an incrementing id:
from pyspark.sql import Window

df = (df.withColumn('sid', F.row_number().over(Window.orderBy('id')))
      .groupBy(F.col('sid') % num_split)
      .agg(F.collect_list('meta').alias('meta')))
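One caveat worth adding as a side note: Window.orderBy without a partitionBy moves all rows into a single partition to assign row numbers, so this step can become a bottleneck on very large data. After the bucketing, the bulk call is applied exactly as before:

# Same bulk call as above, now driven by the sid-based buckets.
df = df.withColumn('status', _call_bulk_api(F.col('meta')))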