Dask Delayed Error - AttributeError: '_thread._local' object has no attribute 'value'
Dask Delayed Error - AttributeError: '_thread._local' object has no attribute 'value'
我绞尽脑汁想弄清楚为什么我不能在 Dask 上执行这个可并行化的函数。本质上,我有一个加载到 keras 模型中的函数(我正在使用 mlflow 存储模型),然后对我批量发送的一些输入数据使用模型的预测方法。这段代码(下方)导致以下错误:
AttributeError: '_thread._local' object has no attribute 'value'
代码示例:
@delayed
def load_and_predict(input_data_chunk):
def contrastive_loss(y_true, y_pred):
margin = 1
square_pred = K.square(y_pred)
margin_square = K.square(K.maximum(margin - y_pred, 0))
return K.mean(y_true * square_pred + (1 - y_true) * margin_square)
mlflow.set_tracking_uri('<tracking_uri>')
mlflow.set_experiment('experiment_name')
runs = mlflow.search_runs()
artifact_uri = runs.loc[runs['start_time'].idxmax()]['artifact_uri']
model = mlflow.keras.load_model(artifact_uri + '/model', custom_objects={'contrastive_loss': contrastive_loss})
y_pred = model.predict(input_data_chunk)
return y_pred
with Client(<scheduler_ip:port>) as client:
batch_array = np.array_split(X_test, 10)
results = []
for batch in batch_array:
prediction = load_and_predict(batch)
results.append(prediction)
compute(*results)
诚然,我对 Dask 很陌生,因此非常感谢有关此问题的任何专家指导。
如果您打算使用您在上下文中创建的 Client
进行计算,那么 compute()
行也必须在上下文中:请将其缩进。
我绞尽脑汁想弄清楚为什么我不能在 Dask 上执行这个可并行化的函数。本质上,我有一个加载到 keras 模型中的函数(我正在使用 mlflow 存储模型),然后对我批量发送的一些输入数据使用模型的预测方法。这段代码(下方)导致以下错误:
AttributeError: '_thread._local' object has no attribute 'value'
代码示例:
@delayed
def load_and_predict(input_data_chunk):
def contrastive_loss(y_true, y_pred):
margin = 1
square_pred = K.square(y_pred)
margin_square = K.square(K.maximum(margin - y_pred, 0))
return K.mean(y_true * square_pred + (1 - y_true) * margin_square)
mlflow.set_tracking_uri('<tracking_uri>')
mlflow.set_experiment('experiment_name')
runs = mlflow.search_runs()
artifact_uri = runs.loc[runs['start_time'].idxmax()]['artifact_uri']
model = mlflow.keras.load_model(artifact_uri + '/model', custom_objects={'contrastive_loss': contrastive_loss})
y_pred = model.predict(input_data_chunk)
return y_pred
with Client(<scheduler_ip:port>) as client:
batch_array = np.array_split(X_test, 10)
results = []
for batch in batch_array:
prediction = load_and_predict(batch)
results.append(prediction)
compute(*results)
诚然,我对 Dask 很陌生,因此非常感谢有关此问题的任何专家指导。
如果您打算使用您在上下文中创建的 Client
进行计算,那么 compute()
行也必须在上下文中:请将其缩进。