将 Cassandra 数据加载到 Dask Dataframe

Loading Cassandra Data into Dask Dataframe

我正在尝试将数据从 cassandra 数据库加载到 Dask 数据帧中。我尝试查询以下内容但没有成功:

query="""SELECT * FROM document_table"""
df = man.session.execute(query)
df = dd.DataFrame(list(df)) 

TypeError                                 Traceback (most recent call last)
<ipython-input-135-021507f6f2ab> in <module>()
----> 1 a = dd.DataFrame(list(df))

    TypeError: __init__() missing 3 required positional arguments: 'name', 'meta', and 'divisions'

有人知道直接从 Cassandra 加载数据到 Dask 的简单方法吗?内存太多,先加载到 pandas。

您的代码存在一些问题:

  • df = 可能会将整个数据集加载到内存中。此处未调用 Dask,它不参与其中。了解 Cassandra 驱动程序的人可以证实这一点。

  • list(df) 生成数据框的列名列表并删除所有数据

  • dd.DataFrame,如果你看docs不是这样构造的。

您可能想要做的是 a) 创建一个 returns 数据分区的函数,b) 延迟此函数并调用分区的各种值 c) 使用 dd.from_delayed 制作 dask 数据框。例如,假设 table 有一个字段 partfield,它很容易地具有可能的值 1..6 并且每个分区的行数相似:

@dask.delayed
def part(x):
    session = # construct Cassandra session
    q = "SELECT * FROM document_table WHERE partfield={}".format(x)
    df = man.session.execute(query)
    return dd.DataFrame(list(df)) 

parts = [part(x) for x in range(1, 7)]
df = dd.from_delayed(parts)