如何在bash脚本中实现对基于文件的队列的多线程访问

How to implement multithreaded access to file-based queue in bash script

我在设计一个多处理的 bash 脚本时遇到了一些问题,该脚本通过网站,跟随发现的 links 并在每个新页面上进行一些处理(它实际上收集电子邮件地址,但这是问题的一个不重要的细节。

脚本应该像这样工作:

  1. 下载一个页面
  2. 解析出所有 link 并将它们添加到队列
  3. 做一些不重要的处理
  4. 从队列中弹出一个 URL 并从
  5. 开始

这本身编程起来非常简单,问题来自两个限制和脚本需要具有的一个功能。

现在,我已经想出了一个实现,它使用两个文件作为队列,其中一个存储所有已处理的 URLs,另一个 URLs已找到但尚未处理。

主进程简单地生成一堆共享队列文件的子进程,并且(在一个循环中直到 URLs-to-be-processed-queue 为空)弹出一个 URL 来自 URLs-to-be-processed-queue,处理页面,尝试将每个新发现的 link 添加到 URLs-already-processed-queue,如果成功(URL 不存在)将其添加到 URLs-to-be-processed-queue 还有。

问题在于您不能(AFAIK)使队列文件操作原子化,因此锁定是必要的。锁定 POSIX 顺从的方式是……恐怖……缓慢的恐怖。

我的做法如下:

#Pops first element from a file () and prints it to stdout; if file emepty print out empty return 1
fuPop(){
if [ -s "" ]; then
        sed -nr '1p' ""
        sed -ir '1d' ""
        return 0
else
        return 1
fi
}

#Appends line () to a file () and return 0 if it's not in it yet; if it, is just return 1
fuAppend(){
if grep -Fxq "" < ""; then
        return 1
else
        echo "" >> ""
        return 0
fi
}

#There're multiple processes running this function.
prcsPages(){
while [ -s "$todoLinks" ]; do
        luAckLock "$linksLock"
        linkToProcess="$(fuPop "$todoLinks")"
        luUnlock "$linksLock"

        prcsPage "$linkToProcess"
        ...
done
...
}


#The prcsPage downloads it, does some magic and than calls prcsNewLinks and prcsNewEmails that both get list of new emails / new urls in 
#$doneEmails, ..., contain file path, $mailLock, ..., contain dir path

prcsNewEmails(){
luAckLock "$mailsLock"
for newEmail in ; do
        if fuAppend "$newEmail" "$doneEmails"; then
                echo "$newEmail"
        fi
done
luUnlock "$mailsLock"
}

prcsNewLinks(){
luAckLock "$linksLock"
for newLink in ; do
        if fuAppend "$newLink" "$doneLinks"; then
                fuAppend "$newLink" "$todoLinks"
        fi
done
luUnlock "$linksLock"
}

问题是我的实现很慢(就像真的很慢),几乎慢到使用超过 2 10 没有意义(减少锁等待帮助很多)子进程。你实际上可以禁用锁(只需注释掉 luAckLock 和 luUnlock 位)并且它工作得很好(并且快得多)但是 seds -i 每隔一段时间就会出现竞争条件而且它不会感觉不错。

最糟糕的(如我所见)是锁定在 prcsNewLinks 中,因为它花费了相当多的时间(大部分时间 - 运行 基本上)并且实际上阻止了其他进程开始处理一个新页面(因为它需要从(当前锁定的)$todoLinks 队列中弹出新的 URL)。

现在我的问题是,如何做得更好、更快、更好?

整个脚本是 here(它包含一些信号魔术,大量调试输出,一般来说不是那么好的代码)。

顺便说一句:是的,你是对的,在 bash 中这样做 - 更重要的是在 POSIX 兼容的方式中 - 太疯狂了!但这是大学作业,所以我不得不做

//虽然我觉得我并没有真正期望我解决这些问题(因为竞争条件只有在有 25 个以上的线程时才会更频繁地出现,这可能不是一个理智的人会测试的)。


代码注释:

  1. 是的,等待应该有(并且已经有)随机时间。此处共享的代码只是在实际分析讲座中编写的概念证明。
  2. 是的,调试输出的数量及其格式很糟糕,应该有独立的日志记录功能。然而,这不是我的问题所在。

首先,您需要实现自己的 HTML/HTTP 抓取吗?为什么不让 wgetcurl 为您递归访问网站?

您可以将文件系统滥用为数据库,并让您的队列成为单行文件的目录。 (或文件名是数据的空文件)。这将为您提供生产者-消费者锁定,生产者触摸文件,消费者将其从传入目录移动到 processing/done 目录。

这样做的美妙之处在于多个线程接触同一个文件就可以了。期望的结果是 url 在 "incoming" 列表中出现一次,这就是当多个线程创建具有相同名称的文件时得到的结果。既然要去重,那么写入传入列表时就不需要加锁了。

1) Downloads a page

2) Parses out all links and adds them to queue

每找到一个 link,

grep -ql "$url" already_checked || : > "incoming/$url"

[[ -e "done/$url" ]] || : > "incoming/$url"

3) Does some unimportant processing

4) Pops an URL from queue and starts from 1)

# mostly untested, you might have to debug / tweak this
local inc=( incoming/* )
# edit: this can make threads exit sooner than desired.
# See the comments for some ideas on how to make threads wait for new work
while [[ $inc != "incoming/*" ]]; do
    # $inc is shorthand for "${inc[0]}", the first array entry
    mv "$inc" "done/" || { rm -f "$inc"; continue; } # another thread got that link, or that url already exists in done/
    url=${inc#incoming/}
    download "$url";
    for newurl in $(link_scan "$url"); do
        [[ -e "done/$newurl" ]] || : > "incoming/$newurl"
    done
    process "$url"
    inc=( incoming/* )
done

编辑:将 URL 编码为不包含 / 的字符串留作 reader 的练习。尽管可能 url 将 / 编码为 %2F 就足够了。

我正在考虑将每个线程的 URLs 移动到 "processing" 列表,但实际上如果你不需要能够从中断中恢复,你的 "done" 列表可以改为 "queued & done" 列表。我不认为它实际上对 mv "$url" "threadqueue.$$/" 或其他东西有用。

"done/" 目录会变得很大,并且开始变慢,可能有 10k 个文件,这取决于您使用的文件系统。将 "done" 列表作为每行一个 url 的文件进行维护可能更有效,或者如果有一个数据库 CLI 界面可以快速执行单个命令,则将其维护为一个数据库。

将完成的列表保存为一个文件并不坏,因为您永远不需要删除条目。您可能可以在不锁定它的情况下逃脱,即使对于追加它的多个进程也是如此。 (我不确定如果线程 B 在线程 A 打开文件和线程 A 执行写入之间的 EOF 写入数据会发生什么。线程 A 的文件位置可能是旧的 EOF,在这种情况下它会覆盖线程 B 的条目,或者更糟,只覆盖它的一部分。如果你确实需要锁定,也许 flock(1) 会有用。它获得一个锁,然后执行你作为参数传递的命令。)

如果没有发生因缺少写锁定而损坏的文件,那么您可能不需要写锁定。与必须为每个 check/append.

锁定相比,"done" 列表中偶尔出现的重复条目会稍微减慢速度

如果您需要严格正确地避免多次下载相同的 URL,则需要 readers 等待作者完成。如果您可以 sort -u 最后的电子邮件列表,那么 reader 在附加列表时阅读旧副本并不是一场灾难。那么写入者只需要相互锁定,readers 就可以读取文件了。如果他们在作者设法编写新条目之前达到了 EOF,那就这样吧。

我不确定线程​​在将条目从传入列表中删除之前或之后将条目添加到 "done" 列表是否重要,只要他们在之前将它们添加到 "done"加工。我在想某种方式可能会使比赛更有可能导致重复完成的条目,并且不太可能进行重复下载/处理,但我不确定。