F# MailboxProcessor 慢的银行帐户卡塔

Bank account kata with F# MailboxProcessor slow

我用 F# MailboxProcessor 对“经典”bank account kata 进行了编码,使其成为线程安全的。但是当我尝试将交易并行添加到帐户时,速度非常慢非常快:10 个并行调用有响应(2 毫秒),20 个没有响应(9 秒)! (见下面的最后一个测试 Account can be updated from multiple threads

由于MailboxProcessor支持每秒3000万条消息(参见theburningmonk's article),问题从何而来?

// -- Domain ----

type Message =
    | Open of AsyncReplyChannel<bool>
    | Close of AsyncReplyChannel<bool>
    | Balance of AsyncReplyChannel<decimal option>
    | Transaction of decimal * AsyncReplyChannel<bool>

type AccountState = { Opened: bool; Transactions: decimal list }

type Account() =
    let agent = MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop (state: AccountState) =
            async {
                let! message = inbox.Receive()
                match message with
                | Close channel ->
                    channel.Reply state.Opened
                    return! loop { state with Opened = false }
                | Open channel ->
                    printfn $"Opening"
                    channel.Reply (not state.Opened)
                    return! loop { state with Opened = true }
                | Transaction (tran, channel) ->
                    printfn $"Adding transaction {tran}, nb = {state.Transactions.Length}"
                    channel.Reply true
                    return! loop { state with Transactions = tran :: state.Transactions }
                | Balance channel ->
                    let balance =
                        if state.Opened then
                            state.Transactions |> List.sum |> Some
                        else
                            None
                    balance |> channel.Reply
                    return! loop state
            }
        loop { Opened = false; Transactions = [] }
    )

    member _.Open () = agent.PostAndReply(Open)
    member _.Close () = agent.PostAndReply(Close)
    member _.Balance () = agent.PostAndReply(Balance)
    member _.Transaction (transaction: decimal) =
        agent.PostAndReply(fun channel -> Transaction (transaction, channel))

// -- API ----

let mkBankAccount = Account

let openAccount (account: Account) =
    match account.Open() with
    | true -> Some account
    | false -> None

let closeAccount (account: Account option) =
    account |> Option.bind (fun a ->
        match a.Close() with
        | true -> Some a
        | false -> None)

let updateBalance transaction (account: Account option) =
    account |> Option.bind (fun a ->
        match a.Transaction(transaction) with
        | true -> Some a
        | false -> None)

let getBalance (account: Account option) =
    account |> Option.bind (fun a -> a.Balance())
// -- Tests ----

let should_equal expected actual =
    if expected = actual then
        Ok expected
    else
        Error (expected, actual)

let should_not_equal expected actual =
    if expected <> actual then
        Ok expected
    else
        Error (expected, actual)

let ``Returns empty balance after opening`` =
    let account = mkBankAccount() |> openAccount
    getBalance account |> should_equal (Some 0.0m)

let ``Check basic balance`` =
    let account = mkBankAccount() |> openAccount
    let openingBalance = account |> getBalance
    let updatedBalance =
        account
        |> updateBalance 10.0m
        |> getBalance
    openingBalance |> should_equal (Some 0.0m),
    updatedBalance |> should_equal (Some 10.0m)

let ``Balance can increment or decrement`` =
    let account = mkBankAccount() |> openAccount
    let openingBalance = account |> getBalance
    let addedBalance =
        account
        |> updateBalance 10.0m
        |> getBalance
    let subtractedBalance =
        account
        |> updateBalance -15.0m
        |> getBalance
    openingBalance |> should_equal (Some 0.0m),
    addedBalance |> should_equal (Some 10.0m),
    subtractedBalance |> should_equal (Some -5.0m)

let ``Account can be closed`` =
    let account =
        mkBankAccount()
        |> openAccount
        |> closeAccount
    getBalance account |> should_equal None,
    account |> should_not_equal None

#time
let ``Account can be updated from multiple threads`` =
    let account =
        mkBankAccount()
        |> openAccount
    let updateAccountAsync =
        async {
            account
            |> updateBalance 1.0m
            |> ignore
        }
    let nb = 10 //  10 is quick (2ms), 20 is so long (9s)
    updateAccountAsync
    |> List.replicate nb
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore
    getBalance account |> should_equal (Some (decimal nb))
#time

你的问题是你的代码没有一直使用 Async。

您的帐户 class 具有方法 OpenCloseBalanceTransaction,并且您使用 AsyncReplyChannel 但是 您使用 PostAndReply 发送消息。这意味着:您向 MailboxProcessor 发送一条消息,并提供一个回复通道。但是,此时,该方法等待同步完成。

即使有 Async.Parallel 和多个线程,也可能意味着很多线程锁定了它们自己。如果你改变 使用你所有的方法 PostAndAsyncReply 然后你的问题就消失了。

还有另外两个性能优化可以提高性能,但在您的示例中并不重要。

  1. 调用列表的长度是错误的。要计算列表的长度,您必须遍历整个列表。只有你 在事务中使用它来打印长度,但要考虑事务列表是否变长。你总是必须经历 整个列表,无论何时添加交易。这将是您交易清单的 O(N)。

  2. 呼叫也是如此(List.sum)。每当您调用 Balance 时,您都必须计算当前的 Balance。也是 O(N).

因为您有一个 MailboxProcessor,您还可以计算这两个值,而不是再次完全重新计算这些值,again.Thus,它们变成 O(1) 操作。

最重要的是,我会将 OpenCloseTransaction 消息更改为 return nothing,在我看来,这没有意义他们 return 任何东西。你的例子甚至让我对 bool return 感到困惑 价值观甚至意味着。

Close 消息中 return state.Opened 之前将其设置为 false。为什么?

Open 消息中您 return 否定了 state.Opened。你以后如何使用它看起来不对。

如果 bool 背后有更多含义,请从中创建一个独特的歧视联盟,描述它 return 的目的。

您在整个代码中使用了 option<Acount>,我删除了它,因为我看不出它有任何用途。

无论如何,这是一个完整的示例,说明我将如何编写没有速度问题的代码。


type Message =
    | Open
    | Close
    | Balance     of AsyncReplyChannel<decimal option>
    | Transaction of decimal

type AccountState = {
    Opened:             bool
    Transactions:       decimal list
    TransactionsLength: int
    CurrentBalance:     decimal
}

type Account() =
    let agent = MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop (state: AccountState) = async {
            match! inbox.Receive() with
            | Close ->
                printfn "Closing"
                return! loop { state with Opened = false }
            | Open ->
                printfn "Opening"
                return! loop { state with Opened = true }
            | Transaction tran ->
                let l = state.TransactionsLength + 1
                printfn $"Adding transaction {tran}, nb = {l}"

                if state.Opened then
                    return! loop {
                        state with
                            Transactions       = tran :: state.Transactions
                            TransactionsLength = l
                            CurrentBalance     = state.CurrentBalance + tran
                    }
                else
                    return! loop state
            | Balance channel ->
                if   state.Opened
                then channel.Reply (Some state.CurrentBalance)
                else channel.Reply  None
                return! loop state
        }

        let defaultAccount = {
            Opened             = false
            Transactions       = []
            TransactionsLength = 0
            CurrentBalance     = 0m
        }
        loop defaultAccount
    )

    member _.Open        ()          = agent.Post(Open)
    member _.Close       ()          = agent.Post(Close)
    member _.Balance     ()          = agent.PostAndAsyncReply(Balance)
    member _.Transaction transaction = agent.Post(Transaction transaction)

(* Test *)

let should_equal expected actual =
    if expected = actual then
        Ok expected
    else
        Error (expected, actual)

(* --- API --- *)

let mkBankAccount = Account

(* Opens the Account *)
let openAccount  (account: Account) =
    account.Open ()

(* Closes the Account *)
let closeAccount (account: Account) =
    account.Close ()

(* Updates Account *)
let updateBalance transaction (account: Account) =
    account.Transaction(transaction)

(* Gets the current Balance *)
let getBalance (account: Account) =
    account.Balance ()

#time
let ``Account can be updated from multiple threads`` =
    let account = mkBankAccount ()
    openAccount account

    let updateBalanceAsync = async {
        updateBalance 1.0m account
    }

    let nb = 50

    List.replicate nb updateBalanceAsync
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore

    Async.RunSynchronously (async {
        let! balance = getBalance account
        printfn "Balance is %A should be (Some %f)" balance (1.0m * decimal nb)
    })
#time