通过 Rx 从 MailboxProcessor 返回结果是个好主意吗?
Is returning results from MailboxProcessor via Rx a good idea?
我有点好奇下面的代码示例以及人们的想法。
这个想法是从 NetworkStream (~20 msg/s) 中读取,而不是在 main 中工作,将事情传递给 MainboxProcessor 来处理并在完成后取回绑定。
通常的方法是使用PostAndReply,但我想绑定到C#中的ListView或其他控件。无论如何都必须对 LastN 项目和过滤施展魔法。
另外,Rx 有一些错误处理。
下面的示例观察来自 2..10 和 returns "hello X" 的数字。在 8 它像 EOF 一样停止。使其成为 ToEnumerable,因为其他线程在此之前完成,但它也适用于 Subscribe。
困扰我的事情:
- 递归传递 Subject(obj)。我看不到其中 3-4 个有任何问题。好主意?
- 主题的生命周期。
open System
open System.Threading
open System.Reactive.Subjects
open System.Reactive.Linq // NuGet, take System.Reactive.Core also.
open System.Reactive.Concurrency
type SerializedLogger() =
let _letters = new Subject<string>()
// create the mailbox processor
let agent = MailboxProcessor.Start(fun inbox ->
// the message processing function
let rec messageLoop (letters:Subject<string>) = async{
// read a message
let! msg = inbox.Receive()
printfn "mailbox: %d in Thread: %d" msg Thread.CurrentThread.ManagedThreadId
do! Async.Sleep 100
// write it to the log
match msg with
| 8 -> letters.OnCompleted() // like EOF.
| x -> letters.OnNext(sprintf "hello %d" x)
// loop to top
return! messageLoop letters
}
// start the loop
messageLoop _letters
)
// public interface
member this.Log msg = agent.Post msg
member this.Getletters() = _letters.AsObservable()
/// Print line with prefix 1.
let myPrint1 x = printfn "onNext - %s, Thread: %d" x Thread.CurrentThread.ManagedThreadId
// Actions
let onNext = new Action<string>(myPrint1)
let onCompleted = new Action(fun _ -> printfn "Complete")
[<EntryPoint>]
let main argv =
async{
printfn "Main is on: %d" Thread.CurrentThread.ManagedThreadId
// test
let logger = SerializedLogger()
logger.Log 1 // ignored?
let xObs = logger
.Getletters() //.Where( fun x -> x <> "hello 5")
.SubscribeOn(Scheduler.CurrentThread)
.ObserveOn(Scheduler.CurrentThread)
.ToEnumerable() // this
//.Subscribe(onNext, onCompleted) // or with Dispose()
[2..10] |> Seq.iter (logger.Log)
xObs |> Seq.iter myPrint1
while true
do
printfn "waiting"
System.Threading.Thread.Sleep(1000)
return 0
} |> Async.RunSynchronously // return an integer exit code
我做过类似的事情,但使用的是普通 F# Event
类型而不是 Subject
。它基本上可以让您创建 IObservable
并触发其订阅 - 就像您使用更复杂的 Subject
一样。基于事件的版本将是:
type SerializedLogger() =
let letterProduced = new Event<string>()
let lettersEnded = new Event<unit>()
let agent = MailboxProcessor.Start(fun inbox ->
let rec messageLoop (letters:Subject<string>) = async {
// Some code omitted
match msg with
| 8 -> lettersEnded.Trigger()
| x -> letterProduced.Trigger(sprintf "hello %d" x)
// ...
member this.Log msg = agent.Post msg
member this.LetterProduced = letterProduced.Publish
member this.LettersEnded = lettersEnded.Publish
重要的区别是:
Event
无法触发OnCompleted
,所以我改为公开两个单独的事件。这是很不幸的!鉴于 Subject
在所有其他方面与事件非常相似,这可能是使用主题而不是普通事件的一个很好的理由。
使用 Event
的好处是它是标准的 F# 类型,因此您不需要代理中的任何外部依赖项。
我注意到您的评论指出对 Log
的第一次调用被忽略了。那是因为您仅在该调用发生后才订阅事件处理程序。我想你可以在这里使用 ReplaySubject variation on the Subject idea - 当你订阅它时它会重播所有事件,所以之前发生的事件不会丢失(但缓存会产生成本)。
总而言之,我认为使用 Subject
可能是个好主意——它与使用 Event
本质上是相同的模式(我认为这是从代理公开通知的非常标准的方式),但它可以让你触发 OnCompleted
。我可能不会使用 ReplaySubject
,因为缓存成本 - 你只需要确保在触发任何事件之前订阅。
我有点好奇下面的代码示例以及人们的想法。 这个想法是从 NetworkStream (~20 msg/s) 中读取,而不是在 main 中工作,将事情传递给 MainboxProcessor 来处理并在完成后取回绑定。
通常的方法是使用PostAndReply,但我想绑定到C#中的ListView或其他控件。无论如何都必须对 LastN 项目和过滤施展魔法。 另外,Rx 有一些错误处理。
下面的示例观察来自 2..10 和 returns "hello X" 的数字。在 8 它像 EOF 一样停止。使其成为 ToEnumerable,因为其他线程在此之前完成,但它也适用于 Subscribe。
困扰我的事情:
- 递归传递 Subject(obj)。我看不到其中 3-4 个有任何问题。好主意?
- 主题的生命周期。
open System
open System.Threading
open System.Reactive.Subjects
open System.Reactive.Linq // NuGet, take System.Reactive.Core also.
open System.Reactive.Concurrency
type SerializedLogger() =
let _letters = new Subject<string>()
// create the mailbox processor
let agent = MailboxProcessor.Start(fun inbox ->
// the message processing function
let rec messageLoop (letters:Subject<string>) = async{
// read a message
let! msg = inbox.Receive()
printfn "mailbox: %d in Thread: %d" msg Thread.CurrentThread.ManagedThreadId
do! Async.Sleep 100
// write it to the log
match msg with
| 8 -> letters.OnCompleted() // like EOF.
| x -> letters.OnNext(sprintf "hello %d" x)
// loop to top
return! messageLoop letters
}
// start the loop
messageLoop _letters
)
// public interface
member this.Log msg = agent.Post msg
member this.Getletters() = _letters.AsObservable()
/// Print line with prefix 1.
let myPrint1 x = printfn "onNext - %s, Thread: %d" x Thread.CurrentThread.ManagedThreadId
// Actions
let onNext = new Action<string>(myPrint1)
let onCompleted = new Action(fun _ -> printfn "Complete")
[<EntryPoint>]
let main argv =
async{
printfn "Main is on: %d" Thread.CurrentThread.ManagedThreadId
// test
let logger = SerializedLogger()
logger.Log 1 // ignored?
let xObs = logger
.Getletters() //.Where( fun x -> x <> "hello 5")
.SubscribeOn(Scheduler.CurrentThread)
.ObserveOn(Scheduler.CurrentThread)
.ToEnumerable() // this
//.Subscribe(onNext, onCompleted) // or with Dispose()
[2..10] |> Seq.iter (logger.Log)
xObs |> Seq.iter myPrint1
while true
do
printfn "waiting"
System.Threading.Thread.Sleep(1000)
return 0
} |> Async.RunSynchronously // return an integer exit code
我做过类似的事情,但使用的是普通 F# Event
类型而不是 Subject
。它基本上可以让您创建 IObservable
并触发其订阅 - 就像您使用更复杂的 Subject
一样。基于事件的版本将是:
type SerializedLogger() =
let letterProduced = new Event<string>()
let lettersEnded = new Event<unit>()
let agent = MailboxProcessor.Start(fun inbox ->
let rec messageLoop (letters:Subject<string>) = async {
// Some code omitted
match msg with
| 8 -> lettersEnded.Trigger()
| x -> letterProduced.Trigger(sprintf "hello %d" x)
// ...
member this.Log msg = agent.Post msg
member this.LetterProduced = letterProduced.Publish
member this.LettersEnded = lettersEnded.Publish
重要的区别是:
Event
无法触发OnCompleted
,所以我改为公开两个单独的事件。这是很不幸的!鉴于Subject
在所有其他方面与事件非常相似,这可能是使用主题而不是普通事件的一个很好的理由。使用
Event
的好处是它是标准的 F# 类型,因此您不需要代理中的任何外部依赖项。我注意到您的评论指出对
Log
的第一次调用被忽略了。那是因为您仅在该调用发生后才订阅事件处理程序。我想你可以在这里使用 ReplaySubject variation on the Subject idea - 当你订阅它时它会重播所有事件,所以之前发生的事件不会丢失(但缓存会产生成本)。
总而言之,我认为使用 Subject
可能是个好主意——它与使用 Event
本质上是相同的模式(我认为这是从代理公开通知的非常标准的方式),但它可以让你触发 OnCompleted
。我可能不会使用 ReplaySubject
,因为缓存成本 - 你只需要确保在触发任何事件之前订阅。