在多线程代码中使用 F# 事件和异步

Using a F# event and asynchronous in multi-threaded code

EDIT/Notice:事件在当前 F# 实现中现在线程安全。


我经常使用 F# 中的异步工作流和代理。当我深入研究事件时,我注意到 Event<_>() 类型不是线程安全的。

这里我说的不是引发事件的常见问题。我实际上是在谈论从事件中订阅和 removing/disposing。出于测试目的,我编写了这个简短的程序:

let event = Event<int>()
let sub   = event.Publish

[<EntryPoint>]
let main argv =
    let subscribe sub x = async {
        let mutable disposables = []
        for i=0 to x do
            let dis = Observable.subscribe (fun x -> printf "%d" x) sub
            disposables <- dis :: disposables
        for dis in disposables do
            dis.Dispose()
    }

    Async.RunSynchronously(async{
        let! x = Async.StartChild (subscribe sub 1000)
        let! y = Async.StartChild (subscribe sub 1000)
        do! x
        do! y
        event.Trigger 1
        do! Async.Sleep 2000
    })
    0

程序很简单。我创建了一个事件和一个为它订阅特定数量事件的函数,然后处理每个处理程序。我使用另一个异步计算生成这些函数的两个实例 Async.StartChild。两个函数完成后,我触发事件以查看是否还有一些处理程序。

但是当调用event.Trigger(1)时,结果是仍然有一些处理程序注册到该事件。因为一些“1”将被打印到控制台。这通常意味着订阅 and/or 处置不是线程安全的。

这是我没想到的。如果订阅和处置不是线程安全的,一般事件如何安全使用?

某些事件也可以在线程外使用,并且触发器不会并行或在不同线程上产生任何函数。但在 Async、基于代理的代码或通常与线程一起使用事件对我来说是正常的。它们通常用作收集 Backroundworker 线程信息的通信。

使用 Async.AwaitEvent 可以订阅事件。如果订阅和处置不是线程安全的,那么在这样的环境中怎么可能使用事件呢? Async.AwaitEvent 的目的是什么?考虑到异步工作流确实是线程化的,如果默认情况下 subscribing/disposing 事件不是线程安全的,那么希望仅使用 Async.AwaitEvent 基本上是“被设计破坏”。

我面临的一般问题是:订阅和处置不是线程安全的是否正确?从我的示例来看,它看起来很像,但可能我错过了一些重要的细节。我目前在我的设计中经常使用事件,我通常有 MailboxProcessors 并使用事件进行通知。所以问题是。如果事件不是线程安全的,那么我目前使用的整个设计根本就不是线程安全的。那么什么是解决这种情况的方法呢?创建一个全新的线程安全事件实现?是否已经存在一些面临这个问题的实现?或者是否有其他选项可以在高度线程化的环境中安全地使用事件?

仅供参考; Event<int> 的实现可以在 here.

中找到

有趣的一点似乎是:

member e.AddHandler(d) =
  x.multicast <- (System.Delegate.Combine(x.multicast, d) :?> Handler<'T>)
member e.RemoveHandler(d) = 
  x.multicast <- (System.Delegate.Remove(x.multicast, d) :?> Handler<'T>)

订阅事件会将当前事件处理程序与传递给订阅的事件处理程序结合起来。这个组合的事件处理程序替换了当前的事件处理程序。

从并发的角度来看,问题是这里我们有一个竞争条件,因为并发订阅者可能使用当前事件处理程序与 "last" 写回处理程序 win(最后现在并发是一个很难的概念,但是 nvm).

这里可以做的是使用 Interlocked.CompareAndExchange 引入 CAS 循环,但这会增加性能开销,从而损害非并发用户。不过,人们可以通过 PR 看看它是否受到 F# 社区的欢迎。

关于你的第二个问题,关于如何处理它,我只能说我会做什么。我会选择创建支持受保护 subscribe/unsubscribe 的 FSharpEvent 版本。如果您的公司 FOSS 政策允许,也许可以基于 FSharpEvent。如果结果成功,那么它可以形成 F# 核心库的未来 PR。

我不知道你的要求,但也有可能如果你需要的是协同程序(即异步)而不是线程,那么可以重写程序以仅使用 1 个线程,这样你就不会受到影响通过这种竞争条件。

首先感谢FuleSnable 的回答。他给我指明了正确的方向。根据他提供的信息,我自己实现了一个 ConcurrentEvent 类型。这种类型使用 Interlocked.CompareExchange 作为 adding/removing 它的处理程序,因此它是无锁的,希望是最快的方法。

我通过从 F# 编译器复制 Event 类型开始实施。 (我也按原样保留评论。)当前的实现如下所示:

type ConcurrentEvent<'T> =
    val mutable multicast : Handler<'T>
    new() = { multicast = null }

    member x.Trigger(arg:'T) =
        match x.multicast with
        | null -> ()
        | d -> d.Invoke(null,arg) |> ignore
    member x.Publish =
        // Note, we implement each interface explicitly: this works around a bug in the CLR
        // implementation on CompactFramework 3.7, used on Windows Phone 7
        { new obj() with
            member x.ToString() = "<published event>"
          interface IEvent<'T>
          interface IDelegateEvent<Handler<'T>> with
            member e.AddHandler(d) =
                let mutable exchanged = false
                while exchanged = false do
                    System.Threading.Thread.MemoryBarrier()
                    let dels    = x.multicast
                    let newDels = System.Delegate.Combine(dels, d) :?> Handler<'T>
                    let result  = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels)
                    if obj.ReferenceEquals(dels,result) then
                        exchanged <- true
            member e.RemoveHandler(d) =
                let mutable exchanged = false
                while exchanged = false do
                    System.Threading.Thread.MemoryBarrier()
                    let dels    = x.multicast
                    let newDels = System.Delegate.Remove(dels, d) :?> Handler<'T>
                    let result  = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels)
                    if obj.ReferenceEquals(dels,result) then
                        exchanged <- true
          interface System.IObservable<'T> with
            member e.Subscribe(observer) =
                let h = new Handler<_>(fun sender args -> observer.OnNext(args))
                (e :?> IEvent<_,_>).AddHandler(h)
                { new System.IDisposable with
                    member x.Dispose() = (e :?> IEvent<_,_>).RemoveHandler(h) } }

设计的一些注意事项:

  • 我从递归循环开始。但是这样做并查看编译后的代码,它会创建一个匿名 class 并调用 AddHandler 或 RemoveHandler 创建一个对象。通过直接实现 while 循环,它可以避免在新处理程序为 added/removed.
  • 时实例化对象
  • 我明确使用 obj.ReferenceEquals 来避免通用哈希相等。

至少在我的测试中 adding/removing 处理程序现在似乎是线程安全的。 ConcurrentEvent 可以根据需要与 Event 类型交换。


一个基准,如果人们好奇 ConcurrentEventEvent 相比会慢多少:

let stopWatch () = System.Diagnostics.Stopwatch.StartNew()

let event = Event<int>()
let sub   = event.Publish

let cevent = ConcurrentEvent<int>()
let csub   = cevent.Publish

let subscribe sub x = async {
    let mutable disposables = []
    for i=0 to x do
        let dis = Observable.subscribe (fun x -> printf "%d" x) sub
        disposables <- dis :: disposables
    for dis in disposables do
        dis.Dispose()
}

let sw = stopWatch()
Async.RunSynchronously(async{
    // Amount of tries
    let tries = 10000

    // benchmarking Event subscribe/unsubscribing
    let sw = stopWatch()
    let! x = Async.StartChild (subscribe sub tries)
    let! y = Async.StartChild (subscribe sub tries)
    do! x
    do! y
    sw.Stop()
    printfn "Event: %O" sw.Elapsed
    do! Async.Sleep 1000
    event.Trigger 1
    do! Async.Sleep 2000

    // Benchmarking ConcurrentEvent subscribe/unsubscribing
    let sw = stopWatch()
    let! x = Async.StartChild (subscribe csub tries)
    let! y = Async.StartChild (subscribe csub tries)
    do! x
    do! y
    sw.Stop()
    printfn "\nConcurrentEvent: %O" sw.Elapsed
    do! Async.Sleep 1000
    cevent.Trigger 1
    do! Async.Sleep 2000
})

在我的系统上 subscribing/unsubscribing 10,000 个具有非线程安全 Event 的处理程序需要大约 1.4 秒 才能完成。

线程安全 ConcurrentEvent 大约需要 1.8 秒 完成。所以我认为开销很低。