这个例子到底发生了什么

What exactly happens in this example

我正在 dask 中的 TPC-H 数据集的一部分上编写 TPC-H 查询 6:

start = time.time()
lineitem = dd.read_csv("s3://tpc-h-csv/lineitem/lineitem.tbl.1",sep="|", header = 0)
df = lineitem.rename(columns=dict(zip(lineitem.columns, lineitem_scheme)))
filtered_df = df.loc[(df.l_shipdate > "1994-01-01") & (df.l_discount >= 0.05) & (df.l_discount <= 0.07) & (df.l_quantity < 24)]
filtered_df['product'] = filtered_df.l_extendedprice * filtered_df.l_discount
print(filtered_df.product.sum().compute())
print(time.time() - start)

我有几个问题:

  1. 这是在 Dask 中编写上述查询的最快方法吗?

  2. 我从S3下载的数据是48GB。我节点上的内存是 16 GB。 Dask 是否进行批量计算?它会持久化到磁盘然后从磁盘读取吗?会发生什么?

对于您的第一个问题,这似乎是一种在 Dask 中编写查询的有效方法,但鉴于 s3 存储桶不是 public,因此很难测试。 None 您的操作需要改组,因此它们应该都相当便宜。对于第二个问题,答案基本上是肯定的,因为您正在下载的数据集大于可用内存,Dask 将溢出到磁盘。微调 Dask 管理内存的方式有点棘手,如果您有兴趣,还有更多内容 here