运行 Perl 6 中的自馈频道

Running self-feeding channels in Perl 6

我想设置多个在通道上并发运行的线程,并且这些线程中的每一个都应该同时为通道提供数据。其中一个线程将决定何时停止。然而,这是我最接近的一次:

use Algorithm::Evolutionary::Simple;

my $length = 32;
my $supplier = Supplier.new;
my $supply   = $supplier.Supply;
my $channel-one = $supply.Channel;
my $pairs-supply = $supply.batch( elems => 2 );
my $channel-two = $pairs-supply.Channel;

my $single = start {
    react  {
        whenever $channel-one -> $item {
            say "via Channel 1:", max-ones($item);
        }
    }
}

my $pairs = start {
    react  {
        whenever $channel-two -> @pair {
        my @new-chromosome = crossover( @pair[0], @pair[1] );
        say "In Channel 2: ", @new-chromosome;
        $supplier.emit( @new-chromosome[0]);
        $supplier.emit( @new-chromosome[1]);
        }
    }
}

await (^10).map: -> $r {
    start {
    sleep $r/100.0;
        $supplier.emit( random-chromosome($length) );
    }
}

$supplier.done;

这会在多次排放后停止。无论如何,它可能不会同时 运行ning。我正在使用通道而不是供应和水龙头,因为它们不是同时 运行,而是异步的。我 需要 用品,因为我想有一个 seudo-channel 成对地接收元素,就像上面所做的那样;我还没有看到用纯渠道做到这一点的方法。 如果我将供应的 emit 更改为渠道的 send,上面没有区别。

这里有几个问题

  1. 这些 react 块 运行 在不同的线程中吗?如果不是,那该怎么做?

  2. 即使不是,为什么即使$pairs一直在向频道发射它也会停止?

  3. 我可以让 "batch" 个频道从单项频道自动创建吗?

更新 1:如果我从末尾删除 $supplier.done,它只会阻塞。如果我为每次读取创建一个 promise in whenever,它只会阻塞并且什么都不做。

答案就在这里,精简到最低限度

my Channel $c .= new;
my Channel $c2 = $c.Supply.batch( elems => 2).Channel;
my Channel $output .= new;
my $count = 0;
$c.send(1) for ^2;

my $more-work = start react whenever $c2 -> @item {
    if ( $count++ < 32 ) {
        $c.send( @item[1]);
    my $sum = sum @item;
    $c.send( $sum );
    $output.send( $sum ); 
    } else {
    $c.close;
    }

}
await $more-work;
loop {
    if my $item = $output.poll {
    $item.say
    } else {
    $output.close;
    }
    if $output.closed  { last };
}

通过从通道 ($c.Supply) 创建供应来使用每两个元素对第一个通道进行批处理的第二个通道,以两个 (batch( elems => 2)) 和把它变回一个通道。第三个通道是为输出创建的。 为了不耗尽供应并挂起通道,从第一个(实际上是唯一的)通道读取的每个第二个元素都放回那里。因此,成对读取的第二个通道永远不会挂起或等待新元素。 为每个新元素创建一个输出通道,并在需要时使用外部计数器完成操作;该输出通道以 non-blocking 方式读取,并在最后一行没有任何内容可读时关闭。 准确回答我原来的问题:

  1. 是的,他们是,只是他们在互相窃取元素。
  2. 因为两个线程正在从同一个通道读取。第一个偶然发现元素的人,阅读它。
  3. 是的,通过将渠道变成供应品,将它们分批处理,然后再将它们转回渠道。请记住,它们不是副本,它们将共享相同的元素。