Crystal 郎。如何实现生产者少的模式生产者-消费者?

Crystal lang. How to realize pattern producer-consumer with a few producers?

几天前我问 ,@Johannes Müller 回答我使用 Channel::Buffered(T)。现在,我还有一个问题。如果我有多个制作人怎么办?他们都使用相同的 Channel?

例如:

channel = Channel(String).new

# first producer
spawn do
  5.times do |i|
    puts "first producer #{i}"
    channel.send "first #{i}"
    sleep 0.1
  end
end

# second producer, that send data to the same with first producer channel
spawn do
  5.times do |i|
    puts "second producer #{i}"
    channel.send "second #{i}"
    sleep 0.1
  end
end

# consumer
spawn do
  loop do
    data = channel.receive
    puts "receive: #{data}"
    sleep 0.5
  end
end

sleep 6

输出将是:

$ crystal ./test.cr
first producer 0 # ok. first produced 0
second producer 0 # ok. second produced 0
receive: first 0 # ok. received from the first producer
first producer 1 # o_O. Where received data from the second producer?
receive: first 1 
first producer 2
receive: first 2
first producer 3
receive: first 3
first producer 4
receive: first 4
receive: second 0 # aa. It's here... Why it's happend only, when the first producer was produced all?
second producer 1 
receive: second 1
second producer 2
receive: second 2
second producer 3
receive: second 3
second producer 4
receive: second 4

如您所见,"second producer" 几乎与第一个同时发送第一个包,但在第一个生产者完成工作之前它一直被忽略。没关系,通道缓冲与否。

为什么他们不能始终如一地工作?如果第一个生产者在一个永恒的循环中工作,则永远不会收到来自第二个生产者的数据。

可能应该是这种行为,并将数据从几个地方发送到一个地方 - 显然是不好的做法?

Read the docs!

不确定我是否正确理解你的意思,但这是预期的输出吗?:

First producer sent: 0
Consumer received "First: 0"!
Second producer sent: 0
Consumer received "Second: 0"!
First producer sent: 1
Consumer received "First: 1"!
Second producer sent: 1
Consumer received "Second: 1"!
First producer sent: 2
Consumer received "First: 2"!
Second producer sent: 2
Consumer received "Second: 2"!
First producer sent: 3
Consumer received "First: 3"!
Second producer sent: 3
Consumer received "Second: 3"!
First producer sent: 4
Consumer received "First: 4"!
Second producer sent: 4
Consumer received "Second: 4"!

如果是;这是代码:

class CrystalIsAwesome
  @@products = 0
  @@channel = Channel(String).new

  def self.produce(which, times, produce_time = 0)
    @@products += times
    spawn do
      times.times do |i|
        puts "#{which} producer sent: #{i}"
        @@channel.send "#{which}: #{i}"
        sleep produce_time.seconds
        end
    end
  end

  # first producer
  produce "First", 5

  # second producer
  produce "Second", 5

  @@products.times do |_|
    puts %(Consumer received "#{@@channel.receive}"!)
  end
end

尝试添加第三个制作人 (produce "Third", 2, 1),看看效果如何。

再次;请参阅 the Concurrency docs 了解为什么会这样。