gpt4 book ai didi

f# - 为什么任务没有分配给所有 worker ?

转载 作者:行者123 更新时间:2023-12-04 10:03:10 25 4
gpt4 key购买 nike

以下内容翻译自 Divide and Conquer example在 ZeroMQ 指南中。

module ZeroMQ

open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets

let parallel_task () =
let task_number = 100
let uri_source, uri_sink =
let uri = "ipc://parallel_task"
Path.Join(uri,"source"), Path.Join(uri,"sink")

printfn "%A, %A" uri_source uri_sink

let rnd = Random()
use source = new PushSocket(uri_source)
use sink = new PushSocket(uri_sink)
let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)

let ventilator_init () =
printf "Press enter when workers are ready.\n"
printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
Console.ReadLine() |> ignore

let ventilator_run () =
sink.SendFrame("0")
printf "Sending tasks to workers.\n"
Array.iter (string >> source.SendFrame) tasks
Thread.Sleep(1)

let worker i () =
printf "Starting worker %i\n" i
use source = new PullSocket(uri_source)
use sink = new PushSocket(uri_sink)
while true do
let msg = source.ReceiveFrameString()
printf "Worker %i received message.\n" i
//printf "%s.\n" msg
Thread.Sleep(int msg)
sink.SendFrame("")

let sink () =
use sink = new PullSocket(uri_sink)
let watch = Diagnostics.Stopwatch()
for i=1 to task_number do
let _ = sink.ReceiveFrameString()
if watch.IsRunning = false then watch.Start()
printf (if i % 10 = 0 then ":" else ".")
printf "\nTotal elapsed time: %A msec\n" watch.Elapsed
ventilator_init()
for i=1 to 4 do Task.Run (worker i) |> ignore
let t = Task.Run sink
ventilator_run()
t.Wait()

[<EntryPoint>]
let main argv =
parallel_task()
0

这里发生的是单个工作人员获取所有消息,而其他线程都没有分配任何工作。为什么会这样?

最佳答案

open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets

let parallel_task () =
let task_number = 100
let uri_source, uri_sink =
let uri = "ipc://parallel_task"
Path.Join(uri,"source"), Path.Join(uri,"sink")

let ventilator () =
let rnd = Random()
use source = new PushSocket()
source.Bind(uri_source)
use sink = new PushSocket()
sink.Connect(uri_sink)
let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)
printf "Press enter when workers are ready.\n"
printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
Console.ReadLine() |> ignore
sink.SendFrame("0")
printf "Sending tasks to workers.\n"
Array.iter (string >> source.SendFrame) tasks
Thread.Sleep(1)

let worker i () =
printf "Starting worker %i\n" i
use source = new PullSocket()
source.Connect(uri_source)
use sink = new PushSocket()
sink.Connect(uri_sink)
while true do
let msg = source.ReceiveFrameString()
printf "Worker %i received message.\n" i
Thread.Sleep(int msg)
sink.SendFrame("")

let sink () =
use sink = new PullSocket()
sink.Bind(uri_sink)
let watch = Diagnostics.Stopwatch()
for i=1 to task_number do
let _ = sink.ReceiveFrameString()
if watch.IsRunning = false then watch.Start()
printf (if i % 10 = 0 then ":" else ".")
printf "\nTotal elapsed time: %A msec\n" watch.Elapsed

Task.Run ventilator |> ignore
for i=1 to 4 do Task.Run (worker i) |> ignore
Task.Run(sink).Wait()

这是上面正常工作的清理版本。我必须明确指出什么是绑定(bind),什么是连接。谢谢@somdoron 的提示。

关于f# - 为什么任务没有分配给所有 worker ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61731469/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com