如何将 .pem 文件发送到 Dask 集群?

How to Send .pem file to Dask Cluster?

我有一个 dask 表达式,如下所示,我试图以分布式方式 运行 sqlalchemy 查询。但是,它引用了在 connect_args 参数中输入的 .pem 密钥文件。我如何将这个密钥文件上传到 dask cluster/workers 以便它允许我 运行 这个 sqlalchemy 查询?

def execute_query(q):
    conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
                               connect_args={'protocol': 'https',
                                             'requests_kwargs': {'verify': key}})
    return pd.read_sql(q, conn)

df = dd.from_delayed([
    delayed(execute_query)(q) for q in queries])

我尝试使用 client.upload_file 将本地文件发送到集群,但它抱怨无法找到 .pem 密钥的路径

OSError: Could not find a suitable TLS CA certificate bundle, invalid path: hdsj1ptc001.pem

虽然 Dask 可以为您处理一些文件操作(请参阅 client.upload_file),但您应该使用自己的方法将敏感文件(例如凭据)分发到工作文件系统上的特定位置。选项包括 scp、kubernetes secrets 和许多其他方法。

如果您确定集群的安全性,您可以将密钥文件包含在函数的参数中,并将其写入函数中的文件(见下文),或者,如果调用允许的话,直接传递字节。

def execute_query(q, key):
    if not os.path.exists(keyfile):   # if the data needs to be in a file
        open(keyfile, 'wb').write(key)
    conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
                               connect_args={'protocol': 'https',
                                             'requests_kwargs': {'verify': keyfile}})
    return pd.read_sql(q, conn)

key = dask.delayed(open('keyfile.pem', 'rb').read())
df = dd.from_delayed([
    delayed(execute_query)(q, key) for q in queries])