当您不知道实际数量时如何从频道中读取所有项目

How to read all the items from the Channel when you don't know their actual count

我正在尝试实现一个访问一些 URL 的爬虫,从中收集新的相关 URL 并构建报告。我正在尝试使用 Crystal 光纤和通道同时进行,如下所示:

urls = [...] # of String
visited_urls = []

pool_size.times do
  spawn do
    loop do
      url = urls.shift?
      break if url.nil?

      channel.send(url) if some_condition
    end
  end
end

# TODO: here the problem!
loop do
  url = channel.receive?
  break if url.nil? || channel.closed?

  visited_urls << url
end

puts visited_urls.inspect

但这里我有一个问题 - 无限秒 loop(它调用 channel.receive? 直到频道中的最后一个项目,然后等待一条永远不会到达的新消息)。存在问题是因为我不知道频道中实际有多少项目,所以我不能像 Crystal 语言指南的 Concurency 部分中建议的那样做。

那么当我们不知道它将存储多少项目并且我们需要接收时,也许有一些如何使用通道的良好实践?谢谢!

一个常见的解决方案是设置杀戮值。要么像这样作为主要数据流的一部分:

results = Channel(String|Symbol).new(POOL_SIZE * 2)

POOL_SIZE.times do
  spawn do
    while has_work?
      results.send "some work result"
    end

    results.send :done
  end
end

done_workers = 0

loop do
  message = results.receive
  if message == :done
    done_workers += 1
    break if done_workers == POOL_SIZE
  elsif message.is_a? String
    puts "Got: #{message}"
  end
end

或通过辅助渠道发出事件信号:

results = Channel(String).new(POOL_SIZE * 2)
done = Channel(Nil).new(POOL_SIZE)

POOL_SIZE.times do
  spawn do
    while has_work?
      results.send "some work result"
    end

    done.send nil
  end
end

done_workers = 0
loop do
  select
  when message = results.receive
    puts "Got: #{message}"
  when done.receive
    done_workers += 1
    break if done_workers == POOL_SIZE
  end
end