在 Bash 中使用 fifo 队列进行 inotifywait
inotifywait with a fifo queue in Bash
我写了一个小 Bash 脚本,它使用 inotify-tools 和 inotify 接口。我的问题是,此函数中的命令之一可能会阻止执行直到完成。这样函数就卡住了。
为了解决这个问题,我想将检测到的文件排队(通过关闭事件),并从另一个函数读取队列。有人知道如何在 Bash 中执行此操作吗?
下面的变量是用于查找目录或分配文件名的简单字符串。
inotifywait -mrq -e close --format %w%f /some/dir/ | while read FILE
do
NAME=$(echo $CAP)_"`date +"%F-%H-%M-%S"`.pcap"
logger -i "$FILE was just closed"
# cp "$FILE" "$DATA/$CAP/$ENV/$NAME"
rsync -avz --stats --log-file=/root/rsync.log "$FILE" "$DATA/$CAP/$ENV/$NAME" >> /root/rsync_stats.log
RESULT=$?
if [ $RESULT -eq 0 ] ; then
logger -i "Success: $FILE copied to SAN $DATA/$CAP/$ENV/$NAME, code $RESULT"
else
logger -i "Fail: $FILE copy failed to SAN for $DATA/$CAP/$ENV/$NAME, code $RESULT"
fi
rm "$FILE"
RESULT=$?
if [ $RESULT -eq 0 ] ; then
logger -i "Success: deletion successfull for $FILE, code $RESULT"
else
logger -i "Fail: deletion failed for $FILE on SSD, code $RESULT"
fi
do_something()
logger -i "$NAME was handled"
# for stdout
echo "`date`: Moved file"
done
我正在将文件复制到 SAN 卷,该卷有时会出现响应时间变化。这就是为什么这个函数会卡住一段时间的原因。我用 Rsync 替换了 cp 因为我需要吞吐量统计信息。 Cp(来自 coreutils)显然不会这样做。
几个想法:
1) 您可以使用命名管道作为有限大小的队列:
mkfifo pipe
your_message_source | while read MSG
do
#collect files in a pipe
echo "$MSG" >> pipe
done &
while read MSG
do
#Do your blocking work here
done < pipe
当管道的缓冲区被填满时,这将在 echo "$MSG" >> pipe
上阻塞(您可以使用 ulimit -p
获得该缓冲区的大小(乘以 512)。这对于某些情况可能就足够了。
2) 您可以将文件用作消息队列并在每次操作时对其进行文件锁定:
#Feeder part
your_message_source | while read MSG
do
(
flock 9
echo "$MSG" >> file_based_queue
) 9> file_based_queue
done &
# Worker part
while :
do
#Lock shared queue and cut'n'paste it's content to the worker's private queue
(
flock 9
cp file_based_queue workers_queue
truncate -s0 file_based_queue
) 9> file_based_queue
#process private queue
while read MSG
do
#Do your blocking work here
done < workers_queue
done
如果您同时处于 (flock ...) 9>file_based_queue 子 shell 的工作循环中并且在 flock 命令之后,您只会阻塞 inotifywait。您可以将队列放在 RAMdisk (/dev/shm) 中,以最大限度地减少您在那里花费的时间,这样您就不会错过 FS 事件。
3) 或者您可以使用某些 bash 接口(或以具有接口的语言执行脚本)数据库支持的消息队列或 SysV 消息队列。
这是使用文件作为 FIFO 队列的示例,
具有无限大小,在系统重启时持续存在,并且
允许多个读者和作者。
#!/bin/bash
# manages a FIFO queue on a system file.
# every message is a line of text.
# supports multiple writers and multiple readers.
#
# Requires: bash, inotify-tools: /usr/bin/inotifywait,
# ed, util-linux: /usr/bin/flock
set -e
# Retrieves one element
# param:
# pipe_name
# writes to stdout:
# element_string
# returns:
# true on succes, false on error or end of data
_pipe_pull() {
local pipe=""
local msg pid
_pipe_pop() {
local fd1
( if ! flock --timeout 1 --exclusive ${fd1}; then
echo "Error: _pipe_pop can't get a lock." >&2
return 1
fi
[ ! -s "${pipe}" ] || \
ed -s "${pipe}" <<< $'1p\n1d\nw'
) {fd1}< "${pipe}"
:
}
msg=""
while [ -z "${msg}" ]; do
if [ ! -s "${pipe}" ]; then
inotifywait -e modify "${pipe}" > /dev/null 2>&1 &
pid="${!}"
wait "${pid}" || return 1
fi
msg="$(_pipe_pop)" || \
return 1
if [ "${msg}" = $'\x04' ]; then
echo "_pipe_pull: end of data." >&2
return 1
fi
done
printf '%s\n' "${msg}"
:
}
# Adds multiple elements at once
# param:
# pipe_name elem_1 ... elem_N
# returns:
# true on succes, false on error
_pipe_push() {
local pipe=""
local fd1
shift
( if ! flock --timeout 10 --exclusive ${fd1}; then
echo "Error: _pipe_push can't get a lock." >&2
return 1
fi
printf '%s\n' "${@}" >> "${pipe}"
) {fd1}< "${pipe}"
}
pipe_file_1="$(mktemp /tmp/pipe-XXXXXX.txt)"
# submit first reader process
while msg="$(_pipe_pull "${pipe_file_1}")"; do
printf 'Reader 1:%s\n' "${msg}"
done &
# submit second reader process
while msg="$(_pipe_pull "${pipe_file_1}")"; do
printf 'Reader 2:%s\n' "${msg}"
done &
# submit first writer process
for i in {1..10}; do
_pipe_push "${pipe_file_1}" "Next comes ${i}" "${i}"
done &
pid1="${!}"
# submit second writer process
for i in {11..20}; do
_pipe_push "${pipe_file_1}" "${i}" "Previous was ${i}"
done &
pid2="${!}"
# submit third writer process
for i in {21..30}; do
_pipe_push "${pipe_file_1}" "${i}"
done &
pid3="${!}"
# waiting for the end of writer processes
wait ${pid1} ${pid2} ${pid3}
# signal end of data to two readers
_pipe_push "${pipe_file_1}" $'\x04' $'\x04'
# waiting for the end of reader processes
wait
# cleaning
rm -vf "${pipe_file_1}"
:
我写了一个小 Bash 脚本,它使用 inotify-tools 和 inotify 接口。我的问题是,此函数中的命令之一可能会阻止执行直到完成。这样函数就卡住了。
为了解决这个问题,我想将检测到的文件排队(通过关闭事件),并从另一个函数读取队列。有人知道如何在 Bash 中执行此操作吗?
下面的变量是用于查找目录或分配文件名的简单字符串。
inotifywait -mrq -e close --format %w%f /some/dir/ | while read FILE
do
NAME=$(echo $CAP)_"`date +"%F-%H-%M-%S"`.pcap"
logger -i "$FILE was just closed"
# cp "$FILE" "$DATA/$CAP/$ENV/$NAME"
rsync -avz --stats --log-file=/root/rsync.log "$FILE" "$DATA/$CAP/$ENV/$NAME" >> /root/rsync_stats.log
RESULT=$?
if [ $RESULT -eq 0 ] ; then
logger -i "Success: $FILE copied to SAN $DATA/$CAP/$ENV/$NAME, code $RESULT"
else
logger -i "Fail: $FILE copy failed to SAN for $DATA/$CAP/$ENV/$NAME, code $RESULT"
fi
rm "$FILE"
RESULT=$?
if [ $RESULT -eq 0 ] ; then
logger -i "Success: deletion successfull for $FILE, code $RESULT"
else
logger -i "Fail: deletion failed for $FILE on SSD, code $RESULT"
fi
do_something()
logger -i "$NAME was handled"
# for stdout
echo "`date`: Moved file"
done
我正在将文件复制到 SAN 卷,该卷有时会出现响应时间变化。这就是为什么这个函数会卡住一段时间的原因。我用 Rsync 替换了 cp 因为我需要吞吐量统计信息。 Cp(来自 coreutils)显然不会这样做。
几个想法:
1) 您可以使用命名管道作为有限大小的队列:
mkfifo pipe
your_message_source | while read MSG
do
#collect files in a pipe
echo "$MSG" >> pipe
done &
while read MSG
do
#Do your blocking work here
done < pipe
当管道的缓冲区被填满时,这将在 echo "$MSG" >> pipe
上阻塞(您可以使用 ulimit -p
获得该缓冲区的大小(乘以 512)。这对于某些情况可能就足够了。
2) 您可以将文件用作消息队列并在每次操作时对其进行文件锁定:
#Feeder part
your_message_source | while read MSG
do
(
flock 9
echo "$MSG" >> file_based_queue
) 9> file_based_queue
done &
# Worker part
while :
do
#Lock shared queue and cut'n'paste it's content to the worker's private queue
(
flock 9
cp file_based_queue workers_queue
truncate -s0 file_based_queue
) 9> file_based_queue
#process private queue
while read MSG
do
#Do your blocking work here
done < workers_queue
done
如果您同时处于 (flock ...) 9>file_based_queue 子 shell 的工作循环中并且在 flock 命令之后,您只会阻塞 inotifywait。您可以将队列放在 RAMdisk (/dev/shm) 中,以最大限度地减少您在那里花费的时间,这样您就不会错过 FS 事件。
3) 或者您可以使用某些 bash 接口(或以具有接口的语言执行脚本)数据库支持的消息队列或 SysV 消息队列。
这是使用文件作为 FIFO 队列的示例, 具有无限大小,在系统重启时持续存在,并且 允许多个读者和作者。
#!/bin/bash
# manages a FIFO queue on a system file.
# every message is a line of text.
# supports multiple writers and multiple readers.
#
# Requires: bash, inotify-tools: /usr/bin/inotifywait,
# ed, util-linux: /usr/bin/flock
set -e
# Retrieves one element
# param:
# pipe_name
# writes to stdout:
# element_string
# returns:
# true on succes, false on error or end of data
_pipe_pull() {
local pipe=""
local msg pid
_pipe_pop() {
local fd1
( if ! flock --timeout 1 --exclusive ${fd1}; then
echo "Error: _pipe_pop can't get a lock." >&2
return 1
fi
[ ! -s "${pipe}" ] || \
ed -s "${pipe}" <<< $'1p\n1d\nw'
) {fd1}< "${pipe}"
:
}
msg=""
while [ -z "${msg}" ]; do
if [ ! -s "${pipe}" ]; then
inotifywait -e modify "${pipe}" > /dev/null 2>&1 &
pid="${!}"
wait "${pid}" || return 1
fi
msg="$(_pipe_pop)" || \
return 1
if [ "${msg}" = $'\x04' ]; then
echo "_pipe_pull: end of data." >&2
return 1
fi
done
printf '%s\n' "${msg}"
:
}
# Adds multiple elements at once
# param:
# pipe_name elem_1 ... elem_N
# returns:
# true on succes, false on error
_pipe_push() {
local pipe=""
local fd1
shift
( if ! flock --timeout 10 --exclusive ${fd1}; then
echo "Error: _pipe_push can't get a lock." >&2
return 1
fi
printf '%s\n' "${@}" >> "${pipe}"
) {fd1}< "${pipe}"
}
pipe_file_1="$(mktemp /tmp/pipe-XXXXXX.txt)"
# submit first reader process
while msg="$(_pipe_pull "${pipe_file_1}")"; do
printf 'Reader 1:%s\n' "${msg}"
done &
# submit second reader process
while msg="$(_pipe_pull "${pipe_file_1}")"; do
printf 'Reader 2:%s\n' "${msg}"
done &
# submit first writer process
for i in {1..10}; do
_pipe_push "${pipe_file_1}" "Next comes ${i}" "${i}"
done &
pid1="${!}"
# submit second writer process
for i in {11..20}; do
_pipe_push "${pipe_file_1}" "${i}" "Previous was ${i}"
done &
pid2="${!}"
# submit third writer process
for i in {21..30}; do
_pipe_push "${pipe_file_1}" "${i}"
done &
pid3="${!}"
# waiting for the end of writer processes
wait ${pid1} ${pid2} ${pid3}
# signal end of data to two readers
_pipe_push "${pipe_file_1}" $'\x04' $'\x04'
# waiting for the end of reader processes
wait
# cleaning
rm -vf "${pipe_file_1}"
: