Azure Databricks 中的多处理
Multi-processing in Azure Databricks
我最近接到任务,将 JSON 响应提取到 Databricks Delta-lake 上。我必须使用不同的参数点击 REST API 端点 URL 6500 次并拉取响应。
我尝试了 multiprocessing 库中的两个模块,ThreadPool 和 Pool,使每次执行都快一点。
线程池:
- 当 Azure Databricks 集群设置为从 2 个工作节点自动缩放到 13 个工作节点时,如何为 ThreadPool 选择线程数?
现在,我设置了 n_pool = multiprocessing.cpu_count(),如果集群自动缩放,会有什么不同吗?
池
- 当我使用 Pool 时使用处理器而不是线程。我在每次执行时随机看到以下错误。好吧,我从错误中了解到 Spark Session/Conf 丢失了,我需要从每个进程中设置它。但是我在启用了默认 spark 会话的 Databricks 上,那为什么我会看到这些错误。
Py4JError: SparkConf does not exist in the JVM
**OR**
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM
- 最后,计划用 'concurrent.futures.ProcessPoolExecutor' 替换多处理。这有什么区别吗?
您可以尝试以下方式解决
Py4JError: SparkConf does not exist in the JVM
**OR**
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM
错误
Install findspark
$pip install findspark
Code:
import findsparkfindspark.init()
参考文献:Py4JError: SparkConf does not exist in the JVM and py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM
如果您正在使用线程池,它们将 运行 仅在驱动程序节点上,执行程序将处于空闲状态。相反,您需要使用 Spark 本身来并行化请求。这通常是通过创建一个包含 URLs 列表的数据框(如果基数 URL 相同,则 URL 的参数)创建一个数据框,然后使用 Spark user defined function 进行实际操作要求。像这样:
import urllib
df = spark.createDataFrame([("url1", "params1"), ("url2", "params2")],
("url", "params"))
@udf("body string, status int")
def do_request(url: str, params: str):
full_url = url + "?" + params # adjust this as required
with urllib.request.urlopen(full_url) as f:
status = f.status
body = f.read().decode("utf-8")
return {'status': status, 'body': body}
res = df.withColumn("result", do_requests(col("url"), col("params")))
这将 return 数据框包含一个名为 result
的新列,该列将包含两个字段 - status
和 body
(JSON 以字符串形式回答) .
我最近接到任务,将 JSON 响应提取到 Databricks Delta-lake 上。我必须使用不同的参数点击 REST API 端点 URL 6500 次并拉取响应。
我尝试了 multiprocessing 库中的两个模块,ThreadPool 和 Pool,使每次执行都快一点。
线程池:
- 当 Azure Databricks 集群设置为从 2 个工作节点自动缩放到 13 个工作节点时,如何为 ThreadPool 选择线程数?
现在,我设置了 n_pool = multiprocessing.cpu_count(),如果集群自动缩放,会有什么不同吗?
池
- 当我使用 Pool 时使用处理器而不是线程。我在每次执行时随机看到以下错误。好吧,我从错误中了解到 Spark Session/Conf 丢失了,我需要从每个进程中设置它。但是我在启用了默认 spark 会话的 Databricks 上,那为什么我会看到这些错误。
Py4JError: SparkConf does not exist in the JVM
**OR**
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM
- 最后,计划用 'concurrent.futures.ProcessPoolExecutor' 替换多处理。这有什么区别吗?
您可以尝试以下方式解决
Py4JError: SparkConf does not exist in the JVM
**OR**
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM
错误
Install findspark
$pip install findspark
Code:
import findsparkfindspark.init()
参考文献:Py4JError: SparkConf does not exist in the JVM and py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM
如果您正在使用线程池,它们将 运行 仅在驱动程序节点上,执行程序将处于空闲状态。相反,您需要使用 Spark 本身来并行化请求。这通常是通过创建一个包含 URLs 列表的数据框(如果基数 URL 相同,则 URL 的参数)创建一个数据框,然后使用 Spark user defined function 进行实际操作要求。像这样:
import urllib
df = spark.createDataFrame([("url1", "params1"), ("url2", "params2")],
("url", "params"))
@udf("body string, status int")
def do_request(url: str, params: str):
full_url = url + "?" + params # adjust this as required
with urllib.request.urlopen(full_url) as f:
status = f.status
body = f.read().decode("utf-8")
return {'status': status, 'body': body}
res = df.withColumn("result", do_requests(col("url"), col("params")))
这将 return 数据框包含一个名为 result
的新列,该列将包含两个字段 - status
和 body
(JSON 以字符串形式回答) .