使用 F# Suave 流式传输字符串数据
Streaming string data with F# Suave
在支持 TransferEncoding.chunked
和 HttpOutput.writeChunk
的 Suave 2.4.0 中,我编写了以下代码以通过 HTTP 流式传输数据。
let sendStrings getStringsFromProducer : WebPart =
Writers.setStatus HTTP_200 >=>
TransferEncoding.chunked (fun conn -> socket {
let refConn = ref conn
for str in getStringsFromProducer do
let! (_, conn) = (str |> stringToBytes |> HttpOutput.writeChunk) !refConn
refConn := conn
return! HttpOutput.writeChunk [||] !refConn
}
)
虽然这行得通,但我质疑使用 ref
的可靠性,并希望有更好的方法以更实用的方式做同样的事情。有更好的方法吗?假设我无法更改 getStringsFromProducer
?
我认为在这种情况下您无法避免所有变异 - 一个接一个地编写块是一项相当必要的操作,并且迭代惰性序列也需要(可变)迭代器,因此无法避免所有变异。我认为您的 sendStrings
函数在向消费者隐藏突变方面做得很好,并提供了一个很好的功能 API.
你可以避免使用 ref
单元格并用局部可变变量替换它们,这样更安全一些 - 因为可变变量无法逃脱局部范围:
TransferEncoding.chunked (fun conn -> socket {
let mutable conn = conn
for str in getStringsFromProducer do
let! _, newConn = HttpOutput.writeChunk (stringToBytes str) conn
conn <- newConn
return! HttpOutput.writeChunk [||] conn
}
您可以通过使用递归来避免可变 conn
变量,但这需要您使用 IEnumerator<'T>
而不是使用漂亮的 for
循环来遍历序列,所以我认为这实际上不如使用可变变量的版本好:
TransferEncoding.chunked (fun conn -> socket {
let en = getStringsFromProducer.GetEnumerator()
let rec loop conn = socket {
if en.MoveNext() then
let! _, conn = HttpOutput.writeChunk (stringToBytes en.Current) conn
return! loop conn }
do! loop conn
return! HttpOutput.writeChunk [||] conn })
我一直在寻找一种在 F# 中以通用方式替换 refs/mutables 的方法,虽然我想出了一个解决方案,但它对您的情况来说可能有点矫枉过正。看起来 ref 是一个仅在单个线程内更新的本地文件,因此它可能相当安全。但是,如果你想替换它,我是这样解决问题的:
type private StateMessage<'a> =
| Get of AsyncReplyChannel<'a>
| GetOrSet of 'a * AsyncReplyChannel<'a>
| GetOrSetResult of (unit -> 'a) * AsyncReplyChannel<'a>
| Set of 'a
| Update of ('a -> 'a) * AsyncReplyChannel<'a>
type Stateful<'a>(?initialValue: 'a) =
let agent = MailboxProcessor<StateMessage<'a>>.Start
<| fun inbox ->
let rec loop state =
async {
let! message = inbox.Receive()
match message with
| Get channel ->
match state with
| Some value -> channel.Reply(value)
| None -> channel.Reply(Unchecked.defaultof<'a>)
return! loop state
| GetOrSet (newValue, channel) ->
match state with
| Some value ->
channel.Reply(value)
return! loop state
| None ->
channel.Reply(newValue)
return! loop (Some newValue)
| GetOrSetResult (getValue, channel) ->
match state with
| Some value ->
channel.Reply(value)
return! loop state
| None ->
let newValue = getValue ()
channel.Reply(newValue)
return! loop (Some newValue)
| Set value ->
return! loop (Some value)
| Update (update, channel) ->
let currentValue =
match state with
| Some value -> value
| None -> Unchecked.defaultof<'a>
let newValue = update currentValue
channel.Reply(newValue)
return! loop (Some newValue)
}
loop initialValue
let get () = agent.PostAndReply Get
let asyncGet () = agent.PostAndAsyncReply Get
let getOrSet value = agent.PostAndReply <| fun reply -> GetOrSet (value, reply)
let asyncGetOrSet value = agent.PostAndAsyncReply <| fun reply -> GetOrSet (value, reply)
let getOrSetResult getValue = agent.PostAndReply <| fun reply -> GetOrSetResult (getValue, reply)
let asyncGetOrSetResult getValue = agent.PostAndAsyncReply <| fun reply -> GetOrSetResult (getValue, reply)
let set value = agent.Post <| Set value
let update f = agent.PostAndReply <| fun reply -> Update (f, reply)
let asyncUpdate f = agent.PostAndAsyncReply <| fun reply -> Update (f, reply)
member __.Get () = get ()
member __.AsyncGet () = asyncGet ()
member __.GetOrSet value = getOrSet value
member __.AsyncGetOrSet value = asyncGetOrSet value
member __.GetOrSetResult getValue = getOrSetResult getValue
member __.AsyncGetOrSetResult getValue = asyncGetOrSetResult getValue
member __.Set value = set value
member __.Update f = update f
member __.AsyncUpdate f = asyncUpdate f
这基本上使用 MailboxProcessor
将更新序列化为由尾递归函数管理的状态,类似于 Tomas 的第二个示例。但是,这允许您以更像传统可变状态的方式调用 Get/Set/Update,即使它实际上并没有进行突变。你可以这样使用它:
let state = Stateful(0)
state.Get() |> printfn "%d"
state.Set(1)
state.Get() |> printfn "%d"
state.Update(fun x -> x + 1) |> printfn "%d"
这将打印:
0
1
2
在支持 TransferEncoding.chunked
和 HttpOutput.writeChunk
的 Suave 2.4.0 中,我编写了以下代码以通过 HTTP 流式传输数据。
let sendStrings getStringsFromProducer : WebPart =
Writers.setStatus HTTP_200 >=>
TransferEncoding.chunked (fun conn -> socket {
let refConn = ref conn
for str in getStringsFromProducer do
let! (_, conn) = (str |> stringToBytes |> HttpOutput.writeChunk) !refConn
refConn := conn
return! HttpOutput.writeChunk [||] !refConn
}
)
虽然这行得通,但我质疑使用 ref
的可靠性,并希望有更好的方法以更实用的方式做同样的事情。有更好的方法吗?假设我无法更改 getStringsFromProducer
?
我认为在这种情况下您无法避免所有变异 - 一个接一个地编写块是一项相当必要的操作,并且迭代惰性序列也需要(可变)迭代器,因此无法避免所有变异。我认为您的 sendStrings
函数在向消费者隐藏突变方面做得很好,并提供了一个很好的功能 API.
你可以避免使用 ref
单元格并用局部可变变量替换它们,这样更安全一些 - 因为可变变量无法逃脱局部范围:
TransferEncoding.chunked (fun conn -> socket {
let mutable conn = conn
for str in getStringsFromProducer do
let! _, newConn = HttpOutput.writeChunk (stringToBytes str) conn
conn <- newConn
return! HttpOutput.writeChunk [||] conn
}
您可以通过使用递归来避免可变 conn
变量,但这需要您使用 IEnumerator<'T>
而不是使用漂亮的 for
循环来遍历序列,所以我认为这实际上不如使用可变变量的版本好:
TransferEncoding.chunked (fun conn -> socket {
let en = getStringsFromProducer.GetEnumerator()
let rec loop conn = socket {
if en.MoveNext() then
let! _, conn = HttpOutput.writeChunk (stringToBytes en.Current) conn
return! loop conn }
do! loop conn
return! HttpOutput.writeChunk [||] conn })
我一直在寻找一种在 F# 中以通用方式替换 refs/mutables 的方法,虽然我想出了一个解决方案,但它对您的情况来说可能有点矫枉过正。看起来 ref 是一个仅在单个线程内更新的本地文件,因此它可能相当安全。但是,如果你想替换它,我是这样解决问题的:
type private StateMessage<'a> =
| Get of AsyncReplyChannel<'a>
| GetOrSet of 'a * AsyncReplyChannel<'a>
| GetOrSetResult of (unit -> 'a) * AsyncReplyChannel<'a>
| Set of 'a
| Update of ('a -> 'a) * AsyncReplyChannel<'a>
type Stateful<'a>(?initialValue: 'a) =
let agent = MailboxProcessor<StateMessage<'a>>.Start
<| fun inbox ->
let rec loop state =
async {
let! message = inbox.Receive()
match message with
| Get channel ->
match state with
| Some value -> channel.Reply(value)
| None -> channel.Reply(Unchecked.defaultof<'a>)
return! loop state
| GetOrSet (newValue, channel) ->
match state with
| Some value ->
channel.Reply(value)
return! loop state
| None ->
channel.Reply(newValue)
return! loop (Some newValue)
| GetOrSetResult (getValue, channel) ->
match state with
| Some value ->
channel.Reply(value)
return! loop state
| None ->
let newValue = getValue ()
channel.Reply(newValue)
return! loop (Some newValue)
| Set value ->
return! loop (Some value)
| Update (update, channel) ->
let currentValue =
match state with
| Some value -> value
| None -> Unchecked.defaultof<'a>
let newValue = update currentValue
channel.Reply(newValue)
return! loop (Some newValue)
}
loop initialValue
let get () = agent.PostAndReply Get
let asyncGet () = agent.PostAndAsyncReply Get
let getOrSet value = agent.PostAndReply <| fun reply -> GetOrSet (value, reply)
let asyncGetOrSet value = agent.PostAndAsyncReply <| fun reply -> GetOrSet (value, reply)
let getOrSetResult getValue = agent.PostAndReply <| fun reply -> GetOrSetResult (getValue, reply)
let asyncGetOrSetResult getValue = agent.PostAndAsyncReply <| fun reply -> GetOrSetResult (getValue, reply)
let set value = agent.Post <| Set value
let update f = agent.PostAndReply <| fun reply -> Update (f, reply)
let asyncUpdate f = agent.PostAndAsyncReply <| fun reply -> Update (f, reply)
member __.Get () = get ()
member __.AsyncGet () = asyncGet ()
member __.GetOrSet value = getOrSet value
member __.AsyncGetOrSet value = asyncGetOrSet value
member __.GetOrSetResult getValue = getOrSetResult getValue
member __.AsyncGetOrSetResult getValue = asyncGetOrSetResult getValue
member __.Set value = set value
member __.Update f = update f
member __.AsyncUpdate f = asyncUpdate f
这基本上使用 MailboxProcessor
将更新序列化为由尾递归函数管理的状态,类似于 Tomas 的第二个示例。但是,这允许您以更像传统可变状态的方式调用 Get/Set/Update,即使它实际上并没有进行突变。你可以这样使用它:
let state = Stateful(0)
state.Get() |> printfn "%d"
state.Set(1)
state.Get() |> printfn "%d"
state.Update(fun x -> x + 1) |> printfn "%d"
这将打印:
0
1
2