将 Cassandra 数据加载到 Dask Dataframe
Loading Cassandra Data into Dask Dataframe
我正在尝试将数据从 cassandra 数据库加载到 Dask 数据帧中。我尝试查询以下内容但没有成功:
query="""SELECT * FROM document_table"""
df = man.session.execute(query)
df = dd.DataFrame(list(df))
TypeError Traceback (most recent call last)
<ipython-input-135-021507f6f2ab> in <module>()
----> 1 a = dd.DataFrame(list(df))
TypeError: __init__() missing 3 required positional arguments: 'name', 'meta', and 'divisions'
有人知道直接从 Cassandra 加载数据到 Dask 的简单方法吗?内存太多,先加载到 pandas。
您的代码存在一些问题:
行 df =
可能会将整个数据集加载到内存中。此处未调用 Dask,它不参与其中。了解 Cassandra 驱动程序的人可以证实这一点。
list(df)
生成数据框的列名列表并删除所有数据
dd.DataFrame
,如果你看docs不是这样构造的。
您可能想要做的是 a) 创建一个 returns 数据分区的函数,b) 延迟此函数并调用分区的各种值 c) 使用 dd.from_delayed
制作 dask 数据框。例如,假设 table 有一个字段 partfield
,它很容易地具有可能的值 1..6 并且每个分区的行数相似:
@dask.delayed
def part(x):
session = # construct Cassandra session
q = "SELECT * FROM document_table WHERE partfield={}".format(x)
df = man.session.execute(query)
return dd.DataFrame(list(df))
parts = [part(x) for x in range(1, 7)]
df = dd.from_delayed(parts)
我正在尝试将数据从 cassandra 数据库加载到 Dask 数据帧中。我尝试查询以下内容但没有成功:
query="""SELECT * FROM document_table"""
df = man.session.execute(query)
df = dd.DataFrame(list(df))
TypeError Traceback (most recent call last)
<ipython-input-135-021507f6f2ab> in <module>()
----> 1 a = dd.DataFrame(list(df))
TypeError: __init__() missing 3 required positional arguments: 'name', 'meta', and 'divisions'
有人知道直接从 Cassandra 加载数据到 Dask 的简单方法吗?内存太多,先加载到 pandas。
您的代码存在一些问题:
行
df =
可能会将整个数据集加载到内存中。此处未调用 Dask,它不参与其中。了解 Cassandra 驱动程序的人可以证实这一点。list(df)
生成数据框的列名列表并删除所有数据dd.DataFrame
,如果你看docs不是这样构造的。
您可能想要做的是 a) 创建一个 returns 数据分区的函数,b) 延迟此函数并调用分区的各种值 c) 使用 dd.from_delayed
制作 dask 数据框。例如,假设 table 有一个字段 partfield
,它很容易地具有可能的值 1..6 并且每个分区的行数相似:
@dask.delayed
def part(x):
session = # construct Cassandra session
q = "SELECT * FROM document_table WHERE partfield={}".format(x)
df = man.session.execute(query)
return dd.DataFrame(list(df))
parts = [part(x) for x in range(1, 7)]
df = dd.from_delayed(parts)