使用 F# Suave 流式传输字符串数据

Streaming string data with F# Suave

在支持 TransferEncoding.chunkedHttpOutput.writeChunk 的 Suave 2.4.0 中,我编写了以下代码以通过 HTTP 流式传输数据。

let sendStrings getStringsFromProducer : WebPart =
    Writers.setStatus HTTP_200 >=>
    TransferEncoding.chunked (fun conn -> socket {
        let refConn = ref conn

        for str in getStringsFromProducer do
            let! (_, conn) = (str |> stringToBytes |> HttpOutput.writeChunk) !refConn
            refConn := conn

        return! HttpOutput.writeChunk [||] !refConn
    }
)

虽然这行得通,但我质疑使用 ref 的可靠性,并希望有更好的方法以更实用的方式做同样的事情。有更好的方法吗?假设我无法更改 getStringsFromProducer?

我认为在这种情况下您无法避免所有变异 - 一个接一个地编写块是一项相当必要的操作,并且迭代惰性序列也需要(可变)迭代器,因此无法避免所有变异。我认为您的 sendStrings 函数在向消费者隐藏突变方面做得很好,并提供了一个很好的功能 API.

你可以避免使用 ref 单元格并用局部可变变量替换它们,这样更安全一些 - 因为可变变量无法逃脱局部范围:

TransferEncoding.chunked (fun conn -> socket {
    let mutable conn = conn
    for str in getStringsFromProducer do
        let! _, newConn = HttpOutput.writeChunk (stringToBytes str) conn
        conn <- newConn
    return! HttpOutput.writeChunk [||] conn
}

您可以通过使用递归来避免可变 conn 变量,但这需要您使用 IEnumerator<'T> 而不是使用漂亮的 for 循环来遍历序列,所以我认为这实际上不如使用可变变量的版本好:

TransferEncoding.chunked (fun conn -> socket {
    let en = getStringsFromProducer.GetEnumerator()
    let rec loop conn = socket {
      if en.MoveNext() then 
        let! _, conn = HttpOutput.writeChunk (stringToBytes en.Current) conn
        return! loop conn }
    do! loop conn
    return! HttpOutput.writeChunk [||] conn }) 

我一直在寻找一种在 F# 中以通用方式替换 refs/mutables 的方法,虽然我想出了一个解决方案,但它对您的情况来说可能有点矫枉过正。看起来 ref 是一个仅在单个线程内更新的本地文件,因此它可能相当安全。但是,如果你想替换它,我是这样解决问题的:

type private StateMessage<'a> =
| Get of AsyncReplyChannel<'a>
| GetOrSet of 'a * AsyncReplyChannel<'a>
| GetOrSetResult of (unit -> 'a) * AsyncReplyChannel<'a>
| Set of 'a
| Update of ('a -> 'a) * AsyncReplyChannel<'a>

type Stateful<'a>(?initialValue: 'a) =
    let agent = MailboxProcessor<StateMessage<'a>>.Start
                <| fun inbox ->
                    let rec loop state =
                        async {
                            let! message = inbox.Receive()
                            match message with
                            | Get channel -> 
                                match state with
                                | Some value -> channel.Reply(value)
                                | None -> channel.Reply(Unchecked.defaultof<'a>)
                                return! loop state
                            | GetOrSet (newValue, channel) ->
                                match state with
                                | Some value ->
                                    channel.Reply(value)
                                    return! loop state
                                | None ->
                                    channel.Reply(newValue)
                                    return! loop (Some newValue)
                            | GetOrSetResult (getValue, channel) ->
                                match state with
                                | Some value ->
                                    channel.Reply(value)
                                    return! loop state
                                | None ->
                                    let newValue = getValue ()
                                    channel.Reply(newValue)
                                    return! loop (Some newValue)
                            | Set value -> 
                                return! loop (Some value)
                            | Update (update, channel) ->
                                let currentValue =
                                    match state with
                                    | Some value -> value
                                    | None -> Unchecked.defaultof<'a>
                                let newValue = update currentValue
                                channel.Reply(newValue)
                                return! loop (Some newValue)
                        }
                    loop initialValue

    let get () = agent.PostAndReply Get
    let asyncGet () = agent.PostAndAsyncReply Get
    let getOrSet value = agent.PostAndReply <| fun reply -> GetOrSet (value, reply)
    let asyncGetOrSet value = agent.PostAndAsyncReply <| fun reply -> GetOrSet (value, reply)
    let getOrSetResult getValue = agent.PostAndReply <| fun reply -> GetOrSetResult (getValue, reply)
    let asyncGetOrSetResult getValue = agent.PostAndAsyncReply <| fun reply -> GetOrSetResult (getValue, reply)
    let set value = agent.Post <| Set value
    let update f = agent.PostAndReply <| fun reply -> Update (f, reply)
    let asyncUpdate f = agent.PostAndAsyncReply <| fun reply -> Update (f, reply)

    member __.Get () = get ()
    member __.AsyncGet () = asyncGet ()
    member __.GetOrSet value = getOrSet value
    member __.AsyncGetOrSet value = asyncGetOrSet value
    member __.GetOrSetResult getValue = getOrSetResult getValue
    member __.AsyncGetOrSetResult getValue = asyncGetOrSetResult getValue
    member __.Set value = set value
    member __.Update f = update f
    member __.AsyncUpdate f = asyncUpdate f

这基本上使用 MailboxProcessor 将更新序列化为由尾递归函数管理的状态,类似于 Tomas 的第二个示例。但是,这允许您以更像传统可变状态的方式调用 Get/Set/Update,即使它实际上并没有进行突变。你可以这样使用它:

let state = Stateful(0)
state.Get() |> printfn "%d"
state.Set(1)
state.Get() |> printfn "%d"
state.Update(fun x -> x + 1) |> printfn "%d"

这将打印:

0
1 
2