Iterate over pyspark dataframe and send each value to the UDF

I have a dataframe that looks like this:

+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+
|firstname|middlename|lastname|id   |gender|salary|meta                                                                                                |
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+
|James    |          |Smith   |36636|M     |3000  |{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}    |
|Michael  |Rose      |        |40288|M     |4000  |{"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000}   |
|Robert   |          |Williams|42114|M     |4000  |{"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}|
|Maria    |Anne      |Jones   |39192|F     |4000  |{"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}|
|Jen      |Mary      |Brown   |     |F     |-1    |{"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1}         |
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+  

Now, I have a UDF, and I need to iterate over the meta column and pass each row to that UDF. However, only the first row ever gets through.

The code is as follows:

def parse_and_post(col):
    for i in col.collect():
        print(i)
        result = json.loads(i)
        print(result["firstname"])
        #Below is a sample check
        if result["firstname"] == "James":
            return 200
        else:
            return -1
        #Actual check is as follows
        #Format the record in the form of list
        #get token
        #response = send the record to the API
        #return the response


new_df_status = new_df.withColumn("status_of_each_record", lit(parse_and_post(new_df.rdd.map(lambda x: x["meta"]))))  

When I execute this code, I get the output below. However, only the first record should have a status of 200; the rest should be -1:

{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}
James
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+---------------------+
|firstname|middlename|lastname|id   |gender|salary|meta                                                                                                |status_of_each_record|
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+---------------------+
|James    |          |Smith   |36636|M     |3000  |{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}    |200                  |
|Michael  |Rose      |        |40288|M     |4000  |{"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000}   |200                  |
|Robert   |          |Williams|42114|M     |4000  |{"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}|200                  |
|Maria    |Anne      |Jones   |39192|F     |4000  |{"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}|200                  |
|Jen      |Mary      |Brown   |     |F     |-1    |{"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1}         |200                  |
+---------+----------+--------+-----+------+------+----------------------------------------------------------------------------------------------------+---------------------+

How do I iterate over every row of the meta column? What exactly am I doing wrong?

I think the main problem here is that a user-defined function is expected to be called once per row, not to be handed the whole dataframe. So the following works for me:

new_df = ctx.spark.createDataFrame((
    ["James", "", "Smith",  36636, "M", 3000, '{"firstname":"James","middlename":"","lastname":"Smith","id":"36636","gender":"M","salary":3000}'],
    ["Michael", "Rose", "", 40288, "M", 4000, '{"firstname":"Michael","middlename":"Rose","lastname":"","id":"40288","gender":"M","salary":4000}'],
    ["Robert", "", "Williams", 42114, "M", 4000, '{"firstname":"Robert","middlename":"","lastname":"Williams","id":"42114","gender":"M","salary":4000}'],
    ["Maria", "Anne", "Jones", 39192,"F", 4000, '{"firstname":"Maria","middlename":"Anne","lastname":"Jones","id":"39192","gender":"F","salary":4000}'],
    ["Jen", "Mary", "Brown", None, "F", -1, '{"firstname":"Jen","middlename":"Mary","lastname":"Brown","id":"","gender":"F","salary":-1}']
)).toDF("firstname", "middlename", "lastname", "id", "gender", "salary", "meta")


import json
from pyspark.sql.functions import udf

@udf("int")
def parse_and_post(meta):
    result = json.loads(meta)
    print(result["firstname"])
    if result["firstname"] == "James":
        return 200
    else:
        return -1


new_df_status = new_df.withColumn(
    "status_of_each_record", parse_and_post(new_df.meta)) 

In your example you expect the whole dataframe as the input to parse_and_post, whereas here we only expect one row at a time. This also simplifies how we create the new column.
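The commented-out steps in your original function (format the record as a list, get a token, send it to the API) fit the same per-row pattern. Below is a minimal sketch using the `requests` library; the endpoint URL, the header, and the error handling are placeholders for illustration, not part of the answer above:

@udf("int")
def parse_and_post(meta):
    # Each call receives the meta JSON string for a single row.
    record = json.loads(meta)
    payload = [record]  # format the record as a list, as in the original comments
    try:
        # Hypothetical endpoint and token handling -- replace with your real API.
        response = requests.post(
            "https://example.com/api/records",
            headers={"Authorization": "Bearer <token>"},
            json=payload,
            timeout=10,
        )
        return response.status_code
    except requests.RequestException:
        return -1


new_df_status = new_df.withColumn(
    "status_of_each_record", parse_and_post(new_df.meta))

This assumes `import requests` at the top of the file and that the library is available on the executors.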

Do you really need a UDF?

The second thing you might want to consider is whether you can get away without using a UDF at all. UDFs are a bit of a performance killer, and a lot of the time you can do without them. For example:

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql import functions as f

# Let Spark know what shape of JSON data to expect. We can ignore
# fields we don't care about without it being a problem
schema = StructType([StructField("firstname", StringType())])

new_df_status = new_df.withColumn(
    "status_of_each_record", 
    f.when(f.from_json(new_df.meta, schema).firstname == "James", 200)
    .otherwise(-1)
)  

new_df_status.show()

Even assuming you've given a toy example, it's worth letting Spark do as much of the heavy lifting as possible (e.g. the JSON parsing), because that part may happen at scale.
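The same trick covers more than one field: if you spell out the full schema, from_json parses the whole meta column into a struct and every field becomes an ordinary column, still without a UDF. A sketch assuming the schema matches the sample data above:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import functions as f

# Full schema for the meta JSON shown in the question.
meta_schema = StructType([
    StructField("firstname", StringType()),
    StructField("middlename", StringType()),
    StructField("lastname", StringType()),
    StructField("id", StringType()),
    StructField("gender", StringType()),
    StructField("salary", IntegerType()),
])

parsed = new_df.withColumn("parsed_meta", f.from_json(new_df.meta, meta_schema))

# Each field is now addressable as a normal column, e.g. for filtering or joins.
parsed.select("parsed_meta.firstname", "parsed_meta.salary").show(truncate=False)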

For row-by-row ingestion, refer to @Jon Betts's approach.

If you are after bulk POSTs to the API, and the API accepts an array of meta records, you can do the following. This should reduce the number of API calls, which is usually more efficient.

You can first build a list of the meta JSON strings for each group.

If id is well distributed:

from pyspark.sql import functions as F

num_split = 10  # depends on how big your data is and how much the API can handle

df = (df.groupBy(F.col('id') % num_split)
      .agg(F.collect_list('meta').alias('meta')))

@F.udf("int")
def _call_bulk_api(meta_list):
    # call bulk API (PATCH)
    # The returned status is highly dependent on the API.
    return 200

df = df.withColumn('status', _call_bulk_api(F.col('meta')))

If id is not well distributed, create an incrementing id first:

from pyspark.sql import Window

df = (df.withColumn('sid', F.row_number().over(Window.orderBy('id')))
      .groupBy(F.col('sid') % num_split)
      .agg(F.collect_list('meta').alias('meta')))
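In both cases _call_bulk_api above is only a stub; what it really does depends on your API. A sketch of one possible implementation, assuming a hypothetical bulk endpoint and the `requests` library (neither comes from the answer itself):

import json
import requests
from pyspark.sql import functions as F

@F.udf("int")
def _call_bulk_api(meta_list):
    # meta_list is the collected list of meta JSON strings for one group.
    records = [json.loads(m) for m in meta_list]
    try:
        # Hypothetical bulk endpoint -- replace with your real API and auth.
        response = requests.patch(
            "https://example.com/api/records/bulk",
            json=records,
            timeout=30,
        )
        return response.status_code
    except requests.RequestException:
        return -1

df = df.withColumn('status', _call_bulk_api(F.col('meta')))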