如何在bash脚本中实现对基于文件的队列的多线程访问
How to implement multithreaded access to file-based queue in bash script
我在设计一个多处理的 bash 脚本时遇到了一些问题,该脚本通过网站,跟随发现的 links 并在每个新页面上进行一些处理(它实际上收集电子邮件地址,但这是问题的一个不重要的细节。
脚本应该像这样工作:
- 下载一个页面
- 解析出所有 link 并将它们添加到队列
- 做一些不重要的处理
- 从队列中弹出一个 URL 并从
开始
这本身编程起来非常简单,问题来自两个限制和脚本需要具有的一个功能。
- 脚本不能处理一个URL两次
- 脚本必须能够一次处理 n 个(作为参数提供)页面
- 脚本必须 POSIX 兼容(curl 除外)-> 所以没有花哨的锁
现在,我已经想出了一个实现,它使用两个文件作为队列,其中一个存储所有已处理的 URLs,另一个 URLs已找到但尚未处理。
主进程简单地生成一堆共享队列文件的子进程,并且(在一个循环中直到 URLs-to-be-processed-queue 为空)弹出一个 URL 来自 URLs-to-be-processed-queue
,处理页面,尝试将每个新发现的 link 添加到 URLs-already-processed-queue
,如果成功(URL 不存在)将其添加到 URLs-to-be-processed-queue
还有。
问题在于您不能(AFAIK)使队列文件操作原子化,因此锁定是必要的。锁定 POSIX 顺从的方式是……恐怖……缓慢的恐怖。
我的做法如下:
#Pops first element from a file () and prints it to stdout; if file emepty print out empty return 1
fuPop(){
if [ -s "" ]; then
sed -nr '1p' ""
sed -ir '1d' ""
return 0
else
return 1
fi
}
#Appends line () to a file () and return 0 if it's not in it yet; if it, is just return 1
fuAppend(){
if grep -Fxq "" < ""; then
return 1
else
echo "" >> ""
return 0
fi
}
#There're multiple processes running this function.
prcsPages(){
while [ -s "$todoLinks" ]; do
luAckLock "$linksLock"
linkToProcess="$(fuPop "$todoLinks")"
luUnlock "$linksLock"
prcsPage "$linkToProcess"
...
done
...
}
#The prcsPage downloads it, does some magic and than calls prcsNewLinks and prcsNewEmails that both get list of new emails / new urls in
#$doneEmails, ..., contain file path, $mailLock, ..., contain dir path
prcsNewEmails(){
luAckLock "$mailsLock"
for newEmail in ; do
if fuAppend "$newEmail" "$doneEmails"; then
echo "$newEmail"
fi
done
luUnlock "$mailsLock"
}
prcsNewLinks(){
luAckLock "$linksLock"
for newLink in ; do
if fuAppend "$newLink" "$doneLinks"; then
fuAppend "$newLink" "$todoLinks"
fi
done
luUnlock "$linksLock"
}
问题是我的实现很慢(就像真的很慢),几乎慢到使用超过 2 10 没有意义(减少锁等待帮助很多)子进程。你实际上可以禁用锁(只需注释掉 luAckLock 和 luUnlock 位)并且它工作得很好(并且快得多)但是 seds -i
每隔一段时间就会出现竞争条件而且它不会感觉不错。
最糟糕的(如我所见)是锁定在 prcsNewLinks
中,因为它花费了相当多的时间(大部分时间 - 运行 基本上)并且实际上阻止了其他进程开始处理一个新页面(因为它需要从(当前锁定的)$todoLinks
队列中弹出新的 URL)。
现在我的问题是,如何做得更好、更快、更好?
整个脚本是 here(它包含一些信号魔术,大量调试输出,一般来说不是那么好的代码)。
顺便说一句:是的,你是对的,在 bash 中这样做 - 更重要的是在 POSIX 兼容的方式中 - 太疯狂了!但这是大学作业,所以我不得不做
//虽然我觉得我并没有真正期望我解决这些问题(因为竞争条件只有在有 25 个以上的线程时才会更频繁地出现,这可能不是一个理智的人会测试的)。
代码注释:
- 是的,等待应该有(并且已经有)随机时间。此处共享的代码只是在实际分析讲座中编写的概念证明。
- 是的,调试输出的数量及其格式很糟糕,应该有独立的日志记录功能。然而,这不是我的问题所在。
首先,您需要实现自己的 HTML/HTTP 抓取吗?为什么不让 wget
或 curl
为您递归访问网站?
您可以将文件系统滥用为数据库,并让您的队列成为单行文件的目录。 (或文件名是数据的空文件)。这将为您提供生产者-消费者锁定,生产者触摸文件,消费者将其从传入目录移动到 processing/done 目录。
这样做的美妙之处在于多个线程接触同一个文件就可以了。期望的结果是 url 在 "incoming" 列表中出现一次,这就是当多个线程创建具有相同名称的文件时得到的结果。既然要去重,那么写入传入列表时就不需要加锁了。
1) Downloads a page
2) Parses out all links and adds them to queue
每找到一个 link,
grep -ql "$url" already_checked || : > "incoming/$url"
或
[[ -e "done/$url" ]] || : > "incoming/$url"
3) Does some unimportant processing
4) Pops an URL from queue and starts from 1)
# mostly untested, you might have to debug / tweak this
local inc=( incoming/* )
# edit: this can make threads exit sooner than desired.
# See the comments for some ideas on how to make threads wait for new work
while [[ $inc != "incoming/*" ]]; do
# $inc is shorthand for "${inc[0]}", the first array entry
mv "$inc" "done/" || { rm -f "$inc"; continue; } # another thread got that link, or that url already exists in done/
url=${inc#incoming/}
download "$url";
for newurl in $(link_scan "$url"); do
[[ -e "done/$newurl" ]] || : > "incoming/$newurl"
done
process "$url"
inc=( incoming/* )
done
编辑:将 URL 编码为不包含 /
的字符串留作 reader 的练习。尽管可能 url 将 /
编码为 %2F
就足够了。
我正在考虑将每个线程的 URLs 移动到 "processing" 列表,但实际上如果你不需要能够从中断中恢复,你的 "done" 列表可以改为 "queued & done" 列表。我不认为它实际上对 mv "$url" "threadqueue.$$/
" 或其他东西有用。
"done/" 目录会变得很大,并且开始变慢,可能有 10k 个文件,这取决于您使用的文件系统。将 "done" 列表作为每行一个 url 的文件进行维护可能更有效,或者如果有一个数据库 CLI 界面可以快速执行单个命令,则将其维护为一个数据库。
将完成的列表保存为一个文件并不坏,因为您永远不需要删除条目。您可能可以在不锁定它的情况下逃脱,即使对于追加它的多个进程也是如此。 (我不确定如果线程 B 在线程 A 打开文件和线程 A 执行写入之间的 EOF 写入数据会发生什么。线程 A 的文件位置可能是旧的 EOF,在这种情况下它会覆盖线程 B 的条目,或者更糟,只覆盖它的一部分。如果你确实需要锁定,也许 flock(1)
会有用。它获得一个锁,然后执行你作为参数传递的命令。)
如果没有发生因缺少写锁定而损坏的文件,那么您可能不需要写锁定。与必须为每个 check/append.
锁定相比,"done" 列表中偶尔出现的重复条目会稍微减慢速度
如果您需要严格正确地避免多次下载相同的 URL,则需要 readers 等待作者完成。如果您可以 sort -u
最后的电子邮件列表,那么 reader 在附加列表时阅读旧副本并不是一场灾难。那么写入者只需要相互锁定,readers 就可以读取文件了。如果他们在作者设法编写新条目之前达到了 EOF,那就这样吧。
我不确定线程在将条目从传入列表中删除之前或之后将条目添加到 "done" 列表是否重要,只要他们在之前将它们添加到 "done"加工。我在想某种方式可能会使比赛更有可能导致重复完成的条目,并且不太可能进行重复下载/处理,但我不确定。
我在设计一个多处理的 bash 脚本时遇到了一些问题,该脚本通过网站,跟随发现的 links 并在每个新页面上进行一些处理(它实际上收集电子邮件地址,但这是问题的一个不重要的细节。
脚本应该像这样工作:
- 下载一个页面
- 解析出所有 link 并将它们添加到队列
- 做一些不重要的处理
- 从队列中弹出一个 URL 并从 开始
这本身编程起来非常简单,问题来自两个限制和脚本需要具有的一个功能。
- 脚本不能处理一个URL两次
- 脚本必须能够一次处理 n 个(作为参数提供)页面
- 脚本必须 POSIX 兼容(curl 除外)-> 所以没有花哨的锁
现在,我已经想出了一个实现,它使用两个文件作为队列,其中一个存储所有已处理的 URLs,另一个 URLs已找到但尚未处理。
主进程简单地生成一堆共享队列文件的子进程,并且(在一个循环中直到 URLs-to-be-processed-queue 为空)弹出一个 URL 来自 URLs-to-be-processed-queue
,处理页面,尝试将每个新发现的 link 添加到 URLs-already-processed-queue
,如果成功(URL 不存在)将其添加到 URLs-to-be-processed-queue
还有。
问题在于您不能(AFAIK)使队列文件操作原子化,因此锁定是必要的。锁定 POSIX 顺从的方式是……恐怖……缓慢的恐怖。
我的做法如下:
#Pops first element from a file () and prints it to stdout; if file emepty print out empty return 1
fuPop(){
if [ -s "" ]; then
sed -nr '1p' ""
sed -ir '1d' ""
return 0
else
return 1
fi
}
#Appends line () to a file () and return 0 if it's not in it yet; if it, is just return 1
fuAppend(){
if grep -Fxq "" < ""; then
return 1
else
echo "" >> ""
return 0
fi
}
#There're multiple processes running this function.
prcsPages(){
while [ -s "$todoLinks" ]; do
luAckLock "$linksLock"
linkToProcess="$(fuPop "$todoLinks")"
luUnlock "$linksLock"
prcsPage "$linkToProcess"
...
done
...
}
#The prcsPage downloads it, does some magic and than calls prcsNewLinks and prcsNewEmails that both get list of new emails / new urls in
#$doneEmails, ..., contain file path, $mailLock, ..., contain dir path
prcsNewEmails(){
luAckLock "$mailsLock"
for newEmail in ; do
if fuAppend "$newEmail" "$doneEmails"; then
echo "$newEmail"
fi
done
luUnlock "$mailsLock"
}
prcsNewLinks(){
luAckLock "$linksLock"
for newLink in ; do
if fuAppend "$newLink" "$doneLinks"; then
fuAppend "$newLink" "$todoLinks"
fi
done
luUnlock "$linksLock"
}
问题是我的实现很慢(就像真的很慢),几乎慢到使用超过 2 10 没有意义(减少锁等待帮助很多)子进程。你实际上可以禁用锁(只需注释掉 luAckLock 和 luUnlock 位)并且它工作得很好(并且快得多)但是 seds -i
每隔一段时间就会出现竞争条件而且它不会感觉不错。
最糟糕的(如我所见)是锁定在 prcsNewLinks
中,因为它花费了相当多的时间(大部分时间 - 运行 基本上)并且实际上阻止了其他进程开始处理一个新页面(因为它需要从(当前锁定的)$todoLinks
队列中弹出新的 URL)。
现在我的问题是,如何做得更好、更快、更好?
整个脚本是 here(它包含一些信号魔术,大量调试输出,一般来说不是那么好的代码)。
顺便说一句:是的,你是对的,在 bash 中这样做 - 更重要的是在 POSIX 兼容的方式中 - 太疯狂了!但这是大学作业,所以我不得不做
//虽然我觉得我并没有真正期望我解决这些问题(因为竞争条件只有在有 25 个以上的线程时才会更频繁地出现,这可能不是一个理智的人会测试的)。
代码注释:
- 是的,等待应该有(并且已经有)随机时间。此处共享的代码只是在实际分析讲座中编写的概念证明。
- 是的,调试输出的数量及其格式很糟糕,应该有独立的日志记录功能。然而,这不是我的问题所在。
首先,您需要实现自己的 HTML/HTTP 抓取吗?为什么不让 wget
或 curl
为您递归访问网站?
您可以将文件系统滥用为数据库,并让您的队列成为单行文件的目录。 (或文件名是数据的空文件)。这将为您提供生产者-消费者锁定,生产者触摸文件,消费者将其从传入目录移动到 processing/done 目录。
这样做的美妙之处在于多个线程接触同一个文件就可以了。期望的结果是 url 在 "incoming" 列表中出现一次,这就是当多个线程创建具有相同名称的文件时得到的结果。既然要去重,那么写入传入列表时就不需要加锁了。
1) Downloads a page
2) Parses out all links and adds them to queue
每找到一个 link,
grep -ql "$url" already_checked || : > "incoming/$url"
或
[[ -e "done/$url" ]] || : > "incoming/$url"
3) Does some unimportant processing
4) Pops an URL from queue and starts from 1)
# mostly untested, you might have to debug / tweak this
local inc=( incoming/* )
# edit: this can make threads exit sooner than desired.
# See the comments for some ideas on how to make threads wait for new work
while [[ $inc != "incoming/*" ]]; do
# $inc is shorthand for "${inc[0]}", the first array entry
mv "$inc" "done/" || { rm -f "$inc"; continue; } # another thread got that link, or that url already exists in done/
url=${inc#incoming/}
download "$url";
for newurl in $(link_scan "$url"); do
[[ -e "done/$newurl" ]] || : > "incoming/$newurl"
done
process "$url"
inc=( incoming/* )
done
编辑:将 URL 编码为不包含 /
的字符串留作 reader 的练习。尽管可能 url 将 /
编码为 %2F
就足够了。
我正在考虑将每个线程的 URLs 移动到 "processing" 列表,但实际上如果你不需要能够从中断中恢复,你的 "done" 列表可以改为 "queued & done" 列表。我不认为它实际上对 mv "$url" "threadqueue.$$/
" 或其他东西有用。
"done/" 目录会变得很大,并且开始变慢,可能有 10k 个文件,这取决于您使用的文件系统。将 "done" 列表作为每行一个 url 的文件进行维护可能更有效,或者如果有一个数据库 CLI 界面可以快速执行单个命令,则将其维护为一个数据库。
将完成的列表保存为一个文件并不坏,因为您永远不需要删除条目。您可能可以在不锁定它的情况下逃脱,即使对于追加它的多个进程也是如此。 (我不确定如果线程 B 在线程 A 打开文件和线程 A 执行写入之间的 EOF 写入数据会发生什么。线程 A 的文件位置可能是旧的 EOF,在这种情况下它会覆盖线程 B 的条目,或者更糟,只覆盖它的一部分。如果你确实需要锁定,也许 flock(1)
会有用。它获得一个锁,然后执行你作为参数传递的命令。)
如果没有发生因缺少写锁定而损坏的文件,那么您可能不需要写锁定。与必须为每个 check/append.
锁定相比,"done" 列表中偶尔出现的重复条目会稍微减慢速度如果您需要严格正确地避免多次下载相同的 URL,则需要 readers 等待作者完成。如果您可以 sort -u
最后的电子邮件列表,那么 reader 在附加列表时阅读旧副本并不是一场灾难。那么写入者只需要相互锁定,readers 就可以读取文件了。如果他们在作者设法编写新条目之前达到了 EOF,那就这样吧。
我不确定线程在将条目从传入列表中删除之前或之后将条目添加到 "done" 列表是否重要,只要他们在之前将它们添加到 "done"加工。我在想某种方式可能会使比赛更有可能导致重复完成的条目,并且不太可能进行重复下载/处理,但我不确定。