为什么任务没有被分发给所有的工作线程(worker)?
Why are the tasks not being distributed to all the workers?
以下代码翻译自 ZeroMQ 指南中的“分而治之”(Divide and Conquer)示例。
module ZeroMQ
open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets
/// Runs a ventilator / worker / sink pipeline inside one process: the
/// ventilator pushes `task_number` task durations, four worker tasks sleep for
/// each duration they receive and report to the sink, and the sink counts
/// `task_number` frames and prints the elapsed time.
///
/// NOTE(review): every socket below is built from an address string alone, so
/// whether it binds or connects is left to NetMQ's default for that socket
/// type, and all four workers build their PullSocket on the same `uri_source`.
/// This is the version the surrounding text asks about, so the construction is
/// intentionally kept exactly as posted -- confirm the defaults against the
/// NetMQ documentation.
let parallel_task () =
    let task_number = 100
    let uri_source, uri_sink =
        let root = "ipc://parallel_task"
        Path.Join(root, "source"), Path.Join(root, "sink")
    printfn "%A, %A" uri_source uri_sink
    let rnd = Random()
    use source = new PushSocket(uri_source)
    use sink = new PushSocket(uri_sink)
    // One pseudo-random duration per task; workers pass it to Thread.Sleep.
    let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)

    /// Prints the expected total duration and blocks until Enter is pressed.
    let ventilator_init () =
        printf "Press enter when workers are ready.\n"
        let expected = TimeSpan.FromMilliseconds(float (Array.sum tasks))
        printf "Total expected time: %A\n" expected
        Console.ReadLine() |> ignore

    /// Sends the start-of-batch frame to the sink, then every task to the workers.
    let ventilator_run () =
        // `sink` here is still the PushSocket above; the `sink` function is
        // only defined further down.
        sink.SendFrame("0")
        printf "Sending tasks to workers.\n"
        for duration in tasks do
            source.SendFrame(string duration)
        Thread.Sleep(1)

    /// Worker loop number `i`: receive a duration, sleep that long, notify the sink.
    let worker i () =
        printf "Starting worker %i\n" i
        use inbox = new PullSocket(uri_source)
        use outbox = new PushSocket(uri_sink)
        while true do
            let msg = inbox.ReceiveFrameString()
            printf "Worker %i received message.\n" i
            //printf "%s.\n" msg
            Thread.Sleep(int msg)
            outbox.SendFrame("")

    /// Collects `task_number` frames and reports the time since the first one.
    /// From here on the name `sink` refers to this function, not the socket.
    let sink () =
        use results = new PullSocket(uri_sink)
        let watch = Diagnostics.Stopwatch()
        for i in 1 .. task_number do
            results.ReceiveFrameString() |> ignore
            // The clock starts once the first frame has arrived.
            if not watch.IsRunning then watch.Start()
            printf (if i % 10 = 0 then ":" else ".")
        printf "\nTotal elapsed time: %A msec\n" watch.Elapsed

    ventilator_init()
    for i in 1 .. 4 do Task.Run (worker i) |> ignore
    let sink_task = Task.Run sink
    ventilator_run()
    sink_task.Wait()
/// Program entry point: runs the pipeline demo, then exits with status 0.
[<EntryPoint>]
let main argv =
    do parallel_task ()
    0
实际发生的情况是:只有一个工作线程收到了所有消息,而其他线程没有被分配到任何任务。为什么会这样?
open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets
/// Runs a ventilator / worker / sink pipeline inside one process, with every
/// socket explicitly bound or connected:
///   * the ventilator binds the task socket and connects to the sink,
///   * each of the four workers connects to both the task socket and the sink,
///   * the sink binds the result socket.
/// Blocks until the sink has received the start-of-batch frame plus one result
/// frame per task, then prints the elapsed time.
let parallel_task () =
    let task_number = 100
    let uri_source, uri_sink =
        let uri = "ipc://parallel_task"
        Path.Join(uri,"source"), Path.Join(uri,"sink")

    /// Generates the task durations, waits for Enter, signals the sink that
    /// the batch is starting and pushes every task to the workers.
    let ventilator () =
        let rnd = Random()
        use source = new PushSocket()
        source.Bind(uri_source)
        use sink = new PushSocket()
        sink.Connect(uri_sink)
        // One pseudo-random duration per task; workers pass it to Thread.Sleep.
        let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)
        printf "Press enter when workers are ready.\n"
        printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
        Console.ReadLine() |> ignore
        // Start-of-batch signal; the sink consumes it before timing the results.
        sink.SendFrame("0")
        printf "Sending tasks to workers.\n"
        Array.iter (string >> source.SendFrame) tasks
        // NOTE(review): presumably here to let queued frames leave before the
        // sockets are disposed -- confirm that 1 ms is long enough.
        Thread.Sleep(1)

    /// Worker loop number `i`: receive a duration, sleep that long, notify the sink.
    let worker i () =
        printf "Starting worker %i\n" i
        use source = new PullSocket()
        source.Connect(uri_source)
        use sink = new PushSocket()
        sink.Connect(uri_sink)
        while true do
            let msg = source.ReceiveFrameString()
            printf "Worker %i received message.\n" i
            Thread.Sleep(int msg)
            sink.SendFrame("")

    /// Waits for the batch to start, then times the arrival of `task_number`
    /// result frames.
    let sink () =
        use sink = new PullSocket()
        sink.Bind(uri_sink)
        let watch = Diagnostics.Stopwatch()
        // The ventilator's "0" frame arrives on this same socket. Consume one
        // frame for it up front so the loop below counts task_number results;
        // previously the signal was counted as a result and the last result
        // was never waited for. Frames are counted, not inspected, so the
        // total of task_number + 1 is correct whatever the arrival order.
        sink.ReceiveFrameString() |> ignore
        watch.Start()
        for i=1 to task_number do
            sink.ReceiveFrameString() |> ignore
            printf (if i % 10 = 0 then ":" else ".")
        watch.Stop()
        printf "\nTotal elapsed time: %A msec\n" watch.Elapsed

    Task.Run ventilator |> ignore
    for i=1 to 4 do Task.Run (worker i) |> ignore
    Task.Run(sink).Wait()
这是上面代码经过整理后可以正常工作的版本。我必须显式指定哪些套接字执行绑定(Bind)、哪些执行连接(Connect)。感谢 @somdoron 的提示。
以下代码翻译自 ZeroMQ 指南中的“分而治之”(Divide and Conquer)示例。
module ZeroMQ
open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets
/// Runs a ventilator / worker / sink pipeline inside one process: the
/// ventilator pushes `task_number` task durations, four worker tasks sleep for
/// each duration they receive and report to the sink, and the sink counts
/// `task_number` frames and prints the elapsed time.
///
/// NOTE(review): every socket below is built from an address string alone, so
/// whether it binds or connects is left to NetMQ's default for that socket
/// type, and all four workers build their PullSocket on the same `uri_source`.
/// This is the version the surrounding text asks about, so the construction is
/// intentionally kept exactly as posted -- confirm the defaults against the
/// NetMQ documentation.
let parallel_task () =
    let task_number = 100
    let uri_source, uri_sink =
        let root = "ipc://parallel_task"
        Path.Join(root, "source"), Path.Join(root, "sink")
    printfn "%A, %A" uri_source uri_sink
    let rnd = Random()
    use source = new PushSocket(uri_source)
    use sink = new PushSocket(uri_sink)
    // One pseudo-random duration per task; workers pass it to Thread.Sleep.
    let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)

    /// Prints the expected total duration and blocks until Enter is pressed.
    let ventilator_init () =
        printf "Press enter when workers are ready.\n"
        let expected = TimeSpan.FromMilliseconds(float (Array.sum tasks))
        printf "Total expected time: %A\n" expected
        Console.ReadLine() |> ignore

    /// Sends the start-of-batch frame to the sink, then every task to the workers.
    let ventilator_run () =
        // `sink` here is still the PushSocket above; the `sink` function is
        // only defined further down.
        sink.SendFrame("0")
        printf "Sending tasks to workers.\n"
        for duration in tasks do
            source.SendFrame(string duration)
        Thread.Sleep(1)

    /// Worker loop number `i`: receive a duration, sleep that long, notify the sink.
    let worker i () =
        printf "Starting worker %i\n" i
        use inbox = new PullSocket(uri_source)
        use outbox = new PushSocket(uri_sink)
        while true do
            let msg = inbox.ReceiveFrameString()
            printf "Worker %i received message.\n" i
            //printf "%s.\n" msg
            Thread.Sleep(int msg)
            outbox.SendFrame("")

    /// Collects `task_number` frames and reports the time since the first one.
    /// From here on the name `sink` refers to this function, not the socket.
    let sink () =
        use results = new PullSocket(uri_sink)
        let watch = Diagnostics.Stopwatch()
        for i in 1 .. task_number do
            results.ReceiveFrameString() |> ignore
            // The clock starts once the first frame has arrived.
            if not watch.IsRunning then watch.Start()
            printf (if i % 10 = 0 then ":" else ".")
        printf "\nTotal elapsed time: %A msec\n" watch.Elapsed

    ventilator_init()
    for i in 1 .. 4 do Task.Run (worker i) |> ignore
    let sink_task = Task.Run sink
    ventilator_run()
    sink_task.Wait()
/// Program entry point: runs the pipeline demo, then exits with status 0.
[<EntryPoint>]
let main argv =
    do parallel_task ()
    0
实际发生的情况是:只有一个工作线程收到了所有消息,而其他线程没有被分配到任何任务。为什么会这样?
open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets
/// Runs a ventilator / worker / sink pipeline inside one process, with every
/// socket explicitly bound or connected:
///   * the ventilator binds the task socket and connects to the sink,
///   * each of the four workers connects to both the task socket and the sink,
///   * the sink binds the result socket.
/// Blocks until the sink has received the start-of-batch frame plus one result
/// frame per task, then prints the elapsed time.
let parallel_task () =
    let task_number = 100
    let uri_source, uri_sink =
        let uri = "ipc://parallel_task"
        Path.Join(uri,"source"), Path.Join(uri,"sink")

    /// Generates the task durations, waits for Enter, signals the sink that
    /// the batch is starting and pushes every task to the workers.
    let ventilator () =
        let rnd = Random()
        use source = new PushSocket()
        source.Bind(uri_source)
        use sink = new PushSocket()
        sink.Connect(uri_sink)
        // One pseudo-random duration per task; workers pass it to Thread.Sleep.
        let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)
        printf "Press enter when workers are ready.\n"
        printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
        Console.ReadLine() |> ignore
        // Start-of-batch signal; the sink consumes it before timing the results.
        sink.SendFrame("0")
        printf "Sending tasks to workers.\n"
        Array.iter (string >> source.SendFrame) tasks
        // NOTE(review): presumably here to let queued frames leave before the
        // sockets are disposed -- confirm that 1 ms is long enough.
        Thread.Sleep(1)

    /// Worker loop number `i`: receive a duration, sleep that long, notify the sink.
    let worker i () =
        printf "Starting worker %i\n" i
        use source = new PullSocket()
        source.Connect(uri_source)
        use sink = new PushSocket()
        sink.Connect(uri_sink)
        while true do
            let msg = source.ReceiveFrameString()
            printf "Worker %i received message.\n" i
            Thread.Sleep(int msg)
            sink.SendFrame("")

    /// Waits for the batch to start, then times the arrival of `task_number`
    /// result frames.
    let sink () =
        use sink = new PullSocket()
        sink.Bind(uri_sink)
        let watch = Diagnostics.Stopwatch()
        // The ventilator's "0" frame arrives on this same socket. Consume one
        // frame for it up front so the loop below counts task_number results;
        // previously the signal was counted as a result and the last result
        // was never waited for. Frames are counted, not inspected, so the
        // total of task_number + 1 is correct whatever the arrival order.
        sink.ReceiveFrameString() |> ignore
        watch.Start()
        for i=1 to task_number do
            sink.ReceiveFrameString() |> ignore
            printf (if i % 10 = 0 then ":" else ".")
        watch.Stop()
        printf "\nTotal elapsed time: %A msec\n" watch.Elapsed

    Task.Run ventilator |> ignore
    for i=1 to 4 do Task.Run (worker i) |> ignore
    Task.Run(sink).Wait()
这是上面代码经过整理后可以正常工作的版本。我必须显式指定哪些套接字执行绑定(Bind)、哪些执行连接(Connect)。感谢 @somdoron 的提示。