Post 从异步线程到 F# 主线程的消息

Post messages from async threads to main thread in F#

订阅了一个发送日志消息的可观察对象。一些日志消息来自其他线程,因为它们位于 F# 异步块中。我需要能够从主线程写出消息。

这里是 the code 当前过滤掉许多日志消息,因为它们不在主线程上:

member x.RegisterTrace() =
    Logging.verbose <- x.Verbose
    let id = Threading.Thread.CurrentThread.ManagedThreadId
    Logging.subscribe (fun trace ->
        if id = Threading.Thread.CurrentThread.ManagedThreadId then
            match trace.Level with
            | TraceLevel.Warning -> x.WriteWarning trace.Text
            | TraceLevel.Error -> x.WriteWarning trace.Text
            | _ -> x.WriteObject trace.Text
        else
            Diagnostics.Debug.Write(sprintf "not on main PS thread: %A" trace)
    )

我有多种使用方式 System.Threading.SynchronizationContent .Current.SetSynchronizationConent.Send.Post。我还涉足 System.Threading.Tasks.TaskScheduler.FromCurrentSynchronizationContext。我也试过 Async.SwitchToContext。无论我做什么,System.Threading.Thread.CurrentThread.ManagedThreadId 最终都会有所不同并且 PowerShell 会抱怨。我做错了吗?

这是正在进行的工作pull request and more details about the problem

更新 2015-06-16 星期二 11:45 太平洋标准时间上午

@RCH 谢谢,但是使用 Async.SwitchToContext 设置 SynchronizationContext 似乎不起作用。这是我执行 Paket-Restore -Force:

时的代码和调试输出
member x.RegisterTrace() =
    let a = Thread.CurrentThread.ManagedThreadId
    Logging.verbose <- x.Verbose
    let ctx = SynchronizationContext.Current
    Logging.subscribe (fun trace ->
        let b = Thread.CurrentThread.ManagedThreadId
        async {
            let c = Thread.CurrentThread.ManagedThreadId
            do! Async.SwitchToContext ctx
            let d = Thread.CurrentThread.ManagedThreadId
            Debug.WriteLine (sprintf "%d %d %d %d %s" a b c d trace.Text)
        } |> Async.Start
    )

一位工作专家推荐了另一种解决方案,我将尝试在订阅时传递上下文。

更新 2015-06-16 星期二 5:30 太平洋标准时间下午

我得到了创建 IObservable.SubscribeOn that allows the SynchrnonizationContext to be passed in. Unfortunately, it doesn't solve the problem eithe, but may be part of the solution. May be a custom SynchronizationContext is needed like that SingleThreadSynchrnonizationContext 的帮助。我很乐意帮助制作一个,但在我这样做之前,我要尝试 System.Reactive 的 Observable.ObserveOn(Scheduler.CurrentThread)

更新 2015-06-16 星期二 8:30 太平洋标准时间下午

我也无法让 Rx 工作。 Scheduler.CurrentThread doesn't behave the way I was hoping. I then tried out these changes 并且没有调用回调。

member x.RegisterTrace() =
    Logging.verbose <- x.Verbose
    let a = Threading.Thread.CurrentThread.ManagedThreadId
    let ctx = match SynchronizationContext.Current with null -> SynchronizationContext() | sc -> sc
    let sch = SynchronizationContextScheduler ctx
    Logging.event.Publish.ObserveOn sch
    |> Observable.subscribe (fun trace ->
        let b = Threading.Thread.CurrentThread.ManagedThreadId
        Debug.WriteLine(sprintf "%d %d %s" a b trace.Text)

可能需要自定义 SynchronizationContext。 :/

对于 UI 应用程序(Forms、WPF、F# interactive,...)从您的试验中选择 SynchronizationContext.CurrentAsync.SwitchToContext 以及从 [=32= 借用一些代码]小包就够了。

然而,对于控制台应用程序,没有 SynchronizationContext,因此没有线程可以继续 Posted,所以它们最终会在线程池中。MSDN Blogs.

上找到了可能的解决方法

仅适用于 UI 应用程序的解决方案:

module Logging

open System.Diagnostics

type Trace = { Level: TraceLevel; Text: string }

let public event = Event<Trace>()

let subscribe callback = Observable.subscribe callback event.Publish

[<AutoOpen>]
module CmdletExt

open System.Diagnostics
open System.Threading

type PSCmdletStandalone() =
    member x.RegisterTrace() =
        let syncContext = SynchronizationContext.Current
        Logging.subscribe (fun trace ->
            async {
                do! Async.SwitchToContext syncContext
                let threadId = Thread.CurrentThread.ManagedThreadId
                match trace.Level with
                | TraceLevel.Warning -> printfn "WARN (on %i): %s" threadId trace.Text
                | TraceLevel.Error -> printfn "ERROR (on %i): %s" threadId trace.Text
                | _ -> printfn "(on %i): %s" threadId trace.Text
            } |> Async.Start // or Async.StartImmediate
        )

然后,在主线程上注册

#load "Logging.fs"
#load "SO.fs"

open System.Diagnostics
open System.Threading
open System

(new PSCmdletStandalone()).RegisterTrace()

printfn "Main: %i" Thread.CurrentThread.ManagedThreadId

for i in Enum.GetValues(typeof<TraceLevel>) do
    async {
        let workerId = Thread.CurrentThread.ManagedThreadId
        do Logging.event.Trigger { Level = unbox i; Text = sprintf "From %i" workerId }
    } |> Async.Start

产量例如

Main: 1
WARN (on 1): From 23
(on 1): From 25
ERROR (on 1): From 22
(on 1): From 13
(on 1): From 14

我最终创建了一个 EventSink,它有一个回调队列,通过 Drain() 在主 PowerShell 线程上执行。我将主要计算放在另一个线程上。 pull request 有完整的代码和更多的细节。