F# MailboxProcessor 慢的银行帐户卡塔
Bank account kata with F# MailboxProcessor slow
我用 F# MailboxProcessor
对“经典”bank account kata 进行了编码,使其成为线程安全的。但是当我尝试将交易并行添加到帐户时,速度非常慢非常快:10 个并行调用有响应(2 毫秒),20 个没有响应(9 秒)! (见下面的最后一个测试 Account can be updated from multiple threads
)
由于MailboxProcessor
支持每秒3000万条消息(参见theburningmonk's article),问题从何而来?
// -- Domain ----
type Message =
| Open of AsyncReplyChannel<bool>
| Close of AsyncReplyChannel<bool>
| Balance of AsyncReplyChannel<decimal option>
| Transaction of decimal * AsyncReplyChannel<bool>
type AccountState = { Opened: bool; Transactions: decimal list }
type Account() =
let agent = MailboxProcessor<Message>.Start(fun inbox ->
let rec loop (state: AccountState) =
async {
let! message = inbox.Receive()
match message with
| Close channel ->
channel.Reply state.Opened
return! loop { state with Opened = false }
| Open channel ->
printfn $"Opening"
channel.Reply (not state.Opened)
return! loop { state with Opened = true }
| Transaction (tran, channel) ->
printfn $"Adding transaction {tran}, nb = {state.Transactions.Length}"
channel.Reply true
return! loop { state with Transactions = tran :: state.Transactions }
| Balance channel ->
let balance =
if state.Opened then
state.Transactions |> List.sum |> Some
else
None
balance |> channel.Reply
return! loop state
}
loop { Opened = false; Transactions = [] }
)
member _.Open () = agent.PostAndReply(Open)
member _.Close () = agent.PostAndReply(Close)
member _.Balance () = agent.PostAndReply(Balance)
member _.Transaction (transaction: decimal) =
agent.PostAndReply(fun channel -> Transaction (transaction, channel))
// -- API ----
let mkBankAccount = Account
let openAccount (account: Account) =
match account.Open() with
| true -> Some account
| false -> None
let closeAccount (account: Account option) =
account |> Option.bind (fun a ->
match a.Close() with
| true -> Some a
| false -> None)
let updateBalance transaction (account: Account option) =
account |> Option.bind (fun a ->
match a.Transaction(transaction) with
| true -> Some a
| false -> None)
let getBalance (account: Account option) =
account |> Option.bind (fun a -> a.Balance())
// -- Tests ----
let should_equal expected actual =
if expected = actual then
Ok expected
else
Error (expected, actual)
let should_not_equal expected actual =
if expected <> actual then
Ok expected
else
Error (expected, actual)
let ``Returns empty balance after opening`` =
let account = mkBankAccount() |> openAccount
getBalance account |> should_equal (Some 0.0m)
let ``Check basic balance`` =
let account = mkBankAccount() |> openAccount
let openingBalance = account |> getBalance
let updatedBalance =
account
|> updateBalance 10.0m
|> getBalance
openingBalance |> should_equal (Some 0.0m),
updatedBalance |> should_equal (Some 10.0m)
let ``Balance can increment or decrement`` =
let account = mkBankAccount() |> openAccount
let openingBalance = account |> getBalance
let addedBalance =
account
|> updateBalance 10.0m
|> getBalance
let subtractedBalance =
account
|> updateBalance -15.0m
|> getBalance
openingBalance |> should_equal (Some 0.0m),
addedBalance |> should_equal (Some 10.0m),
subtractedBalance |> should_equal (Some -5.0m)
let ``Account can be closed`` =
let account =
mkBankAccount()
|> openAccount
|> closeAccount
getBalance account |> should_equal None,
account |> should_not_equal None
#time
let ``Account can be updated from multiple threads`` =
let account =
mkBankAccount()
|> openAccount
let updateAccountAsync =
async {
account
|> updateBalance 1.0m
|> ignore
}
let nb = 10 // 10 is quick (2ms), 20 is so long (9s)
updateAccountAsync
|> List.replicate nb
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
getBalance account |> should_equal (Some (decimal nb))
#time
你的问题是你的代码没有一直使用 Async。
您的帐户 class 具有方法 Open
、Close
、Balance
和 Transaction
,并且您使用 AsyncReplyChannel
但是
您使用 PostAndReply
发送消息。这意味着:您向 MailboxProcessor 发送一条消息,并提供一个回复通道。但是,此时,该方法等待同步完成。
即使有 Async.Parallel
和多个线程,也可能意味着很多线程锁定了它们自己。如果你改变
使用你所有的方法 PostAndAsyncReply
然后你的问题就消失了。
还有另外两个性能优化可以提高性能,但在您的示例中并不重要。
调用列表的长度是错误的。要计算列表的长度,您必须遍历整个列表。只有你
在事务中使用它来打印长度,但要考虑事务列表是否变长。你总是必须经历
整个列表,无论何时添加交易。这将是您交易清单的 O(N)。
呼叫也是如此(List.sum)。每当您调用 Balance 时,您都必须计算当前的 Balance。也是 O(N).
因为您有一个 MailboxProcessor,您还可以计算这两个值,而不是再次完全重新计算这些值,again.Thus,它们变成 O(1) 操作。
最重要的是,我会将 Open
、Close
和 Transaction
消息更改为 return nothing,在我看来,这没有意义他们 return 任何东西。你的例子甚至让我对 bool
return 感到困惑
价值观甚至意味着。
在 Close
消息中 return state.Opened
之前将其设置为 false。为什么?
在 Open
消息中您 return 否定了 state.Opened
。你以后如何使用它看起来不对。
如果 bool
背后有更多含义,请从中创建一个独特的歧视联盟,描述它 return 的目的。
您在整个代码中使用了 option<Acount>
,我删除了它,因为我看不出它有任何用途。
无论如何,这是一个完整的示例,说明我将如何编写没有速度问题的代码。
type Message =
| Open
| Close
| Balance of AsyncReplyChannel<decimal option>
| Transaction of decimal
type AccountState = {
Opened: bool
Transactions: decimal list
TransactionsLength: int
CurrentBalance: decimal
}
type Account() =
let agent = MailboxProcessor<Message>.Start(fun inbox ->
let rec loop (state: AccountState) = async {
match! inbox.Receive() with
| Close ->
printfn "Closing"
return! loop { state with Opened = false }
| Open ->
printfn "Opening"
return! loop { state with Opened = true }
| Transaction tran ->
let l = state.TransactionsLength + 1
printfn $"Adding transaction {tran}, nb = {l}"
if state.Opened then
return! loop {
state with
Transactions = tran :: state.Transactions
TransactionsLength = l
CurrentBalance = state.CurrentBalance + tran
}
else
return! loop state
| Balance channel ->
if state.Opened
then channel.Reply (Some state.CurrentBalance)
else channel.Reply None
return! loop state
}
let defaultAccount = {
Opened = false
Transactions = []
TransactionsLength = 0
CurrentBalance = 0m
}
loop defaultAccount
)
member _.Open () = agent.Post(Open)
member _.Close () = agent.Post(Close)
member _.Balance () = agent.PostAndAsyncReply(Balance)
member _.Transaction transaction = agent.Post(Transaction transaction)
(* Test *)
let should_equal expected actual =
if expected = actual then
Ok expected
else
Error (expected, actual)
(* --- API --- *)
let mkBankAccount = Account
(* Opens the Account *)
let openAccount (account: Account) =
account.Open ()
(* Closes the Account *)
let closeAccount (account: Account) =
account.Close ()
(* Updates Account *)
let updateBalance transaction (account: Account) =
account.Transaction(transaction)
(* Gets the current Balance *)
let getBalance (account: Account) =
account.Balance ()
#time
let ``Account can be updated from multiple threads`` =
let account = mkBankAccount ()
openAccount account
let updateBalanceAsync = async {
updateBalance 1.0m account
}
let nb = 50
List.replicate nb updateBalanceAsync
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
Async.RunSynchronously (async {
let! balance = getBalance account
printfn "Balance is %A should be (Some %f)" balance (1.0m * decimal nb)
})
#time
我用 F# MailboxProcessor
对“经典”bank account kata 进行了编码,使其成为线程安全的。但是当我尝试将交易并行添加到帐户时,速度非常慢非常快:10 个并行调用有响应(2 毫秒),20 个没有响应(9 秒)! (见下面的最后一个测试 Account can be updated from multiple threads
)
由于MailboxProcessor
支持每秒3000万条消息(参见theburningmonk's article),问题从何而来?
// -- Domain ----
type Message =
| Open of AsyncReplyChannel<bool>
| Close of AsyncReplyChannel<bool>
| Balance of AsyncReplyChannel<decimal option>
| Transaction of decimal * AsyncReplyChannel<bool>
type AccountState = { Opened: bool; Transactions: decimal list }
type Account() =
let agent = MailboxProcessor<Message>.Start(fun inbox ->
let rec loop (state: AccountState) =
async {
let! message = inbox.Receive()
match message with
| Close channel ->
channel.Reply state.Opened
return! loop { state with Opened = false }
| Open channel ->
printfn $"Opening"
channel.Reply (not state.Opened)
return! loop { state with Opened = true }
| Transaction (tran, channel) ->
printfn $"Adding transaction {tran}, nb = {state.Transactions.Length}"
channel.Reply true
return! loop { state with Transactions = tran :: state.Transactions }
| Balance channel ->
let balance =
if state.Opened then
state.Transactions |> List.sum |> Some
else
None
balance |> channel.Reply
return! loop state
}
loop { Opened = false; Transactions = [] }
)
member _.Open () = agent.PostAndReply(Open)
member _.Close () = agent.PostAndReply(Close)
member _.Balance () = agent.PostAndReply(Balance)
member _.Transaction (transaction: decimal) =
agent.PostAndReply(fun channel -> Transaction (transaction, channel))
// -- API ----
let mkBankAccount = Account
let openAccount (account: Account) =
match account.Open() with
| true -> Some account
| false -> None
let closeAccount (account: Account option) =
account |> Option.bind (fun a ->
match a.Close() with
| true -> Some a
| false -> None)
let updateBalance transaction (account: Account option) =
account |> Option.bind (fun a ->
match a.Transaction(transaction) with
| true -> Some a
| false -> None)
let getBalance (account: Account option) =
account |> Option.bind (fun a -> a.Balance())
// -- Tests ----
let should_equal expected actual =
if expected = actual then
Ok expected
else
Error (expected, actual)
let should_not_equal expected actual =
if expected <> actual then
Ok expected
else
Error (expected, actual)
let ``Returns empty balance after opening`` =
let account = mkBankAccount() |> openAccount
getBalance account |> should_equal (Some 0.0m)
let ``Check basic balance`` =
let account = mkBankAccount() |> openAccount
let openingBalance = account |> getBalance
let updatedBalance =
account
|> updateBalance 10.0m
|> getBalance
openingBalance |> should_equal (Some 0.0m),
updatedBalance |> should_equal (Some 10.0m)
let ``Balance can increment or decrement`` =
let account = mkBankAccount() |> openAccount
let openingBalance = account |> getBalance
let addedBalance =
account
|> updateBalance 10.0m
|> getBalance
let subtractedBalance =
account
|> updateBalance -15.0m
|> getBalance
openingBalance |> should_equal (Some 0.0m),
addedBalance |> should_equal (Some 10.0m),
subtractedBalance |> should_equal (Some -5.0m)
let ``Account can be closed`` =
let account =
mkBankAccount()
|> openAccount
|> closeAccount
getBalance account |> should_equal None,
account |> should_not_equal None
#time
let ``Account can be updated from multiple threads`` =
let account =
mkBankAccount()
|> openAccount
let updateAccountAsync =
async {
account
|> updateBalance 1.0m
|> ignore
}
let nb = 10 // 10 is quick (2ms), 20 is so long (9s)
updateAccountAsync
|> List.replicate nb
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
getBalance account |> should_equal (Some (decimal nb))
#time
你的问题是你的代码没有一直使用 Async。
您的帐户 class 具有方法 Open
、Close
、Balance
和 Transaction
,并且您使用 AsyncReplyChannel
但是
您使用 PostAndReply
发送消息。这意味着:您向 MailboxProcessor 发送一条消息,并提供一个回复通道。但是,此时,该方法等待同步完成。
即使有 Async.Parallel
和多个线程,也可能意味着很多线程锁定了它们自己。如果你改变
使用你所有的方法 PostAndAsyncReply
然后你的问题就消失了。
还有另外两个性能优化可以提高性能,但在您的示例中并不重要。
调用列表的长度是错误的。要计算列表的长度,您必须遍历整个列表。只有你 在事务中使用它来打印长度,但要考虑事务列表是否变长。你总是必须经历 整个列表,无论何时添加交易。这将是您交易清单的 O(N)。
呼叫也是如此(List.sum)。每当您调用 Balance 时,您都必须计算当前的 Balance。也是 O(N).
因为您有一个 MailboxProcessor,您还可以计算这两个值,而不是再次完全重新计算这些值,again.Thus,它们变成 O(1) 操作。
最重要的是,我会将 Open
、Close
和 Transaction
消息更改为 return nothing,在我看来,这没有意义他们 return 任何东西。你的例子甚至让我对 bool
return 感到困惑
价值观甚至意味着。
在 Close
消息中 return state.Opened
之前将其设置为 false。为什么?
在 Open
消息中您 return 否定了 state.Opened
。你以后如何使用它看起来不对。
如果 bool
背后有更多含义,请从中创建一个独特的歧视联盟,描述它 return 的目的。
您在整个代码中使用了 option<Acount>
,我删除了它,因为我看不出它有任何用途。
无论如何,这是一个完整的示例,说明我将如何编写没有速度问题的代码。
type Message =
| Open
| Close
| Balance of AsyncReplyChannel<decimal option>
| Transaction of decimal
type AccountState = {
Opened: bool
Transactions: decimal list
TransactionsLength: int
CurrentBalance: decimal
}
type Account() =
let agent = MailboxProcessor<Message>.Start(fun inbox ->
let rec loop (state: AccountState) = async {
match! inbox.Receive() with
| Close ->
printfn "Closing"
return! loop { state with Opened = false }
| Open ->
printfn "Opening"
return! loop { state with Opened = true }
| Transaction tran ->
let l = state.TransactionsLength + 1
printfn $"Adding transaction {tran}, nb = {l}"
if state.Opened then
return! loop {
state with
Transactions = tran :: state.Transactions
TransactionsLength = l
CurrentBalance = state.CurrentBalance + tran
}
else
return! loop state
| Balance channel ->
if state.Opened
then channel.Reply (Some state.CurrentBalance)
else channel.Reply None
return! loop state
}
let defaultAccount = {
Opened = false
Transactions = []
TransactionsLength = 0
CurrentBalance = 0m
}
loop defaultAccount
)
member _.Open () = agent.Post(Open)
member _.Close () = agent.Post(Close)
member _.Balance () = agent.PostAndAsyncReply(Balance)
member _.Transaction transaction = agent.Post(Transaction transaction)
(* Test *)
let should_equal expected actual =
if expected = actual then
Ok expected
else
Error (expected, actual)
(* --- API --- *)
let mkBankAccount = Account
(* Opens the Account *)
let openAccount (account: Account) =
account.Open ()
(* Closes the Account *)
let closeAccount (account: Account) =
account.Close ()
(* Updates Account *)
let updateBalance transaction (account: Account) =
account.Transaction(transaction)
(* Gets the current Balance *)
let getBalance (account: Account) =
account.Balance ()
#time
let ``Account can be updated from multiple threads`` =
let account = mkBankAccount ()
openAccount account
let updateBalanceAsync = async {
updateBalance 1.0m account
}
let nb = 50
List.replicate nb updateBalanceAsync
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
Async.RunSynchronously (async {
let! balance = getBalance account
printfn "Balance is %A should be (Some %f)" balance (1.0m * decimal nb)
})
#time