为什么我用 lua 协程实现的生产者消费者模型不能正常工作?

Why does my producer-consumer model implemented by lua coroutine cannot work expectedly?

我使用协程来实现生产者-消费者模型。我的代码如下:

function send(prod, x)
    coroutine.resume(prod, x)
end

function receive()
    local x = coroutine.yield()
    return x
end

function consumer()
    return coroutine.create(function ()
        while true do
            local x = receive()
            io.write(x, "\n")
        end
    end)
end

function producer(prod)
    while true do
        local x = io.read()
        send(prod, x)
    end
end

producer(consumer())

当我 运行 这段代码时,我得到:

第一个输入消息(“Hello World”)消失了。它应该打印两次,但现在只打印一次。在我看来,我的生产者-消费者模式的流程图应该是这样的:

我是不是听错了?

输入第一个字符串后,将按以下顺序调用函数:

  1. send,
  2. consumer
  3. 返回的协程
  4. receive,
  5. producer。 然后程序等待用户输入。

local x = coroutine.yield() 因此 local x = receive() 屈服于 producerio.write(x, "\n")现阶段未达到。

用户输入第二行后,如下:

  1. send,
  2. consumerlocal x = receive() 之后恢复并打印第二个输入,
  3. 在无限循环中,consumer 调用 receive
  4. receive 屈服于 producer,
  5. 等待用户输入。

这里是更正确的代码:

local function send (x)
    coroutine.yield(x)
end

local function receive (prod)
    local status, value = coroutine.resume(prod)
    return value
end

local function producer()
    return coroutine.create(
        function ()
            while true do
                local x = io.read()     -- produce new value
                send(x)
            end
        end
    )
end

function consumer (prod)
    while true do
        local x = receive(prod)   -- get new value
        io.write(x, "\n")          -- consume new value
    end
end
    
consumer(producer())

请注意,它是 consumer(producer()),而不是相反。另请注意 producer 是协程,而不是 consumersend 产量和 receive 恢复。

consumer 开始节目,producer 一次又一次地继续。如果相反,如您的示例所示,consumer 直到第二次迭代才准备好消费产品。

UPD: 下面是“force-feeding”,即producer-driven,代码:

local function send (cons, x)
    coroutine.resume (cons, x)
end

local function receive ()
    return coroutine.yield()
end

local function consumer()
    return coroutine.create (
        function (x)
            while true do
                io.write(x, '\n')
                -- corotine.yield() returns the extra arguments of coroutine.resume()
                x = receive()
            end
        end
    )
end

function producer (cons)
    while true do
        local x = io.read() -- make a new value
        send (cons, x)      -- feed the new value
    end
end
    
producer (consumer ())

与作者示例的不同之处在于 producer sendconsumer 并且 receivewrite 之后。

延伸阅读:https://www.lua.org/pil/9.2.html.

第一次“恢复”协程时,它不会直接跳转到第一个 yield,而是使用给定的参数调用包装函数:

local co = coroutine.wrap(function(aaa)
   print(aaa) -- prints "first"
   print(coroutine.yield()) -- prints "second"
end)

co("first")
co("second")

在您的代码中解决此问题的简单方法:

local send, receive =
   coroutine.resume, coroutine.yield

function consumer()
   return coroutine.create(function(x)
      while true do
         io.write(x, "\n")
         x = receive()
      end
   end)
end

function producer(consumer)
   while true do
      local x = io.read()
      send(consumer, x)
   end
end

producer(consumer())