F# MailboxProcessor 限制并行度
F# MailboxProcessor limit parallelism
我是 F# 的新手,正在尝试使用 MailboxProcessor 进行试验以确保状态更改是独立完成的。
简而言之,我将操作(描述状态变化的不可变对象)发布到 MailboxProcessor,在递归函数中我读取消息并生成一个新状态(即在下面的示例中将一个项目添加到集合中)并且将该状态发送到下一个递归。
open System
type AppliationState =
{
Store : string list
}
static member Default =
{
Store = List.empty
}
member this.HandleAction (action:obj) =
match action with
| :? string as a -> { this with Store = a :: this.Store }
| _ -> this
type Agent<'T> = MailboxProcessor<'T>
[<AbstractClass; Sealed>]
type AppHolder private () =
static member private Processor = Agent.Start(fun inbox ->
let rec loop (s : AppliationState) =
async {
let! action = inbox.Receive()
let s' = s.HandleAction action
Console.WriteLine("{s: " + s.Store.Length.ToString() + " s': " + s'.Store.Length.ToString())
return! loop s'
}
loop AppliationState.Default)
static member HandleAction (action:obj) =
AppHolder.Processor.Post action
[<EntryPoint>]
let main argv =
AppHolder.HandleAction "a"
AppHolder.HandleAction "b"
AppHolder.HandleAction "c"
AppHolder.HandleAction "d"
Console.ReadLine()
0 // return an integer exit code
预期输出为:
s: 0 s': 1
s: 1 s': 2
s: 2 s': 3
s: 3 s': 4
我得到的是:
s: 0 s': 1
s: 0 s': 1
s: 0 s': 1
s: 0 s': 1
阅读 MailboxProcessor 的文档并在谷歌上搜索我的结论是它是一个消息队列,由 'single-thread' 处理,而不是看起来它们都是并行处理的。
我完全不在场吗?
我认为问题一定出在您对 HandleAction 的实现上。我实现了以下,它产生了预期的输出。
open System
type ApplicationState =
{
Items: int list
}
static member Default = {Items = []}
member this.HandleAction x = {this with Items = x::this.Items}
type Message = Add of int
let Processor = MailboxProcessor<Message>.Start(fun inbox ->
let rec loop (s : ApplicationState) =
async {
let! (Add action) = inbox.Receive()
let s' = s.HandleAction action
Console.WriteLine("s: " + s.Items.Length.ToString() + " s': " + s'.Items.Length.ToString())
return! loop s'
}
loop ApplicationState.Default)
Processor.Post (Add 1)
Processor.Post (Add 2)
Processor.Post (Add 3)
Processor.Post (Add 4)
// OUTPUT
// s: 0 s': 1
// s: 1 s': 2
// s: 2 s': 3
// s: 3 s': 4
编辑
看到更新后的代码示例后,我相信正确的 F# 解决方案只是将 AppHolder
类型从 class 切换为模块。更新后的代码如下:
open System
type AppliationState =
{
Store : string list
}
static member Default =
{
Store = List.empty
}
member this.HandleAction (action:obj) =
match action with
| :? string as a -> { this with Store = a :: this.Store }
| _ -> this
type Agent<'T> = MailboxProcessor<'T>
module AppHolder =
let private processor = Agent.Start(fun inbox ->
let rec loop (s : AppliationState) =
async {
let! action = inbox.Receive()
let s' = s.HandleAction action
Console.WriteLine("{s: " + s.Store.Length.ToString() + " s': " + s'.Store.Length.ToString())
return! loop s'
}
loop AppliationState.Default)
let handleAction (action:obj) =
processor.Post action
AppHolder.handleAction "a"
AppHolder.handleAction "b"
AppHolder.handleAction "c"
AppHolder.handleAction "d"
这输出与之前相同的结果:
{s: 0 s': 1
{s: 1 s': 2
{s: 2 s': 3
{s: 3 s': 4
问题是您认为 AppHolder.Processor
每次都是同一个对象,但实际上每次都是不同的 MailboxProcessor。我将您的 AppHolder 代码更改为以下内容:
[<AbstractClass; Sealed>]
type AppHolder private () =
static member private Processor =
printfn "Starting..."
Agent.Start(fun inbox ->
let rec loop (s : AppliationState) =
async {
let! action = inbox.Receive()
let s' = s.HandleAction action
printfn "{s: %A s': %A}" s s'
return! loop s'
}
loop AppliationState.Default)
static member HandleAction (action:obj) =
AppHolder.Processor.Post action
我所做的唯一更改是简化 Console.WriteLine 调用以使用 printfn
和 %A
获取更多调试细节,并添加单个 printfn "Starting..."
调用将在构建和启动 MailboxProcessor 之前立即执行。我得到的输出是:
Starting...
Starting...
Starting...
Starting...
{s: {Store = [];} s': {Store = ["b"];}}
{s: {Store = [];} s': {Store = ["d"];}}
{s: {Store = [];} s': {Store = ["c"];}}
{s: {Store = [];} s': {Store = ["a"];}}
注意 printfn "Starting..."
行已经执行了四次。
这吸引了很多 F# 新手:member
关键字定义了 属性,而不是字段。每次计算 属性 时,都会重新计算 属性 的主体。因此,每次访问 AppHolder.Processor
时,您都会获得一个新的 MailboxProcessor。有关详细信息,请参阅 https://docs.microsoft.com/en-us/dotnet/fsharp/language-reference/members/properties。
您可能想要的是以下内容:
[<AbstractClass; Sealed>]
type AppHolder private () =
static let processor =
printfn "Starting..."
Agent.Start(fun inbox ->
// ...
)
static member HandleAction (action:obj) =
processor.Post action
我是 F# 的新手,正在尝试使用 MailboxProcessor 进行试验以确保状态更改是独立完成的。
简而言之,我将操作(描述状态变化的不可变对象)发布到 MailboxProcessor,在递归函数中我读取消息并生成一个新状态(即在下面的示例中将一个项目添加到集合中)并且将该状态发送到下一个递归。
open System
type AppliationState =
{
Store : string list
}
static member Default =
{
Store = List.empty
}
member this.HandleAction (action:obj) =
match action with
| :? string as a -> { this with Store = a :: this.Store }
| _ -> this
type Agent<'T> = MailboxProcessor<'T>
[<AbstractClass; Sealed>]
type AppHolder private () =
static member private Processor = Agent.Start(fun inbox ->
let rec loop (s : AppliationState) =
async {
let! action = inbox.Receive()
let s' = s.HandleAction action
Console.WriteLine("{s: " + s.Store.Length.ToString() + " s': " + s'.Store.Length.ToString())
return! loop s'
}
loop AppliationState.Default)
static member HandleAction (action:obj) =
AppHolder.Processor.Post action
[<EntryPoint>]
let main argv =
AppHolder.HandleAction "a"
AppHolder.HandleAction "b"
AppHolder.HandleAction "c"
AppHolder.HandleAction "d"
Console.ReadLine()
0 // return an integer exit code
预期输出为:
s: 0 s': 1
s: 1 s': 2
s: 2 s': 3
s: 3 s': 4
我得到的是:
s: 0 s': 1
s: 0 s': 1
s: 0 s': 1
s: 0 s': 1
阅读 MailboxProcessor 的文档并在谷歌上搜索我的结论是它是一个消息队列,由 'single-thread' 处理,而不是看起来它们都是并行处理的。
我完全不在场吗?
我认为问题一定出在您对 HandleAction 的实现上。我实现了以下,它产生了预期的输出。
open System
type ApplicationState =
{
Items: int list
}
static member Default = {Items = []}
member this.HandleAction x = {this with Items = x::this.Items}
type Message = Add of int
let Processor = MailboxProcessor<Message>.Start(fun inbox ->
let rec loop (s : ApplicationState) =
async {
let! (Add action) = inbox.Receive()
let s' = s.HandleAction action
Console.WriteLine("s: " + s.Items.Length.ToString() + " s': " + s'.Items.Length.ToString())
return! loop s'
}
loop ApplicationState.Default)
Processor.Post (Add 1)
Processor.Post (Add 2)
Processor.Post (Add 3)
Processor.Post (Add 4)
// OUTPUT
// s: 0 s': 1
// s: 1 s': 2
// s: 2 s': 3
// s: 3 s': 4
编辑
看到更新后的代码示例后,我相信正确的 F# 解决方案只是将 AppHolder
类型从 class 切换为模块。更新后的代码如下:
open System
type AppliationState =
{
Store : string list
}
static member Default =
{
Store = List.empty
}
member this.HandleAction (action:obj) =
match action with
| :? string as a -> { this with Store = a :: this.Store }
| _ -> this
type Agent<'T> = MailboxProcessor<'T>
module AppHolder =
let private processor = Agent.Start(fun inbox ->
let rec loop (s : AppliationState) =
async {
let! action = inbox.Receive()
let s' = s.HandleAction action
Console.WriteLine("{s: " + s.Store.Length.ToString() + " s': " + s'.Store.Length.ToString())
return! loop s'
}
loop AppliationState.Default)
let handleAction (action:obj) =
processor.Post action
AppHolder.handleAction "a"
AppHolder.handleAction "b"
AppHolder.handleAction "c"
AppHolder.handleAction "d"
这输出与之前相同的结果:
{s: 0 s': 1
{s: 1 s': 2
{s: 2 s': 3
{s: 3 s': 4
问题是您认为 AppHolder.Processor
每次都是同一个对象,但实际上每次都是不同的 MailboxProcessor。我将您的 AppHolder 代码更改为以下内容:
[<AbstractClass; Sealed>]
type AppHolder private () =
static member private Processor =
printfn "Starting..."
Agent.Start(fun inbox ->
let rec loop (s : AppliationState) =
async {
let! action = inbox.Receive()
let s' = s.HandleAction action
printfn "{s: %A s': %A}" s s'
return! loop s'
}
loop AppliationState.Default)
static member HandleAction (action:obj) =
AppHolder.Processor.Post action
我所做的唯一更改是简化 Console.WriteLine 调用以使用 printfn
和 %A
获取更多调试细节,并添加单个 printfn "Starting..."
调用将在构建和启动 MailboxProcessor 之前立即执行。我得到的输出是:
Starting...
Starting...
Starting...
Starting...
{s: {Store = [];} s': {Store = ["b"];}}
{s: {Store = [];} s': {Store = ["d"];}}
{s: {Store = [];} s': {Store = ["c"];}}
{s: {Store = [];} s': {Store = ["a"];}}
注意 printfn "Starting..."
行已经执行了四次。
这吸引了很多 F# 新手:member
关键字定义了 属性,而不是字段。每次计算 属性 时,都会重新计算 属性 的主体。因此,每次访问 AppHolder.Processor
时,您都会获得一个新的 MailboxProcessor。有关详细信息,请参阅 https://docs.microsoft.com/en-us/dotnet/fsharp/language-reference/members/properties。
您可能想要的是以下内容:
[<AbstractClass; Sealed>]
type AppHolder private () =
static let processor =
printfn "Starting..."
Agent.Start(fun inbox ->
// ...
)
static member HandleAction (action:obj) =
processor.Post action