运行 Perl 6 中的自馈频道
Running self-feeding channels in Perl 6
我想设置多个在通道上并发运行的线程,并且这些线程中的每一个都应该同时为通道提供数据。其中一个线程将决定何时停止。然而,这是我最接近的一次:
use Algorithm::Evolutionary::Simple;
my $length = 32;
my $supplier = Supplier.new;
my $supply = $supplier.Supply;
my $channel-one = $supply.Channel;
my $pairs-supply = $supply.batch( elems => 2 );
my $channel-two = $pairs-supply.Channel;
my $single = start {
react {
whenever $channel-one -> $item {
say "via Channel 1:", max-ones($item);
}
}
}
my $pairs = start {
react {
whenever $channel-two -> @pair {
my @new-chromosome = crossover( @pair[0], @pair[1] );
say "In Channel 2: ", @new-chromosome;
$supplier.emit( @new-chromosome[0]);
$supplier.emit( @new-chromosome[1]);
}
}
}
await (^10).map: -> $r {
start {
sleep $r/100.0;
$supplier.emit( random-chromosome($length) );
}
}
$supplier.done;
这会在多次排放后停止。无论如何,它可能不会同时 运行ning。我正在使用通道而不是供应和水龙头,因为它们不是同时 运行,而是异步的。我 需要 用品,因为我想有一个 seudo-channel 成对地接收元素,就像上面所做的那样;我还没有看到用纯渠道做到这一点的方法。
如果我将供应的 emit
更改为渠道的 send
,上面没有区别。
这里有几个问题
这些 react
块 运行 在不同的线程中吗?如果不是,那该怎么做?
即使不是,为什么即使$pairs
一直在向频道发射它也会停止?
我可以让 "batch" 个频道从单项频道自动创建吗?
更新 1:如果我从末尾删除 $supplier.done
,它只会阻塞。如果我为每次读取创建一个 promise in whenever
,它只会阻塞并且什么都不做。
答案就在这里,精简到最低限度
my Channel $c .= new;
my Channel $c2 = $c.Supply.batch( elems => 2).Channel;
my Channel $output .= new;
my $count = 0;
$c.send(1) for ^2;
my $more-work = start react whenever $c2 -> @item {
if ( $count++ < 32 ) {
$c.send( @item[1]);
my $sum = sum @item;
$c.send( $sum );
$output.send( $sum );
} else {
$c.close;
}
}
await $more-work;
loop {
if my $item = $output.poll {
$item.say
} else {
$output.close;
}
if $output.closed { last };
}
通过从通道 ($c.Supply
) 创建供应来使用每两个元素对第一个通道进行批处理的第二个通道,以两个 (batch( elems => 2)
) 和把它变回一个通道。第三个通道是为输出创建的。
为了不耗尽供应并挂起通道,从第一个(实际上是唯一的)通道读取的每个第二个元素都放回那里。因此,成对读取的第二个通道永远不会挂起或等待新元素。
为每个新元素创建一个输出通道,并在需要时使用外部计数器完成操作;该输出通道以 non-blocking 方式读取,并在最后一行没有任何内容可读时关闭。
准确回答我原来的问题:
- 是的,他们是,只是他们在互相窃取元素。
- 因为两个线程正在从同一个通道读取。第一个偶然发现元素的人,阅读它。
- 是的,通过将渠道变成供应品,将它们分批处理,然后再将它们转回渠道。请记住,它们不是副本,它们将共享相同的元素。
我想设置多个在通道上并发运行的线程,并且这些线程中的每一个都应该同时为通道提供数据。其中一个线程将决定何时停止。然而,这是我最接近的一次:
use Algorithm::Evolutionary::Simple;
my $length = 32;
my $supplier = Supplier.new;
my $supply = $supplier.Supply;
my $channel-one = $supply.Channel;
my $pairs-supply = $supply.batch( elems => 2 );
my $channel-two = $pairs-supply.Channel;
my $single = start {
react {
whenever $channel-one -> $item {
say "via Channel 1:", max-ones($item);
}
}
}
my $pairs = start {
react {
whenever $channel-two -> @pair {
my @new-chromosome = crossover( @pair[0], @pair[1] );
say "In Channel 2: ", @new-chromosome;
$supplier.emit( @new-chromosome[0]);
$supplier.emit( @new-chromosome[1]);
}
}
}
await (^10).map: -> $r {
start {
sleep $r/100.0;
$supplier.emit( random-chromosome($length) );
}
}
$supplier.done;
这会在多次排放后停止。无论如何,它可能不会同时 运行ning。我正在使用通道而不是供应和水龙头,因为它们不是同时 运行,而是异步的。我 需要 用品,因为我想有一个 seudo-channel 成对地接收元素,就像上面所做的那样;我还没有看到用纯渠道做到这一点的方法。
如果我将供应的 emit
更改为渠道的 send
,上面没有区别。
这里有几个问题
这些
react
块 运行 在不同的线程中吗?如果不是,那该怎么做?即使不是,为什么即使
$pairs
一直在向频道发射它也会停止?我可以让 "batch" 个频道从单项频道自动创建吗?
更新 1:如果我从末尾删除 $supplier.done
,它只会阻塞。如果我为每次读取创建一个 promise in whenever
,它只会阻塞并且什么都不做。
答案就在这里,精简到最低限度
my Channel $c .= new;
my Channel $c2 = $c.Supply.batch( elems => 2).Channel;
my Channel $output .= new;
my $count = 0;
$c.send(1) for ^2;
my $more-work = start react whenever $c2 -> @item {
if ( $count++ < 32 ) {
$c.send( @item[1]);
my $sum = sum @item;
$c.send( $sum );
$output.send( $sum );
} else {
$c.close;
}
}
await $more-work;
loop {
if my $item = $output.poll {
$item.say
} else {
$output.close;
}
if $output.closed { last };
}
通过从通道 ($c.Supply
) 创建供应来使用每两个元素对第一个通道进行批处理的第二个通道,以两个 (batch( elems => 2)
) 和把它变回一个通道。第三个通道是为输出创建的。
为了不耗尽供应并挂起通道,从第一个(实际上是唯一的)通道读取的每个第二个元素都放回那里。因此,成对读取的第二个通道永远不会挂起或等待新元素。
为每个新元素创建一个输出通道,并在需要时使用外部计数器完成操作;该输出通道以 non-blocking 方式读取,并在最后一行没有任何内容可读时关闭。
准确回答我原来的问题:
- 是的,他们是,只是他们在互相窃取元素。
- 因为两个线程正在从同一个通道读取。第一个偶然发现元素的人,阅读它。
- 是的,通过将渠道变成供应品,将它们分批处理,然后再将它们转回渠道。请记住,它们不是副本,它们将共享相同的元素。