为什么我的 Dask Futures 卡在 'pending' 并且永远不会完成?

Why do my Dask Futures get stuck in 'pending' and never finish?

我有一些很长的 运行ning 代码(处理大约 5-10 分钟),我正在尝试 运行 作为 Dask Future。这是一系列的几个离散步骤,我可以 运行 作为一个函数:

result : Future = client.submit(my_function, arg1, arg2)

或者我可以分成中间步骤:

# compose the result from the same intermediate results but with Futures
intermediate1 = client.submit(my_function1, arg1)
intermediate2 = client.submit(my_function2, arg1, arg2)
intermediate3 = client.submit(my_function3, intermediate2, arg1)
result = client.submit(my_function4, intermediate3)

如果我 运行 在本地(例如,result = my_function(arg1, arg2)),它会完成。如果我将它提交给 Dask,我会立即取回我的 Future - 正如预期的那样 - 但工作永远不会完成。此外,如果我抓住 result.key 作为跟踪工作状态的一种方式,稍后将未来重建为 result = Future(key),它 总是 的状态为 pending.

我想先按原样 运行 获取它,这样我就可以将我的处理工作卸载到我的 Dask worker 而不是处理请求的 API,然后我想能够开始跨节点拆分工作,这样我就可以提高性能。但为什么我的工作只是蒸发?查看我的 Dask 调度程序 Web 界面,似乎没有显示作业。但我知道 Dask 正在运行,因为我可以从我的 Jupyter 笔记本向它提交代码。

我正在从 Flask 服务器调用 client.submit,我正在返回密钥以便稍后使用。大致:

@app.route('/submit')
def submit():
    # ...
    future = client.submit(my_function, arg1, arg2)
    return jsonify({"key": future.key})

@app.route('/status/<key>')
def status(key):
    future = Future(key)
    return jsonify({"status": future.status})

当我的应用程序部署到 Kubernetes 时,我的 /submit 路由返回了一个 Future 键,但是我的 Dask 状态页面没有显示任何处理任务。如果我在本地 运行 Flask,我会看到一个任务出现,并且我的工作 确实 在预期的延迟后出现;然而,当我使用从 /submit 返回的 Future 键点击我自己的 /status/<key> 路径时,它总是显示状态为 pending.

如果指向某个任务的所有 future 都消失了,那么 Dask 可以随意忘记该任务。这允许 Dask 清理工作,而不是让所有中间结果永远存在。

如果您想保留参考文献,那么您需要保留期货。这告诉 Dask 你仍然关心结果。您可以通过创建字典在您的 Flask 应用程序中本地执行此操作。

futures = {}

@app.route('/submit')
def submit():
    # ...
    future = client.submit(my_function, arg1, arg2)
    futures[future.key] = future
    return jsonify({"key": future.key})

@app.route('/status/<key>')
def status(key):
    future = futures[key]
    return jsonify({"status": future.status})

但您还需要考虑何时可以清理并释放这些期货。通过这种方法,你会慢慢填满你的记忆。