F# 异步工作流/任务与免费 monad 相结合

F# async workflow / tasks combined with free monad

我正在尝试使用免费的 monad 模式构建用于消息处理的管道,我的代码如下所示:

module PipeMonad =
type PipeInstruction<'msgIn, 'msgOut, 'a> =
    | HandleAsync of 'msgIn * (Async<'msgOut> -> 'a)
    | SendOutAsync of 'msgOut * (Async -> 'a)

let private mapInstruction f = function
    | HandleAsync (x, next) -> HandleAsync (x, next >> f)
    | SendOutAsync (x, next) -> SendOutAsync (x, next >> f)

type PipeProgram<'msgIn, 'msgOut, 'a> =
    | Act of PipeInstruction<'msgIn, 'msgOut, PipeProgram<'msgIn, 'msgOut, 'a>>
    | Stop of 'a

let rec bind f = function
    | Act x -> x |> mapInstruction (bind f) |> Act
    | Stop x -> f x

type PipeBuilder() =
    member __.Bind (x, f) = bind f x
    member __.Return x = Stop x
    member __.Zero () = Stop ()
    member __.ReturnFrom x = x

let pipe = PipeBuilder()
let handleAsync msgIn = Act (HandleAsync (msgIn, Stop))
let sendOutAsync msgOut = Act (SendOutAsync (msgOut, Stop))

这是我根据this article


然而,让这些方法异步对我来说很重要(Task 最好,但 Async 是可以接受的),但是当我为我的 pipeline 创建构建器时,我可以'不知道如何使用它 - 我怎样才能等待 Task<'msgOut>Async<'msgOut> 以便我可以发送它并等待这个 "send" 任务?


let pipeline log msgIn =
    pipe {
        let! msgOut = handleAsync msgIn
        let result = async {
            let! msgOut = msgOut
            log msgOut
            return sendOutAsync msgOut
        }
    }

哪个returnsPipeProgram<'b, 'a, Async<PipeProgram<'c, 'a, Async>>>

首先,我认为在 F# 中使用自由 monad 非常接近于反模式。这是一个非常抽象的结构,不适合惯用的 F# 风格 - 但这是一个偏好问题,如果您(和您的团队)发现这种编写代码的方式可读且易于理解,那么您当然可以朝这个方向。

出于好奇,我花了一些时间研究您的示例 - 虽然我还没有完全弄清楚如何完全修复您的示例,但我希望以下内容可能有助于引导您朝着正确的方向前进。总结是,我认为您需要将 Async 集成到您的 PipeProgram 中,以便管道程序本质上是异步的:

type PipeInstruction<'msgIn, 'msgOut, 'a> =
    | HandleAsync of 'msgIn * (Async<'msgOut> -> 'a)
    | SendOutAsync of 'msgOut * (Async<unit> -> 'a)
    | Continue of 'a 

type PipeProgram<'msgIn, 'msgOut, 'a> =
    | Act of Async<PipeInstruction<'msgIn, 'msgOut, PipeProgram<'msgIn, 'msgOut, 'a>>>
    | Stop of Async<'a>

请注意,我必须添加 Continue 才能对我的函数进行类型检查,但我认为这可能是一个错误的 hack,您可能需要对其进行远程处理。使用这些定义,您可以执行以下操作:

let private mapInstruction f = function
    | HandleAsync (x, next) -> HandleAsync (x, next >> f)
    | SendOutAsync (x, next) -> SendOutAsync (x, next >> f)
    | Continue v -> Continue v

let rec bind (f:'a -> PipeProgram<_, _, _>) = function
    | Act x -> 
        let w = async { 
          let! x = x 
          return mapInstruction (bind f) x }
        Act w
    | Stop x -> 
        let w = async {
          let! x = x
          let pg = f x
          return Continue pg
        Act w

type PipeBuilder() =
    member __.Bind (x, f) = bind f x
    member __.Return x = Stop x
    member __.Zero () = Stop (async.Return())
    member __.ReturnFrom x = x

let pipe = PipeBuilder()
let handleAsync msgIn = Act (async.Return(HandleAsync (msgIn, Stop)))
let sendOutAsync msgOut = Act (async.Return(SendOutAsync (msgOut, Stop)))

let pipeline log msgIn =
    pipe {
        let! msgOut = handleAsync msgIn
        log msgOut
        return! sendOutAsync msgOut

pipeline ignore 0 

现在这只是简单的PipeProgram<int, unit, unit>,您应该能够通过对命令起作用的递归异步函数来评估它。

根据我的理解,免费 monad 的全部意义在于您不会公开像 Async 这样的效果,所以我认为它们不应该用于 PipeInstruction 类型。解释器是添加效果的地方。

此外,Free Monad 只在 Haskell 中才有意义,您需要做的就是定义一个仿函数,然后自动获得其余的实现。在 F# 中,您还必须编写其余代码,因此与更传统的解释器模式相比,使用 Free 并没有太多好处。 您链接到的 TurtleProgram 代码只是一个实验——我根本不建议将 Free 用于实际代码。




/// The abstract instruction set
module PipeProgram =

    type PipeInstruction<'msgIn, 'msgOut,'state> =
        | Handle of 'msgIn * ('msgOut -> PipeInstruction<'msgIn, 'msgOut,'state>)
        | SendOut of 'msgOut * (unit -> PipeInstruction<'msgIn, 'msgOut,'state>)
        | Stop of 'state


/// A computation expression for a PipeProgram
module PipeProgramCE =
    open PipeProgram

    let rec bind f instruction =
        match instruction with
        | Handle (x,next) ->  Handle (x, (next >> bind f))
        | SendOut (x, next) -> SendOut (x, (next >> bind f))
        | Stop x -> f x

    type PipeBuilder() =
        member __.Bind (x, f) = bind f x
        member __.Return x = Stop x
        member __.Zero () = Stop ()
        member __.ReturnFrom x = x

let pipe = PipeProgramCE.PipeBuilder()


// helper functions for CE
let stop x = PipeProgram.Stop x
let handle x = PipeProgram.Handle (x,stop)
let sendOut x  = PipeProgram.SendOut (x, stop)

let exampleProgram : PipeProgram.PipeInstruction<string,string,string> = pipe {
    let! msgOut1 = handle "In1"
    do! sendOut msgOut1
    let! msgOut2 = handle "In2"
    do! sendOut msgOut2
    }


这是一个非异步版本的解释器("Id monad"):

module PipeInterpreterSync =
    open PipeProgram

    let handle msgIn =
        printfn "In: %A"  msgIn
        let msgOut = System.Console.ReadLine()

    let sendOut msgOut =
        printfn "Out: %A"  msgOut

    let rec interpret instruction =
        match instruction with
        | Handle (x, next) ->
            let result = handle x
            result |> next |> interpret
        | SendOut (x, next) ->
            let result = sendOut x
            result |> next |> interpret
        x


module PipeInterpreterAsync =
    open PipeProgram

    /// Implementation of "handle" uses async/IO
    let handleAsync msgIn = async {
        printfn "In: %A"  msgIn
        let msgOut = System.Console.ReadLine()
        }

    /// Implementation of "sendOut" uses async/IO
    let sendOutAsync msgOut = async {
        printfn "Out: %A"  msgOut
        }

    let rec interpret instruction =
        match instruction with
        | Handle (x, next) -> async {
            let! result = handleAsync x
            return! result |> next |> interpret
        | SendOut (x, next) -> async {
            do! sendOutAsync x
            return! () |> next |> interpret
        async.Return(x)