inotifywait with a fifo queue in Bash

I wrote a small Bash script that uses inotify-tools and the inotify interface. My problem is that one of the commands in this function may block until it finishes, which stalls the whole function.

To work around this, I would like to queue up the detected files (caught via close events) and read the queue from another function. Does anyone know how to do this in Bash?

The variables below are plain strings used to locate directories or build file names.

inotifywait -mrq -e close --format '%w%f' /some/dir/ | while IFS= read -r FILE
do
    NAME="${CAP}_$(date +%F-%H-%M-%S).pcap"
    logger -i "$FILE was just closed"
    # cp "$FILE" "$DATA/$CAP/$ENV/$NAME"
    rsync -avz --stats --log-file=/root/rsync.log "$FILE" "$DATA/$CAP/$ENV/$NAME" >> /root/rsync_stats.log
    RESULT=$?
    if [ $RESULT -eq 0 ] ; then
        logger -i "Success: $FILE copied to SAN $DATA/$CAP/$ENV/$NAME, code $RESULT"
    else
        logger -i "Fail:    $FILE copy failed to SAN for $DATA/$CAP/$ENV/$NAME, code $RESULT"
    fi

    rm "$FILE"
    RESULT=$?
    if [ $RESULT -eq 0 ] ; then
        logger -i "Success: deletion successful for $FILE, code $RESULT"
    else
        logger -i "Fail:    deletion failed for $FILE on SSD, code $RESULT"
    fi

    do_something
    logger -i "$NAME was handled"
    # for stdout
    echo "$(date): Moved file"
done

I am copying files to a SAN volume whose response time varies, which is why this function stalls for a while at times. I replaced cp with rsync because I need throughput statistics; cp (from coreutils) apparently does not provide them.

A few ideas:

1) You can use a named pipe as a bounded queue:

mkfifo pipe

your_message_source | while read -r MSG
do
  # collect files in the pipe
  echo "$MSG" >> pipe
done &

while read -r MSG
do
  # Do your blocking work here
done < pipe

This will block on echo "$MSG" >> pipe once the pipe's buffer fills up (you can get that buffer's size with ulimit -p, multiplied by 512). That may be sufficient for some use cases.
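For reference, a quick check of the reported unit from the shell. Note this is a sketch: on Linux, ulimit -p reflects the POSIX PIPE_BUF atomic-write limit, while the kernel's actual pipe capacity is usually larger (typically 64 KiB, adjustable via fcntl F_SETPIPE_SZ).

```shell
# Print the pipe buffer size as the shell reports it:
# `ulimit -p` gives the value in 512-byte blocks.
blocks=$(ulimit -p)
echo "pipe buffer: ${blocks} blocks = $((blocks * 512)) bytes"
```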

2) You can use a regular file as the message queue and take a file lock for every operation:

# Feeder part
your_message_source | while read -r MSG
do
    (
        flock 9
        echo "$MSG" >> file_based_queue
    ) 9>> file_based_queue    # open fd 9 with >> so the queue is not truncated
done &

# Worker part
while :
do
    # Lock the shared queue and cut-and-paste its content
    # into the worker's private queue
    (
        flock 9
        cp file_based_queue workers_queue
        truncate -s0 file_based_queue
    ) 9>> file_based_queue

    # process the private queue
    while read -r MSG
    do
        # Do your blocking work here
    done < workers_queue
done

The only place you block inotifywait is while you are inside the worker loop's (flock ...) 9>>file_based_queue subshell, after the flock command. You can keep the queue on a RAM disk (/dev/shm) to minimize the time you spend there, so you don't miss FS events.
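As a self-contained illustration of this pattern (the message source and paths are examples; a temporary directory stands in for /dev/shm so the demo cleans up after itself):

```shell
#!/bin/bash
# Demo of the flock-based file queue: a feeder pushes ten messages
# under the lock, then a worker drains them in one locked batch.
dir=$(mktemp -d)                  # stand-in for /dev/shm in this demo
queue="$dir/file_based_queue"
: > "$queue"                      # create the (empty) queue file

# Feeder: append each message while holding the lock
for i in {1..10}; do
    (
        flock 9
        echo "msg $i" >> "$queue"
    ) 9>> "$queue"
done

# Worker: atomically move the queue content to a private copy,
# then process the copy without holding the lock
(
    flock 9
    cp "$queue" "$dir/workers_queue"
    truncate -s0 "$queue"
) 9>> "$queue"

while read -r MSG; do
    echo "processed: $MSG"        # blocking work would go here
done < "$dir/workers_queue"

rm -rf "$dir"
```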

3) Or you can use a message queue backed by a database, or a SysV message queue, through some Bash interface (or write the script in a language that has one).
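A minimal sketch of the database-backed variant, driven from Bash through the sqlite3 CLI (this assumes sqlite3 is installed; the schema, the push/pop helper names, and the naive single-quote handling of messages are all illustrative, not a hardened implementation):

```shell
# A durable queue in an SQLite table. INSERT is atomic, so multiple
# writers are safe; pop reads and deletes the oldest row inside one
# transaction, so multiple readers never consume the same message.
db=/tmp/queue.db

sqlite3 "$db" 'CREATE TABLE IF NOT EXISTS q(id INTEGER PRIMARY KEY AUTOINCREMENT, msg TEXT);'

# push: naive quoting, fine for a sketch (no single quotes in messages)
push() { sqlite3 "$db" "INSERT INTO q(msg) VALUES ('$1');"; }

# pop: select and delete the oldest row in a single transaction
pop() {
    sqlite3 "$db" 'BEGIN IMMEDIATE;
                   SELECT msg FROM q ORDER BY id LIMIT 1;
                   DELETE FROM q WHERE id = (SELECT MIN(id) FROM q);
                   COMMIT;'
}

push "first message"
push "second message"
pop    # prints: first message
```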

Here is an example that uses a regular file as a FIFO queue: it has unbounded size, persists across system reboots, and allows multiple readers and writers.

#!/bin/bash

# manages a FIFO queue on a system file.
#  every message is a line of text.
#  supports multiple writers and multiple readers.
#
# Requires: bash, inotify-tools: /usr/bin/inotifywait,
# ed, util-linux: /usr/bin/flock

set -e

# Retrieves one element
# param:
#  pipe_name
# writes to stdout:
#  element_string
# returns:
#  true on success, false on error or end of data
_pipe_pull() {
    local pipe="${1}"
    local msg pid

_pipe_pop() {
    local fd1
    ( if ! flock --timeout 1 --exclusive ${fd1}; then
        echo "Error: _pipe_pop can't get a lock." >&2
        return 1
    fi
        [ ! -s "${pipe}" ] || \
            ed -s "${pipe}" <<< $'1p\n1d\nw'
    ) {fd1}< "${pipe}"
    :
}

    msg=""
    while [ -z "${msg}" ]; do
        if [ ! -s "${pipe}" ]; then
            inotifywait -e modify "${pipe}" > /dev/null 2>&1 &
            pid="${!}"
            wait "${pid}" || return 1
        fi

        msg="$(_pipe_pop)" || \
            return 1

        if [ "${msg}" = $'\x04' ]; then
            echo "_pipe_pull: end of data." >&2
            return 1
        fi
    done
    printf '%s\n' "${msg}"
    :
}

# Adds multiple elements at once
# param:
#  pipe_name elem_1 ... elem_N
# returns:
#  true on success, false on error
_pipe_push() {
    local pipe="${1}"
    local fd1
    shift

    ( if ! flock --timeout 10 --exclusive ${fd1}; then
        echo "Error: _pipe_push can't get a lock." >&2
        return 1
    fi
        printf '%s\n' "${@}" >> "${pipe}"
    ) {fd1}< "${pipe}"
}

pipe_file_1="$(mktemp /tmp/pipe-XXXXXX.txt)"

# submit first reader process
while msg="$(_pipe_pull "${pipe_file_1}")"; do
    printf 'Reader 1:%s\n' "${msg}"
done &

# submit second reader process
while msg="$(_pipe_pull "${pipe_file_1}")"; do
    printf 'Reader 2:%s\n' "${msg}"
done &

# submit first writer process
for i in {1..10}; do
    _pipe_push "${pipe_file_1}" "Next comes ${i}" "${i}"
done &
pid1="${!}"

# submit second writer process
for i in {11..20}; do
    _pipe_push "${pipe_file_1}" "${i}" "Previous was ${i}"
done &
pid2="${!}"

# submit third writer process
for i in {21..30}; do
    _pipe_push "${pipe_file_1}" "${i}"
done &
pid3="${!}"

# waiting for the end of writer processes
wait ${pid1} ${pid2} ${pid3}

# signal end of data to two readers
_pipe_push "${pipe_file_1}" $'\x04' $'\x04'

# waiting for the end of reader processes
wait

# cleaning
rm -vf "${pipe_file_1}"
: