Post 从异步线程到 F# 主线程的消息
Post messages from async threads to main thread in F#
订阅了一个发送日志消息的可观察对象。一些日志消息来自其他线程,因为它们位于 F# 异步块中。我需要能够从主线程写出消息。
这里是 the code 当前过滤掉许多日志消息,因为它们不在主线程上:
member x.RegisterTrace() =
Logging.verbose <- x.Verbose
let id = Threading.Thread.CurrentThread.ManagedThreadId
Logging.subscribe (fun trace ->
if id = Threading.Thread.CurrentThread.ManagedThreadId then
match trace.Level with
| TraceLevel.Warning -> x.WriteWarning trace.Text
| TraceLevel.Error -> x.WriteWarning trace.Text
| _ -> x.WriteObject trace.Text
else
Diagnostics.Debug.Write(sprintf "not on main PS thread: %A" trace)
)
我有多种使用方式 System.Threading.SynchronizationContent
.Current
、.SetSynchronizationConent
、.Send
、.Post
。我还涉足 System.Threading.Tasks.TaskScheduler.FromCurrentSynchronizationContext
。我也试过 Async.SwitchToContext
。无论我做什么,System.Threading.Thread.CurrentThread.ManagedThreadId
最终都会有所不同并且 PowerShell 会抱怨。我做错了吗?
这是正在进行的工作pull request and more details about the problem。
更新 2015-06-16 星期二 11:45 太平洋标准时间上午
@RCH 谢谢,但是使用 Async.SwitchToContext
设置 SynchronizationContext
似乎不起作用。这是我执行 Paket-Restore -Force
:
时的代码和调试输出
member x.RegisterTrace() =
let a = Thread.CurrentThread.ManagedThreadId
Logging.verbose <- x.Verbose
let ctx = SynchronizationContext.Current
Logging.subscribe (fun trace ->
let b = Thread.CurrentThread.ManagedThreadId
async {
let c = Thread.CurrentThread.ManagedThreadId
do! Async.SwitchToContext ctx
let d = Thread.CurrentThread.ManagedThreadId
Debug.WriteLine (sprintf "%d %d %d %d %s" a b c d trace.Text)
} |> Async.Start
)
一位工作专家推荐了另一种解决方案,我将尝试在订阅时传递上下文。
更新 2015-06-16 星期二 5:30 太平洋标准时间下午
我得到了创建 IObservable.SubscribeOn that allows the SynchrnonizationContext to be passed in. Unfortunately, it doesn't solve the problem eithe, but may be part of the solution. May be a custom SynchronizationContext is needed like that SingleThreadSynchrnonizationContext 的帮助。我很乐意帮助制作一个,但在我这样做之前,我要尝试 System.Reactive 的 Observable.ObserveOn(Scheduler.CurrentThread)
。
更新 2015-06-16 星期二 8:30 太平洋标准时间下午
我也无法让 Rx 工作。 Scheduler.CurrentThread
doesn't behave the way I was hoping. I then tried out these changes 并且没有调用回调。
member x.RegisterTrace() =
Logging.verbose <- x.Verbose
let a = Threading.Thread.CurrentThread.ManagedThreadId
let ctx = match SynchronizationContext.Current with null -> SynchronizationContext() | sc -> sc
let sch = SynchronizationContextScheduler ctx
Logging.event.Publish.ObserveOn sch
|> Observable.subscribe (fun trace ->
let b = Threading.Thread.CurrentThread.ManagedThreadId
Debug.WriteLine(sprintf "%d %d %s" a b trace.Text)
可能需要自定义 SynchronizationContext。 :/
对于 UI 应用程序(Forms、WPF、F# interactive,...)从您的试验中选择 SynchronizationContext.Current
和 Async.SwitchToContext
以及从 [=32= 借用一些代码]小包就够了。
然而,对于控制台应用程序,没有 SynchronizationContext,因此没有线程可以继续 Post
ed,所以它们最终会在线程池中。 在 MSDN Blogs.
上找到了可能的解决方法
仅适用于 UI 应用程序的解决方案:
有
module Logging
open System.Diagnostics
type Trace = { Level: TraceLevel; Text: string }
let public event = Event<Trace>()
let subscribe callback = Observable.subscribe callback event.Publish
和
[<AutoOpen>]
module CmdletExt
open System.Diagnostics
open System.Threading
type PSCmdletStandalone() =
member x.RegisterTrace() =
let syncContext = SynchronizationContext.Current
Logging.subscribe (fun trace ->
async {
do! Async.SwitchToContext syncContext
let threadId = Thread.CurrentThread.ManagedThreadId
match trace.Level with
| TraceLevel.Warning -> printfn "WARN (on %i): %s" threadId trace.Text
| TraceLevel.Error -> printfn "ERROR (on %i): %s" threadId trace.Text
| _ -> printfn "(on %i): %s" threadId trace.Text
} |> Async.Start // or Async.StartImmediate
)
然后,在主线程上注册
#load "Logging.fs"
#load "SO.fs"
open System.Diagnostics
open System.Threading
open System
(new PSCmdletStandalone()).RegisterTrace()
printfn "Main: %i" Thread.CurrentThread.ManagedThreadId
for i in Enum.GetValues(typeof<TraceLevel>) do
async {
let workerId = Thread.CurrentThread.ManagedThreadId
do Logging.event.Trigger { Level = unbox i; Text = sprintf "From %i" workerId }
} |> Async.Start
产量例如
Main: 1
WARN (on 1): From 23
(on 1): From 25
ERROR (on 1): From 22
(on 1): From 13
(on 1): From 14
我最终创建了一个 EventSink
,它有一个回调队列,通过 Drain()
在主 PowerShell 线程上执行。我将主要计算放在另一个线程上。 pull request 有完整的代码和更多的细节。
订阅了一个发送日志消息的可观察对象。一些日志消息来自其他线程,因为它们位于 F# 异步块中。我需要能够从主线程写出消息。
这里是 the code 当前过滤掉许多日志消息,因为它们不在主线程上:
member x.RegisterTrace() =
Logging.verbose <- x.Verbose
let id = Threading.Thread.CurrentThread.ManagedThreadId
Logging.subscribe (fun trace ->
if id = Threading.Thread.CurrentThread.ManagedThreadId then
match trace.Level with
| TraceLevel.Warning -> x.WriteWarning trace.Text
| TraceLevel.Error -> x.WriteWarning trace.Text
| _ -> x.WriteObject trace.Text
else
Diagnostics.Debug.Write(sprintf "not on main PS thread: %A" trace)
)
我有多种使用方式 System.Threading.SynchronizationContent
.Current
、.SetSynchronizationConent
、.Send
、.Post
。我还涉足 System.Threading.Tasks.TaskScheduler.FromCurrentSynchronizationContext
。我也试过 Async.SwitchToContext
。无论我做什么,System.Threading.Thread.CurrentThread.ManagedThreadId
最终都会有所不同并且 PowerShell 会抱怨。我做错了吗?
这是正在进行的工作pull request and more details about the problem。
更新 2015-06-16 星期二 11:45 太平洋标准时间上午
@RCH 谢谢,但是使用 Async.SwitchToContext
设置 SynchronizationContext
似乎不起作用。这是我执行 Paket-Restore -Force
:
member x.RegisterTrace() =
let a = Thread.CurrentThread.ManagedThreadId
Logging.verbose <- x.Verbose
let ctx = SynchronizationContext.Current
Logging.subscribe (fun trace ->
let b = Thread.CurrentThread.ManagedThreadId
async {
let c = Thread.CurrentThread.ManagedThreadId
do! Async.SwitchToContext ctx
let d = Thread.CurrentThread.ManagedThreadId
Debug.WriteLine (sprintf "%d %d %d %d %s" a b c d trace.Text)
} |> Async.Start
)
一位工作专家推荐了另一种解决方案,我将尝试在订阅时传递上下文。
更新 2015-06-16 星期二 5:30 太平洋标准时间下午
我得到了创建 IObservable.SubscribeOn that allows the SynchrnonizationContext to be passed in. Unfortunately, it doesn't solve the problem eithe, but may be part of the solution. May be a custom SynchronizationContext is needed like that SingleThreadSynchrnonizationContext 的帮助。我很乐意帮助制作一个,但在我这样做之前,我要尝试 System.Reactive 的 Observable.ObserveOn(Scheduler.CurrentThread)
。
更新 2015-06-16 星期二 8:30 太平洋标准时间下午
我也无法让 Rx 工作。 Scheduler.CurrentThread
doesn't behave the way I was hoping. I then tried out these changes 并且没有调用回调。
member x.RegisterTrace() =
Logging.verbose <- x.Verbose
let a = Threading.Thread.CurrentThread.ManagedThreadId
let ctx = match SynchronizationContext.Current with null -> SynchronizationContext() | sc -> sc
let sch = SynchronizationContextScheduler ctx
Logging.event.Publish.ObserveOn sch
|> Observable.subscribe (fun trace ->
let b = Threading.Thread.CurrentThread.ManagedThreadId
Debug.WriteLine(sprintf "%d %d %s" a b trace.Text)
可能需要自定义 SynchronizationContext。 :/
对于 UI 应用程序(Forms、WPF、F# interactive,...)从您的试验中选择 SynchronizationContext.Current
和 Async.SwitchToContext
以及从 [=32= 借用一些代码]小包就够了。
然而,对于控制台应用程序,没有 SynchronizationContext,因此没有线程可以继续 Post
ed,所以它们最终会在线程池中。 在 MSDN Blogs.
仅适用于 UI 应用程序的解决方案:
有
module Logging
open System.Diagnostics
type Trace = { Level: TraceLevel; Text: string }
let public event = Event<Trace>()
let subscribe callback = Observable.subscribe callback event.Publish
和
[<AutoOpen>]
module CmdletExt
open System.Diagnostics
open System.Threading
type PSCmdletStandalone() =
member x.RegisterTrace() =
let syncContext = SynchronizationContext.Current
Logging.subscribe (fun trace ->
async {
do! Async.SwitchToContext syncContext
let threadId = Thread.CurrentThread.ManagedThreadId
match trace.Level with
| TraceLevel.Warning -> printfn "WARN (on %i): %s" threadId trace.Text
| TraceLevel.Error -> printfn "ERROR (on %i): %s" threadId trace.Text
| _ -> printfn "(on %i): %s" threadId trace.Text
} |> Async.Start // or Async.StartImmediate
)
然后,在主线程上注册
#load "Logging.fs"
#load "SO.fs"
open System.Diagnostics
open System.Threading
open System
(new PSCmdletStandalone()).RegisterTrace()
printfn "Main: %i" Thread.CurrentThread.ManagedThreadId
for i in Enum.GetValues(typeof<TraceLevel>) do
async {
let workerId = Thread.CurrentThread.ManagedThreadId
do Logging.event.Trigger { Level = unbox i; Text = sprintf "From %i" workerId }
} |> Async.Start
产量例如
Main: 1
WARN (on 1): From 23
(on 1): From 25
ERROR (on 1): From 22
(on 1): From 13
(on 1): From 14
我最终创建了一个 EventSink
,它有一个回调队列,通过 Drain()
在主 PowerShell 线程上执行。我将主要计算放在另一个线程上。 pull request 有完整的代码和更多的细节。