从连续产生的分散数据更新 dask 数组
update dask array from continuously produced scattered data
我正在执行一项分析,该分析“连续”生成数据,旨在更新 dask 数组。
您会在下面找到一个旨在说明工作流程的最小示例。
有没有人知道我应该如何进行或对此有任何想法?
我想避免将数据存储在磁盘上。
A = da.zeros((10000, 10000), chunks=(1000, 1000))
def generate_send_data(i):
for i in range(100): # long loop
x = np.random.randint(0, 10000, 100)
y = np.random.randint(0, 10000, 100)
z = np.random.randn(100)
# send data to appropriate chunk in A in order to update A:
# A[x, y] = A[x, y] + z
# wait for event
sleep(1+i)
F = client.map(generate_send_data, range(10))
Dask 数组懒惰地运行 functionally。这意味着每个操作都关联了一个唯一的键,如果重复,结果将是相同的——这样任何工作人员都可以进行计算,并且可以重新生成图表。
简而言之:如果你改变它们,days 数组将不会像你期望的那样。
您需要使用不同的范例,也许通过使用 submit
来不断发送新数据、变量(由调度程序持有)或 dask Actors(存在于 worker 上的有状态对象)。
我正在执行一项分析,该分析“连续”生成数据,旨在更新 dask 数组。 您会在下面找到一个旨在说明工作流程的最小示例。
有没有人知道我应该如何进行或对此有任何想法? 我想避免将数据存储在磁盘上。
A = da.zeros((10000, 10000), chunks=(1000, 1000))
def generate_send_data(i):
for i in range(100): # long loop
x = np.random.randint(0, 10000, 100)
y = np.random.randint(0, 10000, 100)
z = np.random.randn(100)
# send data to appropriate chunk in A in order to update A:
# A[x, y] = A[x, y] + z
# wait for event
sleep(1+i)
F = client.map(generate_send_data, range(10))
Dask 数组懒惰地运行 functionally。这意味着每个操作都关联了一个唯一的键,如果重复,结果将是相同的——这样任何工作人员都可以进行计算,并且可以重新生成图表。
简而言之:如果你改变它们,days 数组将不会像你期望的那样。
您需要使用不同的范例,也许通过使用 submit
来不断发送新数据、变量(由调度程序持有)或 dask Actors(存在于 worker 上的有状态对象)。