使用 map_partitions 和 pd.df.to_sql 从 dask 数据帧创建 sql table

Create sql table from dask dataframe using map_partitions and pd.df.to_sql

Dask 没有像 pandas 这样的 df.to_sql() ,所以我试图复制该功能并使用 [= 创建一个 sql table 15=] 方法来做到这一点。这是我的代码:

import dask.dataframe as dd
import pandas as pd
import sqlalchemy_utils as sqla_utils

db_url = 'my_db_url_connection'
conn = sqla.create_engine(db_url)

ddf = dd.read_csv('data/prod.csv')
meta=dict(ddf.dtypes)
ddf.map_partitions(lambda df: df.to_sql('table_name', db_url, if_exists='append',index=True), ddf, meta=meta)

这是我的 returns 我的 dask 数据框对象,但是当我查看我的 psql 服务器时,没有新的 table...这里出了什么问题?

更新 仍然无法让它工作,但由于独立问题。追问:

简单地说,您已经创建了一个数据框,它是要完成的工作的处方,但您还没有执行它。要执行,您需要对结果调用 .compute()

请注意,这里的输出并不是真正的数据帧,每个分区的计算结果为 None(因为 to_sql 没有输出),因此用 [=14= 表达可能更清晰],类似于

dto_sql = dask.delayed(pd.DataFrame.to_sql)
out = [dto_sql(d, 'table_name', db_url, if_exists='append', index=True)
       for d in ddf.to_delayed()]
dask.compute(*out)

另请注意,能否获得良好的并行性将取决于数据库驱动程序和数据系统本身。

更新 : Dask to_sql() 现已可用 https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_sql