如何有效地迭代 pyspark 中的一个非常大的列表
How to efficiently iterate over a very large list in pyspark
我下面有一个 sample
数据框:
firstname
middlename
lastname
id
gender
salary
James
Smith
36636
M
3000
Michael
Rose
40288
M
4000
Robert
Williams
42114
M
4000
Maria
Anne
Jones
39192
F
4000
Jen
Mary
Brown
F
-1
现在我想将其转换成如下所示的 JSON
列表:
[{'firstname': 'James', 'middlename': '', 'lastname': 'Smith', 'id': '36636', 'gender': 'M', 'salary': 3000}, {'firstname': 'Michael', 'middlename': 'Rose', 'lastname': '', 'id': '40288', 'gender': 'M', 'salary': 4000}, {'firstname': 'Robert', 'middlename': '', 'lastname': 'Williams', 'id': '42114', 'gender': 'M', 'salary': 4000}, {'firstname': 'Maria', 'middlename': 'Anne', 'lastname': 'Jones', 'id': '39192', 'gender': 'F', 'salary': 4000}, {'firstname': 'Jen', 'middlename': 'Mary', 'lastname': 'Brown', 'id': '', 'gender': 'F', 'salary': -1}]
我使用以下代码做到了这一点:
result = json.loads((df.toPandas().to_json(orient="records")))
现在我要做的是,我要将JSON
条记录一条一条发送出去,然后点击API
。我无法一次发送所有记录,还有 millions
条记录要发送。那么,我如何使用 Map()
或其他方式隔离这些记录,以便它以分布式方式工作?当我在此列表上迭代 for loop
时效果很好,但需要时间。所以想为这个用例实现最有效的方法。 for循环代码如下:
for i in result_json:
try:
token = get_token(tokenUrl, tokenBody)
custRequestBody = {
"Token": token,
"CustomerName": "",
"Object": "",
"Data": [i]
}
#print("::::Customer Request Body::::::")
#print(custRequestBody)
response = call_to_cust_bulk_api(apiUrl, custRequestBody)
output = {
"headers": {
"Content-Type": "",
"X-Content-Type-Options": "",
"X-XSS-Protection": "",
"X-Frame-Options": "DENY",
"Strict-Transport-Security": ""
},
"body": {
"Response code": 200,
"ResponseMessage": response
}
}
此处,result_json
已转换为 JSON
记录列表:
您可以使用 udf(用户定义函数)对您的 df 执行操作 row-wise。
Spark 将 运行 以分布式方式在所有执行程序上执行此功能
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
# Your custom function you want to run in pyspark
@udf(returnType=IntegerType())
def parse_and_post(*args):
print(args, type(args)) # args is of type typle
# Convert the args tuple to json
# Send the json to API
# Return a Status value based on API success of failure
"""if success:
return 200
else
return -1"""
df = spark.createDataFrame([(1, "John Doe", 21), (2, "Simple", 33)], ("id", "name", "age"))
# Apply the UDF to your Dataframe (called "df")
new_df = df.withColumn("post_status", parse_and_post( *[df[x] for x in df.columns] ))
备注
您可能想在 df 上调用 collect()
函数,然后迭代行,但它会将所有数据加载到驱动程序中。这超出了分布式计算的目的。
此外,由于 spark 的惰性求值 use/show new_df,该函数将不会执行。
阅读更多关于 udf 的内容here
我下面有一个 sample
数据框:
firstname | middlename | lastname | id | gender | salary |
---|---|---|---|---|---|
James | Smith | 36636 | M | 3000 | |
Michael | Rose | 40288 | M | 4000 | |
Robert | Williams | 42114 | M | 4000 | |
Maria | Anne | Jones | 39192 | F | 4000 |
Jen | Mary | Brown | F | -1 |
现在我想将其转换成如下所示的 JSON
列表:
[{'firstname': 'James', 'middlename': '', 'lastname': 'Smith', 'id': '36636', 'gender': 'M', 'salary': 3000}, {'firstname': 'Michael', 'middlename': 'Rose', 'lastname': '', 'id': '40288', 'gender': 'M', 'salary': 4000}, {'firstname': 'Robert', 'middlename': '', 'lastname': 'Williams', 'id': '42114', 'gender': 'M', 'salary': 4000}, {'firstname': 'Maria', 'middlename': 'Anne', 'lastname': 'Jones', 'id': '39192', 'gender': 'F', 'salary': 4000}, {'firstname': 'Jen', 'middlename': 'Mary', 'lastname': 'Brown', 'id': '', 'gender': 'F', 'salary': -1}]
我使用以下代码做到了这一点:
result = json.loads((df.toPandas().to_json(orient="records")))
现在我要做的是,我要将JSON
条记录一条一条发送出去,然后点击API
。我无法一次发送所有记录,还有 millions
条记录要发送。那么,我如何使用 Map()
或其他方式隔离这些记录,以便它以分布式方式工作?当我在此列表上迭代 for loop
时效果很好,但需要时间。所以想为这个用例实现最有效的方法。 for循环代码如下:
for i in result_json:
try:
token = get_token(tokenUrl, tokenBody)
custRequestBody = {
"Token": token,
"CustomerName": "",
"Object": "",
"Data": [i]
}
#print("::::Customer Request Body::::::")
#print(custRequestBody)
response = call_to_cust_bulk_api(apiUrl, custRequestBody)
output = {
"headers": {
"Content-Type": "",
"X-Content-Type-Options": "",
"X-XSS-Protection": "",
"X-Frame-Options": "DENY",
"Strict-Transport-Security": ""
},
"body": {
"Response code": 200,
"ResponseMessage": response
}
}
此处,result_json
已转换为 JSON
记录列表:
您可以使用 udf(用户定义函数)对您的 df 执行操作 row-wise。 Spark 将 运行 以分布式方式在所有执行程序上执行此功能
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
# Your custom function you want to run in pyspark
@udf(returnType=IntegerType())
def parse_and_post(*args):
print(args, type(args)) # args is of type typle
# Convert the args tuple to json
# Send the json to API
# Return a Status value based on API success of failure
"""if success:
return 200
else
return -1"""
df = spark.createDataFrame([(1, "John Doe", 21), (2, "Simple", 33)], ("id", "name", "age"))
# Apply the UDF to your Dataframe (called "df")
new_df = df.withColumn("post_status", parse_and_post( *[df[x] for x in df.columns] ))
备注
您可能想在 df 上调用 collect()
函数,然后迭代行,但它会将所有数据加载到驱动程序中。这超出了分布式计算的目的。
此外,由于 spark 的惰性求值 use/show new_df,该函数将不会执行。
阅读更多关于 udf 的内容here