是否有类似 xargs 的管道数据而不是使用参数?

Is there something like xargs which pipes data instead of using arguments?

xargs 如果您有一个通过命令行参数接受其输入的命令,那么它很棒,但如果您有一个通过标准输入接受其输入的命令,它就没有那么有用了。

我所看到的针对类似这种情况的建议是使用 tee 将行复制到多个下游进程,如下所示:

producer | tee >(consumer0 >out0) >(consumer1 >out1) >(consumer2 >out2) | consumer3 >out3;
cat out* | next-stage-of-pipeline

这有一个缺点,即所有消费者都会收到所有生产的行,并且假设 consumer{0..3} 是不同的进程。如果我需要每个消费者都相同,并处理部分输入(例如,并行化顺序消费者),它也不起作用。

我正在寻找的是像 xargs 这样的东西,它允许在同一消费者的多个实例之间拆分工作,然后组合输入并继续处理。输出组合的方式应该模仿 xargs,所以不能保证行的顺序,但是两行不会拼接在一起(例如:"Hello Fred" 和 "Hello George" 可能会出现任何顺序,但您不会看到 "HHello GeoFregedello").

像这样的东西的主要用途是处理大量数据,如果为输入的每一行启动一个,消费者的启动延迟会很明显。如果 consumer 的启动成本低廉,我只需将它包装在一个小的 shell 脚本中,该脚本将其参数通过管道传输到 consumer 并使用 xargs.[=20= 调用它]

至少对于我想到的用例,生产者将从内部服务中获取大量数据,之后消费者需要转换该数据并进行一些 API 调用。所以生产者和消费者都将是一个很长的 运行 进程,运行 并行 API 调用确实会加快速度。

像这样的东西是理想的,但我找不到任何东西可以做到这一点:

producer | ??? -P20 consumer | next-stage-of-pipeline

是否有提供此功能的命令行工具?

splitting up the work between multiple instances of the same consumer

这可以在 xargs 的消费者端完成。

例如:

consumer() {
   consumer$(($RANDOM % 4)) "$@"
}
export -f consumer
producer | xargs -P20 bash -c consumer --

会随机挑选一个消费者。

那个:

consumer() {
   seq 4 |
   xargs -i{} consumer{} "$@"
}
export -f consumer
producer | xargs -P20 bash -c consumer --

将运行输入每个消费者。

总之,你应该明白了。

对于您的 T 恤,无需任何临时文件即可轻松完成:

consumer() { sed "s/^/: /"; }
producer() { seq 3; }
next-stage-of-pipeline() { sed "s/^/Result: /"; }
producer |
{ tee >(consumer 0 >&10) >(consumer 1 >&11) >(consumer 2 >&12) | consumer 3 >&14 ;} 10>&1 11>&1 12>&1 14>&1 | 
next-stage-of-pipeline

如果你需要前。将每个消费者的输入分成 4 份,仍然很容易做到:

filter() { awk -vN="" '(NR + N) % 4 == 1'
producer |
{ tee >(
     filter 1 | consumer 0 >&10
) >(
     filter 2 | consumer 1 >&11
) >(
     filter 3 | consumer 2 >&12
) | 
     filter 4 | consumer 3 >&14 
;} 10>&1 11>&1 12>&1 14>&1 | 
next-stage-of-pipeline

我认为 GNU Paralell 可能会满足您的需求。

producer | parallel consumer | next command in pipeline

编辑:在我开始玩 parallel 之前,我 完全 理解这个问题。 Parallel 可以将函数作为消费者,您可以在该函数内使用流。

例如

consumer () 
{ 
    echo "$@" | awk '{print }'
}
export -f consumer
for i in {1..30}; do echo "foo $i bar"; done | parallel consumer

编辑 2:

您可以使用 --pipe 选项,该选项将数据通过管道传递给消费者。 -q 参数在消费者周围放置 shell 引号:

for i in {1..30}; do echo "foo $i bar"; done | parallel --pipe -q awk '{print }'

奖金:

  1. parallel 将 运行 顺序处理器上的作业。如果你有 8 个核心和 16 个消费者,每个核心将获得 2 个消费者(这是可配置的)
  2. parallel 可以 运行 跨网络上的多台机器,只要它可以 ssh 到它们而不被提示输入密码(例如使用 ssh-agent) .

GNU Parallel 可以将标准输入分割成同一个命令:

... | parallel --pipe consumer

它默认在 \n 上分块,块大小约为 1 MB。这可以用 --recstart/--recend--block.

来改变

如果输入是一个文件,这样会更快:

parallel -a bigfile --pipepart --block -1 consumer

这将找到 bigfile 的大小并将其拆分为每个 CPU 线程的一个块。这是即时完成的 - 因此不会创建临时文件。

https://zenodo.org/record/1146014 的第 9 章对此进行了详细介绍。

但如果您真的希望将整个输入传送给不同的消费者,GNU Parallel 也可以做到:

# Pipe 1..10 to consumer-a..z
seq 10 | parallel --pipe --tee 'echo consumer-{}; cat' ::: {a..z}