FSharpx 的 Async.AwaitObservable 不会调用取消延续(cancellation continuation)
Fsharpx Async.AwaitObservable does not call cancellation continuation
我正在尝试在通过 Async.StartWithContinuations 启动的异步工作流中使用 FSharpx 的 Async.AwaitObservable。出于某种原因,如果用于启动此工作流的取消令牌恰好在等待可观察对象期间被取消(而不是在工作流的其他部分执行期间被取消),则取消延续永远不会被调用。但是,如果我把它放在 use! __ = Async.OnCancel (interruption) 之内,那么 interruption 函数确实会被调用。有人可以解释为什么会发生这种情况,以及怎样做最好,才能确保始终有一个延续函数被调用吗?
open System
open System.Reactive.Linq
open FSharp.Control.Observable
open System.Threading
/// Repro program: starts a workflow that awaits a one-second Rx interval,
/// then cancels the workflow's token after 100 ms, while the workflow is
/// still waiting on the observable.
[<EntryPoint>]
let main _ =
    let cts = new CancellationTokenSource()
    let tick = Observable.Interval(TimeSpan.FromSeconds 1.0)

    // The workflow under test: wait for one notification, then report it.
    let test =
        async {
            let! __ = Async.AwaitObservable tick
            printfn "Got a thing."
        }

    // Exactly one of the three continuations is expected to print.
    Async.StartWithContinuations(test,
                                 (fun () -> printfn "Finished"),
                                 (fun _ -> printfn "Error!"),
                                 (fun _ -> printfn "Canceled!"),
                                 cts.Token)

    // Cancel well before the first tick (due after one second) can arrive.
    Thread.Sleep 100
    printfn "Cancelling..."
    cts.Cancel()

    // Keep the process alive so any continuation output can be observed.
    Console.ReadLine() |> ignore
    0 // return an integer exit code
在我看来,AwaitObservable 的实现方式也存在问题。祝你好运。
也就是说,您可以在客户端代码中使用的一种解决方法是将 AwaitObservable 包装在任务中:
// Workaround: run AwaitObservable as a Task that is started with the
// workflow's current cancellation token, then await that Task.
async {
    let! ct = Async.CancellationToken
    let pending = Async.StartAsTask(Async.AwaitObservable tick, cancellationToken = ct)
    let! __ = Async.AwaitTask pending
    printfn "Got a thing."
}
不理想,但有效。
GitHub 上的 Fsharpx 版本似乎已经包含一个修复程序(不是我实现的)。但是,NuGet (1.8.41) 上的当前版本尚未更新以包含此修复。查看更改 here。
编辑 1:
GitHub 上的代码在处理具有重放(replay)语义的 Observable 时也存在一些问题。我现在已经修复了这个问题,但希望能找到更简洁的解决方案。我会先考虑是否有更简单的做法,然后再提交 PR。
/// Builds an asynchronous workflow that subscribes to `observable` and is
/// resumed by the first notification it receives: a value resumes the
/// success continuation with that value, an error resumes the exception
/// continuation, and completion resumes the cancellation continuation
/// with an OperationCanceledException.
static member AwaitObservable(observable : IObservable<'T1>) =
    // The live subscription, if any; every access goes through `subscriptionLock`.
    let subscription : IDisposable option ref = ref None
    let subscriptionLock = new obj()

    let storeSubscription d =
        lock subscriptionLock (fun () -> subscription := Some d)

    // Disposes the stored subscription and clears the slot; does nothing
    // when no subscription is stored.
    let unsubscribe() =
        lock subscriptionLock (fun () ->
            match !subscription with
            | None -> ()
            | Some d ->
                subscription := None
                d.Dispose())

    synchronize (fun f ->
        let workflow =
            Async.FromContinuations((fun (cont,econt,ccont) ->
                // Drop the subscription, then hand the chosen continuation to `f`.
                let finish k value =
                    unsubscribe()
                    f (fun () -> k value)
                // NOTE(review): if the observable notifies synchronously inside
                // Subscribe, `finish` runs before the subscription is stored, so
                // that subscription is stored afterwards and not disposed by
                // `finish` - confirm whether this matters for the callers.
                observable.Subscribe
                    ({ new IObserver<_> with
                        member x.OnNext(v) = finish cont v
                        member x.OnError(e) = finish econt e
                        member x.OnCompleted() =
                            let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed."
                            finish ccont (new System.OperationCanceledException(msg)) })
                |> storeSubscription
                () ))
        async {
            let! cToken = Async.CancellationToken
            let token : CancellationToken = cToken
            // NOTE(review): the token callback only disposes the subscription;
            // no continuation is invoked from this path in the code shown -
            // confirm that this is the intended cancellation behaviour.
            #if NET40
            use registration = token.Register(fun () -> unsubscribe())
            #else
            use registration = token.Register((fun _ -> unsubscribe()), null)
            #endif
            return! workflow
        })
/// Builds an asynchronous workflow that subscribes to `observable` and is
/// resumed by a notification from it: a value resumes the success
/// continuation, an error resumes the exception continuation, and
/// completion resumes the cancellation continuation with an
/// OperationCanceledException. A flag guarded by a lock ensures that only
/// one continuation is ever invoked.
static member AwaitObservable(observable : IObservable<'T1>) =
    // Captures the current SynchronizationContext and gives `f` a runner that
    // posts work back to it when invoked under a different context, and runs
    // the work inline otherwise.
    let synchronize f =
        let capturedCtx = System.Threading.SynchronizationContext.Current
        f (fun g ->
            let currentCtx = System.Threading.SynchronizationContext.Current
            if capturedCtx <> null && capturedCtx <> currentCtx then
                capturedCtx.Post((fun _ -> g()), null)
            else
                g())

    // Set once a continuation has been invoked; guarded by `resumedLock`.
    let resumed = ref false
    let resumedLock = new obj()

    // The live subscription, if any; guarded by `subscriptionLock`.
    let subscription : IDisposable option ref = ref None
    let subscriptionLock = new obj()

    let storeSubscription d =
        lock subscriptionLock (fun () -> subscription := Some d)

    // Disposes the stored subscription and clears the slot; does nothing
    // when no subscription is stored.
    let unsubscribe() =
        lock subscriptionLock (fun () ->
            match !subscription with
            | None -> ()
            | Some d ->
                subscription := None
                d.Dispose())

    synchronize (fun f ->
        let workflow =
            Async.FromContinuations((fun (cont,econt,ccont) ->
                // Drop the subscription, then invoke the chosen continuation
                // through `f`, but only if no continuation has run yet.
                let finish k value =
                    unsubscribe()
                    f (fun () ->
                        lock resumedLock (fun () ->
                            if not !resumed then
                                k value
                                resumed := true))
                let observer =
                    observable.Subscribe
                        ({ new IObserver<_> with
                            member __.OnNext(v) = finish cont v
                            member __.OnError(e) = finish econt e
                            member __.OnCompleted() =
                                let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed."
                                finish ccont (new System.OperationCanceledException(msg)) })
                // If a continuation already ran during Subscribe, release the
                // subscription right away; otherwise keep it for later disposal.
                lock resumedLock (fun () ->
                    if !resumed then observer.Dispose() else storeSubscription observer)
                () ))
        async {
            let! cToken = Async.CancellationToken
            let token : CancellationToken = cToken
            // NOTE(review): the token callback only disposes the subscription;
            // no continuation is invoked from this path in the code shown -
            // confirm that this is the intended cancellation behaviour.
            use __ = token.Register((fun _ -> unsubscribe()), null)
            return! workflow
        })
编辑 2:
针对热可观察对象(hot observable)问题的更简洁的修复……
/// Builds an asynchronous workflow that waits for the first notification of
/// `observable`. A mailbox agent receives one of three messages - a value,
/// an error, or a cancellation posted by the workflow's cancellation token -
/// and the first message to arrive selects the success, exception or
/// cancellation continuation respectively.
let AwaitObservable(observable : IObservable<'T>) = async {
    let! token = Async.CancellationToken // capture the current cancellation token
    return! Async.FromContinuations(fun (cont, econt, ccont) ->
        // start a new mailbox processor which will await the result
        Agent.Start((fun (mailbox : Agent<Choice<'T, exn, OperationCanceledException>>) ->
            async {
                // Single definition of the cancellation message, shared by both
                // conditional-compilation branches below.
                let postCancelled () =
                    mailbox.Post (Choice3Of3 (new OperationCanceledException("The operation was cancelled.")))
                // register a callback with the cancellation token which posts a cancellation message
                #if NET40
                use __ = token.Register((fun _ -> postCancelled ()))
                #else
                use __ = token.Register((fun _ -> postCancelled ()), null)
                #endif
                // subscribe to the observable: if an error occurs post an error message and post the result otherwise
                use __ =
                    observable.FirstAsync()
                        .Catch(fun exn -> mailbox.Post(Choice2Of3 exn) ; Observable.Empty())
                        .Subscribe(fun result -> mailbox.Post(Choice1Of3 result))
                // wait for the first of these messages and call the appropriate continuation function;
                // both registrations above are disposed when this block exits
                let! message = mailbox.Receive()
                match message with
                | Choice1Of3 reply -> cont reply
                | Choice2Of3 exn -> econt exn
                | Choice3Of3 exn -> ccont exn })) |> ignore) }
我正在尝试在通过 Async.StartWithContinuations 启动的异步工作流中使用 FSharpx 的 Async.AwaitObservable。出于某种原因,如果用于启动此工作流的取消令牌恰好在等待可观察对象期间被取消(而不是在工作流的其他部分执行期间被取消),则取消延续永远不会被调用。但是,如果我把它放在 use! __ = Async.OnCancel (interruption) 之内,那么 interruption 函数确实会被调用。有人可以解释为什么会发生这种情况,以及怎样做最好,才能确保始终有一个延续函数被调用吗?
open System
open System.Reactive.Linq
open FSharp.Control.Observable
open System.Threading
/// Repro program: starts a workflow that awaits a one-second Rx interval,
/// then cancels the workflow's token after 100 ms, while the workflow is
/// still waiting on the observable.
[<EntryPoint>]
let main _ =
    let cts = new CancellationTokenSource()
    let tick = Observable.Interval(TimeSpan.FromSeconds 1.0)

    // The workflow under test: wait for one notification, then report it.
    let test =
        async {
            let! __ = Async.AwaitObservable tick
            printfn "Got a thing."
        }

    // Exactly one of the three continuations is expected to print.
    Async.StartWithContinuations(test,
                                 (fun () -> printfn "Finished"),
                                 (fun _ -> printfn "Error!"),
                                 (fun _ -> printfn "Canceled!"),
                                 cts.Token)

    // Cancel well before the first tick (due after one second) can arrive.
    Thread.Sleep 100
    printfn "Cancelling..."
    cts.Cancel()

    // Keep the process alive so any continuation output can be observed.
    Console.ReadLine() |> ignore
    0 // return an integer exit code
在我看来,AwaitObservable 的实现方式也存在问题。祝你好运。
也就是说,您可以在客户端代码中使用的一种解决方法是将 AwaitObservable 包装在任务中:
// Workaround: run AwaitObservable as a Task that is started with the
// workflow's current cancellation token, then await that Task.
async {
    let! ct = Async.CancellationToken
    let pending = Async.StartAsTask(Async.AwaitObservable tick, cancellationToken = ct)
    let! __ = Async.AwaitTask pending
    printfn "Got a thing."
}
不理想,但有效。
GitHub 上的 Fsharpx 版本似乎已经包含一个修复程序(不是我实现的)。但是,NuGet (1.8.41) 上的当前版本尚未更新以包含此修复。查看更改 here。
编辑 1: GitHub 上的代码在处理具有重放(replay)语义的 Observable 时也存在一些问题。我现在已经修复了这个问题,但希望能找到更简洁的解决方案。我会先考虑是否有更简单的做法,然后再提交 PR。
/// Builds an asynchronous workflow that subscribes to `observable` and is
/// resumed by the first notification it receives: a value resumes the
/// success continuation with that value, an error resumes the exception
/// continuation, and completion resumes the cancellation continuation
/// with an OperationCanceledException.
static member AwaitObservable(observable : IObservable<'T1>) =
    // The live subscription, if any; every access goes through `subscriptionLock`.
    let subscription : IDisposable option ref = ref None
    let subscriptionLock = new obj()

    let storeSubscription d =
        lock subscriptionLock (fun () -> subscription := Some d)

    // Disposes the stored subscription and clears the slot; does nothing
    // when no subscription is stored.
    let unsubscribe() =
        lock subscriptionLock (fun () ->
            match !subscription with
            | None -> ()
            | Some d ->
                subscription := None
                d.Dispose())

    synchronize (fun f ->
        let workflow =
            Async.FromContinuations((fun (cont,econt,ccont) ->
                // Drop the subscription, then hand the chosen continuation to `f`.
                let finish k value =
                    unsubscribe()
                    f (fun () -> k value)
                // NOTE(review): if the observable notifies synchronously inside
                // Subscribe, `finish` runs before the subscription is stored, so
                // that subscription is stored afterwards and not disposed by
                // `finish` - confirm whether this matters for the callers.
                observable.Subscribe
                    ({ new IObserver<_> with
                        member x.OnNext(v) = finish cont v
                        member x.OnError(e) = finish econt e
                        member x.OnCompleted() =
                            let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed."
                            finish ccont (new System.OperationCanceledException(msg)) })
                |> storeSubscription
                () ))
        async {
            let! cToken = Async.CancellationToken
            let token : CancellationToken = cToken
            // NOTE(review): the token callback only disposes the subscription;
            // no continuation is invoked from this path in the code shown -
            // confirm that this is the intended cancellation behaviour.
            #if NET40
            use registration = token.Register(fun () -> unsubscribe())
            #else
            use registration = token.Register((fun _ -> unsubscribe()), null)
            #endif
            return! workflow
        })
/// Builds an asynchronous workflow that subscribes to `observable` and is
/// resumed by a notification from it: a value resumes the success
/// continuation, an error resumes the exception continuation, and
/// completion resumes the cancellation continuation with an
/// OperationCanceledException. A flag guarded by a lock ensures that only
/// one continuation is ever invoked.
static member AwaitObservable(observable : IObservable<'T1>) =
    // Captures the current SynchronizationContext and gives `f` a runner that
    // posts work back to it when invoked under a different context, and runs
    // the work inline otherwise.
    let synchronize f =
        let capturedCtx = System.Threading.SynchronizationContext.Current
        f (fun g ->
            let currentCtx = System.Threading.SynchronizationContext.Current
            if capturedCtx <> null && capturedCtx <> currentCtx then
                capturedCtx.Post((fun _ -> g()), null)
            else
                g())

    // Set once a continuation has been invoked; guarded by `resumedLock`.
    let resumed = ref false
    let resumedLock = new obj()

    // The live subscription, if any; guarded by `subscriptionLock`.
    let subscription : IDisposable option ref = ref None
    let subscriptionLock = new obj()

    let storeSubscription d =
        lock subscriptionLock (fun () -> subscription := Some d)

    // Disposes the stored subscription and clears the slot; does nothing
    // when no subscription is stored.
    let unsubscribe() =
        lock subscriptionLock (fun () ->
            match !subscription with
            | None -> ()
            | Some d ->
                subscription := None
                d.Dispose())

    synchronize (fun f ->
        let workflow =
            Async.FromContinuations((fun (cont,econt,ccont) ->
                // Drop the subscription, then invoke the chosen continuation
                // through `f`, but only if no continuation has run yet.
                let finish k value =
                    unsubscribe()
                    f (fun () ->
                        lock resumedLock (fun () ->
                            if not !resumed then
                                k value
                                resumed := true))
                let observer =
                    observable.Subscribe
                        ({ new IObserver<_> with
                            member __.OnNext(v) = finish cont v
                            member __.OnError(e) = finish econt e
                            member __.OnCompleted() =
                                let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed."
                                finish ccont (new System.OperationCanceledException(msg)) })
                // If a continuation already ran during Subscribe, release the
                // subscription right away; otherwise keep it for later disposal.
                lock resumedLock (fun () ->
                    if !resumed then observer.Dispose() else storeSubscription observer)
                () ))
        async {
            let! cToken = Async.CancellationToken
            let token : CancellationToken = cToken
            // NOTE(review): the token callback only disposes the subscription;
            // no continuation is invoked from this path in the code shown -
            // confirm that this is the intended cancellation behaviour.
            use __ = token.Register((fun _ -> unsubscribe()), null)
            return! workflow
        })
编辑 2: 针对热可观察对象(hot observable)问题的更简洁的修复……
/// Builds an asynchronous workflow that waits for the first notification of
/// `observable`. A mailbox agent receives one of three messages - a value,
/// an error, or a cancellation posted by the workflow's cancellation token -
/// and the first message to arrive selects the success, exception or
/// cancellation continuation respectively.
let AwaitObservable(observable : IObservable<'T>) = async {
    let! token = Async.CancellationToken // capture the current cancellation token
    return! Async.FromContinuations(fun (cont, econt, ccont) ->
        // start a new mailbox processor which will await the result
        Agent.Start((fun (mailbox : Agent<Choice<'T, exn, OperationCanceledException>>) ->
            async {
                // Single definition of the cancellation message, shared by both
                // conditional-compilation branches below.
                let postCancelled () =
                    mailbox.Post (Choice3Of3 (new OperationCanceledException("The operation was cancelled.")))
                // register a callback with the cancellation token which posts a cancellation message
                #if NET40
                use __ = token.Register((fun _ -> postCancelled ()))
                #else
                use __ = token.Register((fun _ -> postCancelled ()), null)
                #endif
                // subscribe to the observable: if an error occurs post an error message and post the result otherwise
                use __ =
                    observable.FirstAsync()
                        .Catch(fun exn -> mailbox.Post(Choice2Of3 exn) ; Observable.Empty())
                        .Subscribe(fun result -> mailbox.Post(Choice1Of3 result))
                // wait for the first of these messages and call the appropriate continuation function;
                // both registrations above are disposed when this block exits
                let! message = mailbox.Receive()
                match message with
                | Choice1Of3 reply -> cont reply
                | Choice2Of3 exn -> econt exn
                | Choice3Of3 exn -> ccont exn })) |> ignore) }