Azure Databricks 中的多处理

Multi-processing in Azure Databricks

我最近接到任务,将 JSON 响应提取到 Databricks Delta-lake 上。我必须使用不同的参数点击 REST API 端点 URL 6500 次并拉取响应。

我尝试了 multiprocessing 库中的两个模块,ThreadPool 和 Pool,使每次执行都快一点。

线程池:

  1. 当 Azure Databricks 集群设置为从 2 个工作节点自动缩放到 13 个工作节点时,如何为 ThreadPool 选择线程数?

现在,我设置了 n_pool = multiprocessing.cpu_count(),如果集群自动缩放,会有什么不同吗?

  1. 当我使用 Pool 时使用处理器而不是线程。我在每次执行时随机看到以下错误。好吧,我从错误中了解到 Spark Session/Conf 丢失了,我需要从每个进程中设置它。但是我在启用了默认 spark 会话的 Databricks 上,那为什么我会看到这些错误。
Py4JError: SparkConf does not exist in the JVM 
**OR** 
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM
  1. 最后,计划用 'concurrent.futures.ProcessPoolExecutor' 替换多处理。这有什么区别吗?

您可以尝试以下方式解决

Py4JError: SparkConf does not exist in the JVM
**OR** 
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM

错误

Install findspark 
$pip install findspark
Code:
import findsparkfindspark.init()

参考文献:Py4JError: SparkConf does not exist in the JVM and py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM

如果您正在使用线程池,它们将 运行 仅在驱动程序节点上,执行程序将处于空闲状态。相反,您需要使用 Spark 本身来并行化请求。这通常是通过创建一个包含 URLs 列表的数据框(如果基数 URL 相同,则 URL 的参数)创建一个数据框,然后使用 Spark user defined function 进行实际操作要求。像这样:

import urllib

df = spark.createDataFrame([("url1", "params1"), ("url2", "params2")], 
                           ("url", "params"))

@udf("body string, status int")
def do_request(url: str, params: str):
  full_url = url + "?" + params # adjust this as required
  with urllib.request.urlopen(full_url) as f:
    status = f.status
    body = f.read().decode("utf-8")
  
  return {'status': status, 'body': body}
  

res = df.withColumn("result", do_requests(col("url"), col("params")))

这将 return 数据框包含一个名为 result 的新列,该列将包含两个字段 - statusbody(JSON 以字符串形式回答) .