dask 可以处理无穷无尽的流输入吗
Can dask work with an endless streaming input
我知道 dask 在像这样的批处理模式下工作得很好
def load(filename):
...
def clean(data):
...
def analyze(sequence_of_data):
...
def store(result):
with open(..., 'w') as f:
f.write(result)
dsk = {'load-1': (load, 'myfile.a.data'),
'load-2': (load, 'myfile.b.data'),
'load-3': (load, 'myfile.c.data'),
'clean-1': (clean, 'load-1'),
'clean-2': (clean, 'load-2'),
'clean-3': (clean, 'load-3'),
'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
'store': (store, 'analyze')}
from dask.multiprocessing import get
get(dsk, 'store') # executes in parallel
- 我们可以使用 dask 来处理流媒体通道吗,其中块的数量是未知的,甚至是无穷无尽的?
- 是否可以增量计算。例如,上面的 'analyze' 步骤可以处理正在进行的块吗?
- 我们是否必须在所有数据块已知后才调用 "get" 操作,我们是否可以在调用 "get" 之后添加新块
编辑:查看下面较新的答案
没有
dask 中的当前任务调度程序需要单个计算图。它不支持动态添加到此图形或从中删除。调度器设计用于在少量内存中评估大图;提前了解整个图表对此至关重要。
但是,这并不能阻止人们创建具有不同属性的其他调度程序。一个简单的解决方案是在多台机器上使用像 conncurrent.futures
on a single machine or distributed
这样的模块。
其实是的
分布式调度程序现在完全异步运行,您可以在计算期间提交任务、等待其中的一些、提交更多、取消任务、add/remove worker 等。有几种方法可以做到这一点,但最简单的可能是这里简要描述的新 concurrent.futures
界面:
我知道 dask 在像这样的批处理模式下工作得很好
def load(filename):
...
def clean(data):
...
def analyze(sequence_of_data):
...
def store(result):
with open(..., 'w') as f:
f.write(result)
dsk = {'load-1': (load, 'myfile.a.data'),
'load-2': (load, 'myfile.b.data'),
'load-3': (load, 'myfile.c.data'),
'clean-1': (clean, 'load-1'),
'clean-2': (clean, 'load-2'),
'clean-3': (clean, 'load-3'),
'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
'store': (store, 'analyze')}
from dask.multiprocessing import get
get(dsk, 'store') # executes in parallel
- 我们可以使用 dask 来处理流媒体通道吗,其中块的数量是未知的,甚至是无穷无尽的?
- 是否可以增量计算。例如,上面的 'analyze' 步骤可以处理正在进行的块吗?
- 我们是否必须在所有数据块已知后才调用 "get" 操作,我们是否可以在调用 "get" 之后添加新块
编辑:查看下面较新的答案
没有
dask 中的当前任务调度程序需要单个计算图。它不支持动态添加到此图形或从中删除。调度器设计用于在少量内存中评估大图;提前了解整个图表对此至关重要。
但是,这并不能阻止人们创建具有不同属性的其他调度程序。一个简单的解决方案是在多台机器上使用像 conncurrent.futures
on a single machine or distributed
这样的模块。
其实是的
分布式调度程序现在完全异步运行,您可以在计算期间提交任务、等待其中的一些、提交更多、取消任务、add/remove worker 等。有几种方法可以做到这一点,但最简单的可能是这里简要描述的新 concurrent.futures
界面: