在 F# 中混合 IObservable 和 Async<'a>

Mixing IObservable and Async<'a> in F#

我有一个图书馆提供的 IObservable,它监听来自外部服务的事件:

let startObservable () : IObservable<'a> = failwith "Given"

对于每个接收到的事件,我想执行一个操作 returns Async:

let action (item: 'a) : Async<unit> = failwith "Given"

我正在尝试在

行中实现处理器
let processor () : Async<unit> =
    startObservable()
    |> Observable.mapAsync action
    |> Async.AwaitObservable

我编了 mapAsyncAwaitObservable:理想情况下,它们应该由某个图书馆提供,但我目前还没有找到。

额外要求:

关于我应该使用的库的任何提示?

由于您希望将基于推的模型 (IObservable<>) 转换为基于拉的模型 (Async<>),因此您需要排队以缓冲来自 observable 的数据。如果队列有大小限制——说实话。应该是为了使整个管道安全,不会溢出内存 - 然后还需要缓冲区溢出策略。

  1. 一种方法是实现一个 MailboxProcessor<> 和自定义可观察对象,这将向它提供 Post 数据。由于 MP 是本机 F# actor 实现,它能够使用队列进行有序处理以缓冲峰值。
  2. 另一种选择是使用 FSharp.Control.AsyncSeq(特别是 AsyncSeq.ofObservableBuffered 函数),它将 observable 转换为基于拉取的异步可枚举 - 在其下方从第一点开始使用邮箱处理器:

    startObservable()
    |> AsyncSeq.ofObservableBuffered
    |> AsyncSeq.iterAsync action
    

Hopac 可以轻松地与 IObservables 互操作 您可以使用 Job.toAsync

将 Hopac Jobs 转换为 Async
open System
open Hopac

let startObservable () : IObservable<'a> = failwith "Given"

let action (item: 'a) : Job<unit> = failwith "Given"

let processor () : Job<unit> =
    startObservable()
    |> Stream.ofObservable
    |> Stream.mapJob action
    |> Stream.iter