Dask:创建严格递增的索引
Dask: create strictly increasing index
正如有据可查的那样,当 reset_index
被调用时,Dask 在每个分区的基础上创建一个严格递增的索引,从而导致整个集合上的重复索引。在 Dask 中创建一个严格递增的索引的最佳方法是什么(例如计算上最快的) - 这不必是连续的 - 在整个集合中?我希望 map_partitions
会传递分区号,但我认为不会。谢谢
编辑
谢谢@MRocklin,我已经走到这一步了,但我需要一些帮助来了解如何将我的系列与原始数据框重新组合。
def create_increasing_index(ddf:dd.DataFrame):
mps = int(len(ddf) / ddf.npartitions + 1000)
values = ddf.index.values
def do(x, max_partition_size, block_id=None):
length = len(x)
if length == 0:
raise ValueError("Does not work with empty partitions. Consider using dask.repartition.")
start = block_id[0] * max_partition_size
return da.arange(start, start+length, chunks=1)
series = values.map_blocks(do, max_partition_size=mps, dtype=np.int64)
ddf2 = dd.concat([ddf, dd.from_array(series)], axis=1)
return ddf2
我在哪里收到错误 "ValueError: Unable to concatenate DataFrame with unknown division specifying axis=1"。有没有比使用 dd.concat 更好的方法?谢谢
再次编辑
实际上,就我的目的(以及我测试的数据量 - 只有几 GB)而言,cumsum 已经足够快了。当这变得太慢时,我会重新访问!
完成此操作的一种相当缓慢的方法是创建一个新列,然后使用 cumsum
ddf['x'] = 1
ddf['x'] = ddf.x.cumsum()
ddf = ddf.set_index('x', sorted=True)
这既不是很慢也不是免费的。
考虑到您的问题的措辞方式,我怀疑您只是想为每个分区创建一个范围,该范围由一个非常大的值分隔,您知道该值大于最大行数。 map_partitions
没有提供分区号是对的。您可以改用以下两种解决方案之一。
- 转换为 dask.array(使用
.values
),使用 map_blocks
方法,它提供块索引,然后使用 [=15= 转换回系列].
- 转换为 dask.delayed 个对象的列表,自己创建延迟系列,然后使用
dd.from_delayed
转换回 dask 系列
正如有据可查的那样,当 reset_index
被调用时,Dask 在每个分区的基础上创建一个严格递增的索引,从而导致整个集合上的重复索引。在 Dask 中创建一个严格递增的索引的最佳方法是什么(例如计算上最快的) - 这不必是连续的 - 在整个集合中?我希望 map_partitions
会传递分区号,但我认为不会。谢谢
编辑
谢谢@MRocklin,我已经走到这一步了,但我需要一些帮助来了解如何将我的系列与原始数据框重新组合。
def create_increasing_index(ddf:dd.DataFrame):
mps = int(len(ddf) / ddf.npartitions + 1000)
values = ddf.index.values
def do(x, max_partition_size, block_id=None):
length = len(x)
if length == 0:
raise ValueError("Does not work with empty partitions. Consider using dask.repartition.")
start = block_id[0] * max_partition_size
return da.arange(start, start+length, chunks=1)
series = values.map_blocks(do, max_partition_size=mps, dtype=np.int64)
ddf2 = dd.concat([ddf, dd.from_array(series)], axis=1)
return ddf2
我在哪里收到错误 "ValueError: Unable to concatenate DataFrame with unknown division specifying axis=1"。有没有比使用 dd.concat 更好的方法?谢谢
再次编辑
实际上,就我的目的(以及我测试的数据量 - 只有几 GB)而言,cumsum 已经足够快了。当这变得太慢时,我会重新访问!
完成此操作的一种相当缓慢的方法是创建一个新列,然后使用 cumsum
ddf['x'] = 1
ddf['x'] = ddf.x.cumsum()
ddf = ddf.set_index('x', sorted=True)
这既不是很慢也不是免费的。
考虑到您的问题的措辞方式,我怀疑您只是想为每个分区创建一个范围,该范围由一个非常大的值分隔,您知道该值大于最大行数。 map_partitions
没有提供分区号是对的。您可以改用以下两种解决方案之一。
- 转换为 dask.array(使用
.values
),使用map_blocks
方法,它提供块索引,然后使用 [=15= 转换回系列]. - 转换为 dask.delayed 个对象的列表,自己创建延迟系列,然后使用
dd.from_delayed
转换回 dask 系列