如何有效地迭代 pyspark 中的一个非常大的列表

How to efficiently iterate over a very large list in pyspark

我下面有一个 sample 数据框:

firstname middlename lastname id gender salary
James Smith 36636 M 3000
Michael Rose 40288 M 4000
Robert Williams 42114 M 4000
Maria Anne Jones 39192 F 4000
Jen Mary Brown F -1

现在我想将其转换成如下所示的 JSON 列表:

[{'firstname': 'James', 'middlename': '', 'lastname': 'Smith', 'id': '36636', 'gender': 'M', 'salary': 3000}, {'firstname': 'Michael', 'middlename': 'Rose', 'lastname': '', 'id': '40288', 'gender': 'M', 'salary': 4000}, {'firstname': 'Robert', 'middlename': '', 'lastname': 'Williams', 'id': '42114', 'gender': 'M', 'salary': 4000}, {'firstname': 'Maria', 'middlename': 'Anne', 'lastname': 'Jones', 'id': '39192', 'gender': 'F', 'salary': 4000}, {'firstname': 'Jen', 'middlename': 'Mary', 'lastname': 'Brown', 'id': '', 'gender': 'F', 'salary': -1}]  

我使用以下代码做到了这一点:

result = json.loads((df.toPandas().to_json(orient="records")))

现在我要做的是,我要将JSON条记录一条一条发送出去,然后点击API。我无法一次发送所有记录,还有 millions 条记录要发送。那么,我如何使用 Map() 或其他方式隔离这些记录,以便它以分布式方式工作?当我在此列表上迭代 for loop 时效果很好,但需要时间。所以想为这个用例实现最有效的方法。 for循环代码如下:

for i in result_json:
            try:
                token = get_token(tokenUrl, tokenBody)
                custRequestBody = {
                            "Token": token,
                            "CustomerName": "",
                            "Object": "",
                            "Data": [i]
                }
                
                #print("::::Customer Request Body::::::")
                #print(custRequestBody)
                response = call_to_cust_bulk_api(apiUrl, custRequestBody)
                output = {
                "headers": {
                    "Content-Type": "",
                    "X-Content-Type-Options": "",
                    "X-XSS-Protection": "",
                    "X-Frame-Options": "DENY",
                    "Strict-Transport-Security": ""
                    },
                "body": {
                    "Response code": 200,
                    "ResponseMessage": response
                }
                }

此处,result_json 已转换为 JSON 记录列表:

您可以使用 udf(用户定义函数)对您的 df 执行操作 row-wise。 Spark 将 运行 以分布式方式在所有执行程序上执行此功能

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# Your custom function you want to run in pyspark
@udf(returnType=IntegerType())
def parse_and_post(*args):
    print(args, type(args)) # args is of type typle
    # Convert the args tuple to json
    # Send the json to API
    # Return a Status value based on API success of failure
    """if success:
        return 200
    else
        return -1"""

df = spark.createDataFrame([(1, "John Doe", 21), (2, "Simple", 33)], ("id", "name", "age"))

# Apply the UDF to your Dataframe (called "df")
new_df = df.withColumn("post_status", parse_and_post( *[df[x] for x in df.columns] ))

备注

您可能想在 df 上调用 collect() 函数,然后迭代行,但它会将所有数据加载到驱动程序中。这超出了分布式计算的目的。

此外,由于 spark 的惰性求值 use/show new_df,该函数将不会执行。

阅读更多关于 udf 的内容here