从连续产生的分散数据更新 dask 数组

update dask array from continuously produced scattered data

我正在执行一项分析,该分析“连续”生成数据,旨在更新 dask 数组。 您会在下面找到一个旨在说明工作流程的最小示例。

有没有人知道我应该如何进行或对此有任何想法? 我想避免将数据存储在磁盘上。

A = da.zeros((10000, 10000), chunks=(1000, 1000))

def generate_send_data(i):
    
    for i in range(100):  # long loop
        x = np.random.randint(0, 10000, 100)
        y = np.random.randint(0, 10000, 100)
        z = np.random.randn(100)
        
        # send data to appropriate chunk in A in order to update A: 
        # A[x, y] = A[x, y] + z
        
        # wait for event
        sleep(1+i)

F = client.map(generate_send_data, range(10))

Dask 数组懒惰地运行 functionally。这意味着每个操作都关联了一个唯一的键,如果重复,结果将是相同的——这样任何工作人员都可以进行计算,并且可以重新生成图表。

简而言之:如果你改变它们,days 数组将不会像你期望的那样。

您需要使用不同的范例,也许通过使用 submit 来不断发送新数据、变量(由调度程序持有)或 dask Actors(存在于 worker 上的有状态对象)。