为什么任务没有被分发给所有工作线程(worker)?

Why are the tasks not being distributed to all the workers?

以下代码翻译自 ZeroMQ 指南中的 Divide and Conquer(分而治之)示例。

module ZeroMQ

open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets

/// Runs a ventilator, four workers and a sink inside one process.
/// The ventilator generates `task_number` workloads and pushes them out,
/// each worker sleeps for the workload it receives and then notifies the
/// sink, and the sink counts the notifications and prints the elapsed time.
let parallel_task () =
    let task_number = 100
    // Build the two endpoint strings by joining "source" / "sink" onto a
    // common base address.
    let uri_source, uri_sink = 
        let uri = "ipc://parallel_task"
        Path.Join(uri,"source"), Path.Join(uri,"sink")

    // Show the endpoints that will be used.
    printfn "%A, %A" uri_source uri_sink

    let rnd = Random()
    // NOTE(review): both sockets are created from the address alone; there is
    // no explicit Bind/Connect call, so which of the two happens is decided by
    // the library's constructor default -- confirm against the NetMQ docs.
    use source = new PushSocket(uri_source)
    use sink = new PushSocket(uri_sink)
    // One workload per task; a worker later passes the value to Thread.Sleep,
    // so it is a duration in milliseconds.
    let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)

    // Prints the prompt and the sum of all workloads, then blocks until the
    // user presses Enter.
    let ventilator_init () =
        printf "Press enter when workers are ready.\n"
        printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
        Console.ReadLine() |> ignore

    // Sends a "0" frame on the sink socket, then sends every workload as a
    // string frame on the source socket.
    let ventilator_run () =
        sink.SendFrame("0")
        printf "Sending tasks to workers.\n"
        Array.iter (string >> source.SendFrame) tasks
        // Short pause (1 ms) before returning.
        Thread.Sleep(1)

    // Worker number `i`: receives workloads forever, sleeps for each one and
    // then sends an empty frame to the sink endpoint.
    let worker i () =
        printf "Starting worker %i\n" i
        // These shadow the outer `source` / `sink` sockets inside the worker.
        // NOTE(review): every worker constructs a PullSocket on the same
        // address without an explicit Bind/Connect -- verify what the
        // constructor does when several sockets use one address.
        use source = new PullSocket(uri_source)
        use sink = new PushSocket(uri_sink)
        while true do
            let msg = source.ReceiveFrameString()
            printf "Worker %i received message.\n" i
            // Disabled debug output of the received payload:
            //printf "%s.\n" msg
            // The payload is the number of milliseconds to sleep.
            Thread.Sleep(int msg)
            // Empty frame = "one task finished".
            sink.SendFrame("")

    // From here on `sink` names this function; `ventilator_run` above has
    // already captured the outer `sink` socket.
    // Receives `task_number` frames, starting the stopwatch on the first one,
    // and prints ":" for every tenth frame and "." for the others.
    // NOTE(review): the "0" frame sent by `ventilator_run` goes to the same
    // endpoint, so it presumably counts as one of the `task_number` frames.
    let sink () =
        use sink = new PullSocket(uri_sink)
        let watch = Diagnostics.Stopwatch()
        for i=1 to task_number do
            let _ = sink.ReceiveFrameString()
            if watch.IsRunning = false then watch.Start()
            printf (if i % 10 = 0 then ":" else ".")
        printf "\nTotal elapsed time: %A msec\n" watch.Elapsed
    // Order of execution: the prompt blocks first, so the workers and the
    // sink are only started after Enter has been pressed.
    ventilator_init()
    for i=1 to 4 do Task.Run (worker i) |> ignore
    let t = Task.Run sink
    ventilator_run()
    // Block until the sink has received all the frames it expects.
    t.Wait()

/// Program entry point: runs the parallel-task demo once and returns exit
/// code 0. The command-line arguments are not used.
[<EntryPoint>]
let main argv =
    let exit_code = 0
    parallel_task ()
    exit_code

实际发生的情况是:只有一个工作线程收到了所有消息,其他线程都没有被分配到任何工作。为什么会这样?

open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets

/// Runs the ZeroMQ "divide and conquer" pipeline inside one process:
/// a ventilator pushes `task_number` workloads, four workers pull them and
/// sleep for the requested time, and a sink collects one confirmation per
/// workload. Returns once the sink has received every confirmation.
let parallel_task () =
    let task_number = 100
    // Endpoints for the task stream (ventilator -> workers) and the result
    // stream (ventilator/workers -> sink).
    let uri_source, uri_sink =
        let uri = "ipc://parallel_task"
        Path.Join(uri,"source"), Path.Join(uri,"sink")

    // Ventilator: binds the task endpoint, waits for the user, tells the sink
    // that the batch is starting and then pushes every workload.
    let ventilator () =
        let rnd = Random()
        // Bind and Connect are spelled out explicitly: the ventilator owns
        // the task endpoint, the sink owns the result endpoint.
        use source = new PushSocket()
        source.Bind(uri_source)
        use sink = new PushSocket()
        sink.Connect(uri_sink)
        // Each workload is the number of milliseconds a worker will sleep.
        let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)
        printf "Press enter when workers are ready.\n"
        printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
        Console.ReadLine() |> ignore
        // Start-of-batch signal; the sink starts its stopwatch on this frame.
        sink.SendFrame("0")
        printf "Sending tasks to workers.\n"
        Array.iter (string >> source.SendFrame) tasks
        // Give the sockets time to deliver the queued frames before `use`
        // disposes them. The guide's example waits one second here; 1 ms is
        // presumably too short if undelivered frames are dropped on dispose
        // -- TODO confirm against NetMQ's linger default.
        Thread.Sleep(1000)

    // Worker number `i`: pulls workloads forever, sleeps for each one and
    // then pushes an empty confirmation frame to the sink.
    let worker i () =
        printf "Starting worker %i\n" i
        use source = new PullSocket()
        source.Connect(uri_source)
        use sink = new PushSocket()
        sink.Connect(uri_sink)
        while true do
            let msg = source.ReceiveFrameString()
            printf "Worker %i received message.\n" i
            Thread.Sleep(int msg)
            sink.SendFrame("")

    // Sink: binds the result endpoint, waits for the start-of-batch signal
    // and then collects exactly `task_number` confirmations.
    let sink () =
        use sink = new PullSocket()
        sink.Bind(uri_sink)
        // The first frame is the ventilator's start signal, not a task
        // result: consume it separately so that it is not counted as one of
        // the `task_number` confirmations.
        sink.ReceiveFrameString() |> ignore
        let watch = Diagnostics.Stopwatch.StartNew()
        for i=1 to task_number do
            sink.ReceiveFrameString() |> ignore
            printf (if i % 10 = 0 then ":" else ".")
        watch.Stop()
        // Print milliseconds so that the value matches the "msec" label.
        printf "\nTotal elapsed time: %d msec\n" watch.ElapsedMilliseconds

    // The ventilator and the workers run in the background; only the sink is
    // awaited, because it is the component that knows when the batch is done.
    Task.Run ventilator |> ignore
    for i=1 to 4 do Task.Run (worker i) |> ignore
    Task.Run(sink).Wait()

这是上面代码清理后可以正常工作的版本。我必须显式指定哪些套接字执行绑定(Bind)、哪些执行连接(Connect)。感谢 @somdoron 的提示。