使用线程读取输入时缺少字符

Missing characters while reading input with threads

假设我们有一个脚本可以打开一个文件,然后逐行读取它并将该行打印到终端。我们有单线程和多线程版本。

问题是两个脚本的结果输出几乎相同,但不完全相同。在多线程版本中,大约有 10 行缺少前 2 个字符。我的意思是,如果实际行是行 "Whosebug rocks",我会得到 "ackoverflow rocks"。

我认为这与某些竞争条件有关,因为如果我调整参数以创建大量小工人,与使用更少和更大的工人相比,我会遇到更多错误。

单线程是这样的:

$file = "some/file.txt";
open (INPUT, $file) or die "Error: $!\n";

while ($line = <STDIN>) {
    print $line;
}

多线程版本使用线程队列,此实现基于 @ikegami 方法:

use threads            qw( async );
use Thread::Queue 3.01 qw( );

use constant NUM_WORKERS    => 4;
use constant WORK_UNIT_SIZE => 100000;

sub worker {
    my ($job) = @_;
    for (@$job) {
        print $_;
    }
}

my $q = Thread::Queue->new();


async { while (defined( my $job = $q->dequeue() )) { worker($job); } }
    for 1..NUM_WORKERS;

my $done = 0;    

while (!$done) {
    my @lines;

    while (@lines < WORK_UNIT_SIZE) {
        my $line = <>;
        if (!defined($line)) {
            $done = 1;
            last;
        }

    push @lines, $line;
}

$q->enqueue(\@lines) if @lines;
}

$q->end();
$_->join for threads->list;

我试过你的程序并得到了类似的(错误的)结果。我在 print 周围使用 threads::shared 中的 lock 而不是 Thread::Semaphore,因为它比 T::S 使用起来更简单,即:

use threads;
use threads::shared;
...
my $mtx : shared;

sub worker
{
    my ($job) = @_;
    for (@$job) {
        lock($mtx); # (b)locks
        print $_;
                    # autom. unlocked here
    }
}
...

全局变量$mtx作为互斥量。它的值无关紧要,甚至 undef (像这里一样)也可以。 仅当当前没有其他线程持有该变量的锁时,对 lock 的调用才会阻塞和 returns。 当它超出范围时,它会自动解锁(从而使 lock return)。在这个发生的例子中 在 for 循环的每一次迭代之后;不需要额外的 {…} 块。

现在我们已经同步了 print 个调用…

但是这也不起作用,因为print确实缓冲了I/O(好吧,只有O)。所以我强制无缓冲输出:

use threads;
use threads::shared;
...
my $mtx : shared;
$| = 1;  # force unbuffered output

sub worker
{
    # as above
}
...

然后就成功了。令我惊讶的是,我可以删除 lock 并且它仍然有效。也许是偶然的。请注意,如果没有缓冲,您的脚本将 运行 显着变慢。

我的结论是:你是 suffering from buffering