通过使用 gnu 信号量将其包装在 bash 脚本中来并行化命令行工具

Parallelizing command line tool by wrapping it in a bash script with gnu semaphore

我必须在包含 50000 个文件的相当大的基准测试中对命令行工具进行评估。
不幸的是,该工具没有并行化,运行 花了很长时间
我读了一些关于 gnu parallel(或 gnu semaphore)的文章,但是我找不到一个很好的例子来说明如何结合产生的多个后台进程的结果通过 gnu 信号量。

解包工具需要一个文件作为输入参数,我必须想出一种方法来收集通过 运行并行多次使用该工具产生的所有结果。
此外,我不想在崩溃时丢失任何结果。
无论何时取消脚本,它都不应该重新处理之前已经处理过的任何文件。

为确保后台进程 worker 有足够的工作要做,下面的脚本一次将多个文件传递给 worker
bash 脚本非常适合我的用例。

如果有人有类似的问题,我想与您分享脚本。
可以通过修改 worker 函数和修改变量来使脚本适应其他用例$JOBS$WPSIZE

如果您能就如何提高脚本效率提供一些反馈,我将非常高兴。

非常感谢, 朱利安

#!/bin/bash

# make variables available in function started by
# gnu semaphore
export FINALRES="result.log"
export RESFIFO="/tmp/res.fifo"
export FILFIFO="/tmp/fil.fifo"
export FILELIST="/tmp/flist"
export WPSIZE=5
export JOBS=4

PUTFPID=""
WRITPID=""

# find input files fo process
find . -name "*.txt" > ${FILELIST}

# setup fifos and files
[ ! -e "${FINALRES}" ] && touch "${FINALRES}"
[ ! -e "${RESFIFO}" ] && mkfifo "${RESFIFO}"
[ ! -e "${FILFIFO}" ] && mkfifo "${FILFIFO}"

FILES=$(diff ${FINALRES} ${FILELIST} | grep '>' | cut -d '>' -f2 | tr -d ' ')
exec 4<> ${RESFIFO}
exec 5<> ${FILFIFO}

trap cleanup EXIT TERM

function cleanup() {
  # write results that have been obainted so far
  echo "cleanup"
  [ -n "${PUTFPID}" ] && (kill -9 ${PUTFPID} 2>&1) > /dev/null
  [ -n "${WRITPID}" ] && (kill -9 ${WRITPID} 2>&1) > /dev/null
  rm -f "${RESFIFO}"
  rm -f "${FILFIFO}"
  rm -f "${LOCKFILE}"
}

# this function takes always #WPSIZE (or less) files from the fifo
function readf() {
  local cnt=0
  while read -r -t 2 line; do
    echo "$line"
    [ -z "${files}" ] && { files=${line}; let cnt=${cnt}+1; continue; }
    let cnt=${cnt}+1
    [ ${cnt} -eq ${WPSIZE} ] && break
  done <& 5
}

# this function is called by gnu semaphore and executed in the background
function worker () {
  for fil in "${@}"; do
    # do something ...
    echo "result" > "${RESFIFO}"
  done
  exit 0
}

# this function is used (at the end) to write the comutation results to a file
function writeresult() {
  while read -r line; do
    [ "${line}" = "quit" ] && break
    echo "${line}" >> ${FINALRES}
  done < ${RESFIFO}
}

# this simple helper puts all input files into a fifo
function putf() {
  for fil in $FILES; do
    echo "${fil}" > "${FILFIFO}"
  done
}

# make function worker known to gnu semaphore
export -f worker
# put file into fifo
putf &
PUTFPID=$!
writeresult &
WRITPID=$!

while true; do
  ARGS=$(readf)
  [ -z "${ARGS}" ] && break
  # used word spitting on purpose here (call worker with multiple params)
  sem --bg --jobs "${JOBS}" worker ${ARGS}
done

sem --wait

echo "quit" > ${RESFIFO}
wait

echo "all jobs are finished"
exit 0

并行附加到 FIFO 通常不是一个好主意:您确实需要了解很多有关此版本 OS 缓冲 ​​FIFO 的信息,以确保安全。这个例子说明了原因:

#!/bin/bash

size=3000

myfifo=/tmp/myfifo$$
mkfifo $myfifo

printone() {
  a=$( perl -e 'print ((shift)x'$size')'  )
  # Print a single string
  echo $a >> $myfifo
}
printone a &
printone b &
printone c &
printone d &

# Wait a little to get the printones started
sleep .1

cat $myfifo | perl -ne 'for(split//,$_){
  if($_ eq $l) {
    $c++
  } else {
    /\n/ and next;
    print $l,1+$c," "; $l=$_; $c=0;
  }
}'
echo

使用 size=10 你将永远得到:

1 a10 b10 c10 

这意味着从 FIFO 中读取了 10 个 a,然后是 10 个 b,然后是 10 个 c。 IE。没有混合。

但将其更改为 size=100000,您会得到如下内容:

1 d65536 b65536 c100000 d34256 b34256 a100000 d208 

读取了 65K d,然后是 65K b,然后是 100k c,然后是 34K d,32K b,然后是 100k a,最后是 208 d。 IE。四个输出混合在一起。非常不好。

出于这个原因,我建议不要并行追加到同一个 FIFO:存在竞争条件的风险,而且通常可以避免。

在您的情况下,您似乎只想 # do something ... 到 50000 个文件中的每一个,这非常简单:

do_something() {
  # do something ...
  echo do something to 
  echo result of  is foo
}
export -f do_something
find . -name "*.txt" | parallel do_something > results

这里 GNU Parallel 通过确保 stdout 和 stderr 不会混合用于每个作业来帮助您。

为避免在出现 crash/cancel 时重新处理,请使用 --joblog--resume