我可以在多个实体的 map() 之后使用一个 DynamicResource 的输出吗?

Can I use the output of one DynamicResource after a map() for multiple solids?

我正在做与文档中的 dynamic mapping and collect 示例类似的事情。该示例列出目录中的文件,将每个文件映射到计算文件大小的实体,然后收集输出以汇总总体大小。

但是,我想 运行 在每个实体上平行放置多个实体。所以继续这个例子:我会列出一个目录中的文件;然后映射,以便为每个文件计算大小,检查文件权限,并并行计算 md5sum;最后收集输出。

我可以在每个文件中按顺序 运行 这些,例如:

file_results = list_files()
.map(compute_size)
.map(check_permissions)
.map(compute_md5sum)
summarize(file_results.collect())

但如果这些实际上不是串行依赖项,那么最好并行处理每个文件。有没有像这样的语法:

file_results = list_files().map(
compute_md5sum(check_permissions(compute_size)))
summarize(file_results.collect())

如果我没理解错的话,像这样的东西应该可以满足您的需求:

def _process_file(file):
    size = compute_size(file)
    perms = check_permissions(file)
    hash = compute_md5sum(file)
    return summarize_file(size, perms, hash)


file_results = list_files().map(_process_file)
summarize(file_results.collect)