Databricks UDF calling an external web service cannot be serialised (PicklingError)
I am using Databricks and have a column in a dataframe that I need to update for every record via an external web service call. In this case it uses the Azure Machine Learning service SDK to make the call. This code works fine when it is not run as a UDF in Spark (i.e. as plain Python), but it throws a serialisation error when I try to call it as a UDF. The same happens if I use a lambda and a map with an RDD.
The model uses fastText and can be invoked fine from Postman, from Python via a normal http call, or with the WebService SDK from AMLS -- it only fails when it is a UDF, with this message:
TypeError: can't pickle _thread._local objects
The only workaround I can think of is to loop through each record in the dataframe sequentially and update it call by call, but that is not very efficient. I don't know whether this is a Spark error or whether it happens because the service is loading the fastText model. When I use the UDF and mock a return value, it still works.
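(For illustration -- this is not the exact code from the post -- a mocked UDF along these lines serialises fine, because it never references the service object:)

from pyspark.sql.functions import udf

# illustrative mock: no reference to the AMLS `service` object, so cloudpickle can serialise it
def predictModelValueMock(summary, modelName, modelLabel):
    return "0.0"

mockUDF = udf(predictModelValueMock)
DVIRCRAMFItemsDF.withColumn("Result", mockUDF("Summary", "ModelName", "ModelLabel"))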
Error at the bottom...
from azureml.core.webservice import Webservice, AciWebservice
from azureml.core import Workspace

def predictModelValue2(summary, modelName, modelLabel):
    # `service` is the deployed AMLS web service, retrieved earlier in the notebook (not shown)
    raw_data = '[{"label": "' + modelLabel + '", "model": "' + modelName + '", "as_full_account": "' + summary + '"}]'
    prediction = service.run(raw_data)
    return prediction

from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

predictModelValueUDF = udf(predictModelValue2)

DVIRCRAMFItemsDFScored1 = DVIRCRAMFItemsDF.withColumn("Result", predictModelValueUDF("Summary", "ModelName", "ModelLabel"))
TypeError: can't pickle _thread._local objects

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
in
----> 2 x = df.withColumn("Result", predictModelValueUDF("Summary", "ModelName", "ModelLabel"))

/databricks/spark/python/pyspark/sql/udf.py in wrapper(*args)
    194         @functools.wraps(self.func, assigned=assignments)
    195         def wrapper(*args):
--> 196             return self(*args)
    197
    198         wrapper.__name__ = self._name

/databricks/spark/python/pyspark/sql/udf.py in __call__(self, *cols)
    172
    173     def __call__(self, *cols):
--> 174         judf = self._judf
    175         sc = SparkContext._active_spark_context
    176         return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))

/databricks/spark/python/pyspark/sql/udf.py in _judf(self)
    156         # and should have a minimal performance impact.
    157         if self._judf_placeholder is None:
--> 158             self._judf_placeholder = self._create_judf()
    159         return self._judf_placeholder
    160

/databricks/spark/python/pyspark/sql/udf.py in _create_judf(self)
    165         sc = spark.sparkContext
    166
--> 167         wrapped_func = _wrap_function(sc, self.func, self.returnType)
    168         jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    169         judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(

/databricks/spark/python/pyspark/sql/udf.py in _wrap_function(sc, func, returnType)
     33 def _wrap_function(sc, func, returnType):
     34     command = (func, returnType)
---> 35     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
     36     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
     37                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)

/databricks/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
   2461     # the serialized command will be compressed by broadcast
   2462     ser = CloudPickleSerializer()
-> 2463     pickled_command = ser.dumps(command)
   2464     if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):  # Default 1M
   2465         # The broadcast will have same life cycle as created PythonRDD

/databricks/spark/python/pyspark/serializers.py in dumps(self, obj)
    709             msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    710             cloudpickle.print_exec(sys.stderr)
--> 711             raise pickle.PicklingError(msg)
    712
    713

PicklingError: Could not serialize object: TypeError: can't pickle _thread._local objects
I am no expert on Databricks or Spark, but pickling functions from the local notebook context is always problematic when you are touching complex objects like the service object. In this particular case, I would recommend removing the dependency on the azureML service object and just using requests to call the service.
Pull the keys out of the service:
# retrieve the API keys. two keys were generated.
key1, key2 = service.get_keys()
scoring_uri = service.scoring_uri
You should be able to use those strings directly in the UDF without pickling problems -- here is an example of how you would call the service with requests alone, applied to your UDF:
import requests, json

def predictModelValue2(summary, modelName, modelLabel):
    # same one-item JSON array payload that service.run() was given above
    input_data = json.dumps([{"label": modelLabel, "model": modelName, "as_full_account": summary}])
    headers = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + key1}
    # call the service for scoring
    resp = requests.post(scoring_uri, data=input_data, headers=headers)
    # the service should return a JSON array with one prediction per input item
    return json.loads(resp.text)[0]
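This is registered and applied the same way as in the question (reusing the original dataframe and column names for illustration):

from pyspark.sql.functions import udf

predictModelValueUDF = udf(predictModelValue2)
DVIRCRAMFItemsDFScored1 = DVIRCRAMFItemsDF.withColumn("Result", predictModelValueUDF("Summary", "ModelName", "ModelLabel"))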
On a side note, though: your UDF will be invoked for every single row in the dataframe, and each invocation makes a network call -- that will be very slow. I would recommend looking for a way to batch the execution. As you can see from the json being constructed, service.run will accept an array of items, so you should call it in batches of 100 or so; a sketch follows below.
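A minimal sketch of such batching with mapPartitions, assuming the service accepts a JSON array and returns one prediction per item in the same order (key1, scoring_uri and the column names come from above; everything else is illustrative):

import json
import requests
from itertools import islice

def score_partition(rows, batch_size=100):
    # group the partition's rows into batches and score each batch with one HTTP call
    rows = iter(rows)
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            break
        payload = json.dumps([{"label": r["ModelLabel"], "model": r["ModelName"], "as_full_account": r["Summary"]}
                              for r in batch])
        headers = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + key1}
        resp = requests.post(scoring_uri, data=payload, headers=headers)
        # assumption: the response is a JSON array aligned with the input batch
        for row, prediction in zip(batch, json.loads(resp.text)):
            yield row["Summary"], row["ModelName"], row["ModelLabel"], prediction

scored = (DVIRCRAMFItemsDF
          .select("Summary", "ModelName", "ModelLabel")
          .rdd
          .mapPartitions(score_partition)
          .toDF(["Summary", "ModelName", "ModelLabel", "Result"]))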