F# 中的流式服务器发送事件 (SSE)
Streaming Server-Sent Events (SSE) in F#
在 F# 中,使用 System.Net.Http 库将“服务器发送事件 (SSE) 样式”的事件流式传输到前端,有什么轻量级的方法?我了解事件流的格式(例如这段 PHP 示例代码),但希望得到一些指导,以便在服务器端的 F# 应用程序中实现流式传输部分(我使用的是 .NET Framework 4.8)。
您可以使用 Suave。下面的示例使用 SSE 每秒发送一条消息。我没有在 .NET Framework 4.7 中尝试过(我是在 Mac 上的 .NET 5 中尝试的),但它应该可以工作。
open Suave
open Suave.Sockets
open Suave.Sockets.Control
open Suave.EventSource
open Suave.Operators
open Suave.Filters
/// Sends one event over the connection `out`, using the string form of `i`
/// as both the event id and the event data, then waits 1000 ms.
let write i out =
    socket {
        let text = string i
        // No event type is set on the message (``type`` = None).
        do! send out { id = text; data = text; ``type`` = None }
        // Pause so that consecutive calls are spaced one second apart.
        return! Async.Sleep 1000 |> SocketOp.ofAsync
    }
/// Web part that answers GET requests with an unbounded event stream:
/// after the handshake it calls `write` for 1, 2, 3, ... on the same connection.
let app =
    choose [
        GET >=> request (fun _ ->
            handShake (fun out ->
                socket {
                    // Lazy, infinite sequence of socket operations; nothing is
                    // sent until the loop below runs each element.
                    let actions =
                        Seq.initInfinite (fun n -> n + 1)
                        |> Seq.map (fun i -> write i out)
                    // Run the operations strictly one after another.
                    for a in actions do
                        do! a
                    // Only reached if the sequence ends, which an infinite one never does.
                    return out
                }))
    ]
/// Program entry point: serves `app` with the default server configuration
/// and reports exit code 0 once `startWebServer` returns.
[<EntryPoint>]
let main _ =
    app |> startWebServer defaultConfig
    0
以下极简的基础代码可以正常工作(操作系统:Windows 10,浏览器:Google Chrome 92.0.4515,.NET Framework 4.8):
F# 服务器端代码:
module SSE0 =
    open System
    open System.IO
    open System.Net

    /// Encodes `data` as UTF-8 and writes the resulting bytes to `sink`.
    let pipeUTF8 (data: string) (sink: Stream) : Async<unit> = async {
        let bytes = System.Text.Encoding.UTF8.GetBytes data
        do! sink.WriteAsync(bytes, 0, bytes.Length) |> Async.AwaitTask }

    /// Infix shorthand for `pipeUTF8`: `text =>= stream`.
    let private (=>=) data sink = pipeUTF8 data sink

    /// True when `ex` (or every exception wrapped in an AggregateException)
    /// is one of the errors raised when the client has gone away or the
    /// response has already been closed.
    let private isDisconnect (ex: exn) : bool =
        let direct (e: exn) =
            match e with
            | :? HttpListenerException | :? IOException | :? ObjectDisposedException -> true
            | _ -> false
        match ex with
        | :? AggregateException as agg -> agg.Flatten().InnerExceptions |> Seq.forall direct
        | _ -> direct ex

    /// One server-sent event: its id, its event type and its payload.
    type Msg = { id: string; event: string; data: string } with

        /// Renders the event in text/event-stream wire format:
        /// an `id:` line, an `event:` line, one `data:` line per line of the
        /// payload, and a single blank line that terminates the event.
        /// Note: `id` and `event` are written as-is and must not contain line breaks.
        member this.serialize () : string =
            let payload = if isNull this.data then "" else this.data
            // Every payload line needs its own `data:` prefix; a raw line break
            // inside one `data:` field would end the field (or the whole event).
            let dataLines =
                payload.Replace("\r\n", "\n").Replace('\r', '\n').Split('\n')
                |> Array.map (fun line -> "data:" + line + "\n")
                |> String.concat ""
            sprintf "id:%s\nevent:%s\n%s\n" this.id this.event dataLines

        /// Writes the event to `sink`, flushes it, logs it to the console and
        /// then waits one second.
        member this.send (sink: Stream) : Async<unit> = async {
            let frame = this.serialize()
            do! frame =>= sink
            // Flush before sleeping so the client receives the event immediately.
            do! sink.FlushAsync() |> Async.AwaitTask
            Console.WriteLine(sprintf "id: %s, event: %s, data: %s" this.id this.event this.data)
            do! Async.Sleep 1000 } // pacing, only for this basic example

    /// Request handler that streams 60 events (`id#00` .. `id#59`, event type
    /// "message") to the response stream of `ctx`, one after another.
    let sse_count (ctx : HttpListenerContext) : Async<unit> =
        let output = ctx.Response.OutputStream
        let message (i: int) : Msg =
            { id = sprintf "id#%02d" i; event = "message"; data = sprintf "data#%02d" i }
        async {
            for i in 0 .. 59 do
                do! (message i).send output }

    /// Starts an HttpListener on `url` in the background. Every incoming request
    /// gets the event-stream response headers and is handed to `handler` in its
    /// own async computation. Both the accept loop and the handlers run under
    /// the token of `cts`, which is returned so the caller can cancel them.
    let startServer (url: string) (handler: HttpListenerContext -> Async<unit>) (cts: Threading.CancellationTokenSource) : Threading.CancellationTokenSource =
        // Read the token once; the accept loop must not touch `cts` again later.
        let token = cts.Token
        let headers =
            [ ("Content-Type", "text/event-stream; charset=utf-8")
              ("Cache-Control", "no-cache")
              ("Access-Control-Allow-Origin", "*") ] // or a specific origin, e.g. http://localhost:3000
        // Runs the handler for one request and always releases the response.
        let serve (context: HttpListenerContext) : Async<unit> = async {
            try
                try
                    do! handler context
                with
                | ex when isDisconnect ex ->
                    // The browser closed the connection; that is a normal way
                    // for a stream to end and must not take the server down.
                    Console.WriteLine(sprintf "Client disconnected: %s" ex.Message)
            finally
                // Without this the connection stays open after the handler ends.
                try
                    context.Response.Close()
                with
                | ex when isDisconnect ex -> context.Response.Abort()
        }
        let task = async {
            use listener = new HttpListener()
            listener.Prefixes.Add(url)
            listener.Start()
            while true do
                let! context = listener.GetContextAsync() |> Async.AwaitTask
                let resp = context.Response
                headers |> List.iter (fun (k, v) -> resp.AddHeader(k, v))
                // Serve each client concurrently so the loop can keep accepting.
                Async.Start (serve context, token)
        }
        Async.Start (task, token)
        cts

    /// Console entry point: waits for Return, starts the server on
    /// http://localhost:8080/events/, waits for Return again, then cancels it.
    [<EntryPoint>]
    let main argv =
        let cts' = new Threading.CancellationTokenSource()
        Console.WriteLine("Press return to start.")
        Console.ReadLine() |> ignore
        Console.WriteLine("Running...")
        let cts = startServer "http://localhost:8080/events/" sse_count cts'
        Console.WriteLine("Press return to exit.")
        Console.ReadLine() |> ignore
        cts.Cancel()
        0
html 文件:
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8" />
    <title>SSE test</title>
</head>
<body>
    <button id="btn">Close the connection</button>
    <ul id="msglist"></ul>
    <script>
        // Subscribe to the event stream exposed at /events/ on localhost:8080.
        var es = new EventSource("http://localhost:8080/events/");
        es.onopen = function() {
            console.log("Connection to server opened.");
        };
        // Each received event is logged and appended to the list as a new item.
        var msgList = document.getElementById("msglist");
        es.onmessage = function(e) {
            console.log("type: " + e.type + ", id: " + e.lastEventId + ", data: " + e.data);
            var newElement = document.createElement("li");
            newElement.textContent = "type: " + e.type + ", id: " + e.lastEventId + ", data: " + e.data;
            msgList.appendChild(newElement);
        };
        // The button closes the stream from the client side.
        var btn = document.getElementById("btn");
        btn.onclick = function() {
            console.log("Connection closed");
            es.close();
        };
        es.onerror = function(e) {
            console.log("Error found.");
        };
    </script>
</body>
</html>
以下资源对完成这项工作很有用:
https://github.com/mdn/dom-examples/tree/master/server-sent-events
在 F# 中,使用 System.Net.Http 库将“服务器发送事件 (SSE) 样式”的事件流式传输到前端,有什么轻量级的方法?我了解事件流的格式(例如这段 PHP 示例代码),但希望得到一些指导,以便在服务器端的 F# 应用程序中实现流式传输部分(我使用的是 .NET Framework 4.8)。
您可以使用 Suave。下面的示例使用 SSE 每秒发送一条消息。我没有在 .NET Framework 4.7 中尝试过(我是在 Mac 上的 .NET 5 中尝试的),但它应该可以工作。
open Suave
open Suave.Sockets
open Suave.Sockets.Control
open Suave.EventSource
open Suave.Operators
open Suave.Filters
/// Sends one event over the connection `out`, using the string form of `i`
/// as both the event id and the event data, then waits 1000 ms.
let write i out =
    socket {
        let text = string i
        // No event type is set on the message (``type`` = None).
        do! send out { id = text; data = text; ``type`` = None }
        // Pause so that consecutive calls are spaced one second apart.
        return! Async.Sleep 1000 |> SocketOp.ofAsync
    }
/// Web part that answers GET requests with an unbounded event stream:
/// after the handshake it calls `write` for 1, 2, 3, ... on the same connection.
let app =
    choose [
        GET >=> request (fun _ ->
            handShake (fun out ->
                socket {
                    // Lazy, infinite sequence of socket operations; nothing is
                    // sent until the loop below runs each element.
                    let actions =
                        Seq.initInfinite (fun n -> n + 1)
                        |> Seq.map (fun i -> write i out)
                    // Run the operations strictly one after another.
                    for a in actions do
                        do! a
                    // Only reached if the sequence ends, which an infinite one never does.
                    return out
                }))
    ]
/// Program entry point: serves `app` with the default server configuration
/// and reports exit code 0 once `startWebServer` returns.
[<EntryPoint>]
let main _ =
    app |> startWebServer defaultConfig
    0
以下极简的基础代码可以正常工作(操作系统:Windows 10,浏览器:Google Chrome 92.0.4515,.NET Framework 4.8):
F# 服务器端代码:
module SSE0 =
    open System
    open System.IO
    open System.Net

    /// Encodes `data` as UTF-8 and writes the resulting bytes to `sink`.
    let pipeUTF8 (data: string) (sink: Stream) : Async<unit> = async {
        let bytes = System.Text.Encoding.UTF8.GetBytes data
        do! sink.WriteAsync(bytes, 0, bytes.Length) |> Async.AwaitTask }

    /// Infix shorthand for `pipeUTF8`: `text =>= stream`.
    let private (=>=) data sink = pipeUTF8 data sink

    /// True when `ex` (or every exception wrapped in an AggregateException)
    /// is one of the errors raised when the client has gone away or the
    /// response has already been closed.
    let private isDisconnect (ex: exn) : bool =
        let direct (e: exn) =
            match e with
            | :? HttpListenerException | :? IOException | :? ObjectDisposedException -> true
            | _ -> false
        match ex with
        | :? AggregateException as agg -> agg.Flatten().InnerExceptions |> Seq.forall direct
        | _ -> direct ex

    /// One server-sent event: its id, its event type and its payload.
    type Msg = { id: string; event: string; data: string } with

        /// Renders the event in text/event-stream wire format:
        /// an `id:` line, an `event:` line, one `data:` line per line of the
        /// payload, and a single blank line that terminates the event.
        /// Note: `id` and `event` are written as-is and must not contain line breaks.
        member this.serialize () : string =
            let payload = if isNull this.data then "" else this.data
            // Every payload line needs its own `data:` prefix; a raw line break
            // inside one `data:` field would end the field (or the whole event).
            let dataLines =
                payload.Replace("\r\n", "\n").Replace('\r', '\n').Split('\n')
                |> Array.map (fun line -> "data:" + line + "\n")
                |> String.concat ""
            sprintf "id:%s\nevent:%s\n%s\n" this.id this.event dataLines

        /// Writes the event to `sink`, flushes it, logs it to the console and
        /// then waits one second.
        member this.send (sink: Stream) : Async<unit> = async {
            let frame = this.serialize()
            do! frame =>= sink
            // Flush before sleeping so the client receives the event immediately.
            do! sink.FlushAsync() |> Async.AwaitTask
            Console.WriteLine(sprintf "id: %s, event: %s, data: %s" this.id this.event this.data)
            do! Async.Sleep 1000 } // pacing, only for this basic example

    /// Request handler that streams 60 events (`id#00` .. `id#59`, event type
    /// "message") to the response stream of `ctx`, one after another.
    let sse_count (ctx : HttpListenerContext) : Async<unit> =
        let output = ctx.Response.OutputStream
        let message (i: int) : Msg =
            { id = sprintf "id#%02d" i; event = "message"; data = sprintf "data#%02d" i }
        async {
            for i in 0 .. 59 do
                do! (message i).send output }

    /// Starts an HttpListener on `url` in the background. Every incoming request
    /// gets the event-stream response headers and is handed to `handler` in its
    /// own async computation. Both the accept loop and the handlers run under
    /// the token of `cts`, which is returned so the caller can cancel them.
    let startServer (url: string) (handler: HttpListenerContext -> Async<unit>) (cts: Threading.CancellationTokenSource) : Threading.CancellationTokenSource =
        // Read the token once; the accept loop must not touch `cts` again later.
        let token = cts.Token
        let headers =
            [ ("Content-Type", "text/event-stream; charset=utf-8")
              ("Cache-Control", "no-cache")
              ("Access-Control-Allow-Origin", "*") ] // or a specific origin, e.g. http://localhost:3000
        // Runs the handler for one request and always releases the response.
        let serve (context: HttpListenerContext) : Async<unit> = async {
            try
                try
                    do! handler context
                with
                | ex when isDisconnect ex ->
                    // The browser closed the connection; that is a normal way
                    // for a stream to end and must not take the server down.
                    Console.WriteLine(sprintf "Client disconnected: %s" ex.Message)
            finally
                // Without this the connection stays open after the handler ends.
                try
                    context.Response.Close()
                with
                | ex when isDisconnect ex -> context.Response.Abort()
        }
        let task = async {
            use listener = new HttpListener()
            listener.Prefixes.Add(url)
            listener.Start()
            while true do
                let! context = listener.GetContextAsync() |> Async.AwaitTask
                let resp = context.Response
                headers |> List.iter (fun (k, v) -> resp.AddHeader(k, v))
                // Serve each client concurrently so the loop can keep accepting.
                Async.Start (serve context, token)
        }
        Async.Start (task, token)
        cts

    /// Console entry point: waits for Return, starts the server on
    /// http://localhost:8080/events/, waits for Return again, then cancels it.
    [<EntryPoint>]
    let main argv =
        let cts' = new Threading.CancellationTokenSource()
        Console.WriteLine("Press return to start.")
        Console.ReadLine() |> ignore
        Console.WriteLine("Running...")
        let cts = startServer "http://localhost:8080/events/" sse_count cts'
        Console.WriteLine("Press return to exit.")
        Console.ReadLine() |> ignore
        cts.Cancel()
        0
html 文件:
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8" />
    <title>SSE test</title>
</head>
<body>
    <button id="btn">Close the connection</button>
    <ul id="msglist"></ul>
    <script>
        // Subscribe to the event stream exposed at /events/ on localhost:8080.
        var es = new EventSource("http://localhost:8080/events/");
        es.onopen = function() {
            console.log("Connection to server opened.");
        };
        // Each received event is logged and appended to the list as a new item.
        var msgList = document.getElementById("msglist");
        es.onmessage = function(e) {
            console.log("type: " + e.type + ", id: " + e.lastEventId + ", data: " + e.data);
            var newElement = document.createElement("li");
            newElement.textContent = "type: " + e.type + ", id: " + e.lastEventId + ", data: " + e.data;
            msgList.appendChild(newElement);
        };
        // The button closes the stream from the client side.
        var btn = document.getElementById("btn");
        btn.onclick = function() {
            console.log("Connection closed");
            es.close();
        };
        es.onerror = function(e) {
            console.log("Error found.");
        };
    </script>
</body>
</html>
以下资源对完成这项工作很有用:
https://github.com/mdn/dom-examples/tree/master/server-sent-events