强制 lambda 作为 Rx 序列的一部分重新评估

Force lambda to re-evaluate as part of Rx sequence

我正在尝试生成进度消息并通过 IObservable 发布它们。

Func<JobProgressMessage> nextMsg = () => ProgressManager.InProgressMessage("progressing");

var o = Observable
    .Return(nextMsg())
    .Repeat()
    .Timeout(TimeSpan.FromSeconds(2))
    .Retry(100)
    .Finally(() =>
        job.AddMessage<ProgressCompleted>(ProgressManager.CompletedMessage("Completed")));

我发现 nextMsg() 只被评估一次并且发布了相同的消息。我希望 Repeat 会导致 lambda 每次都重新计算。

根据 nhabuiduc 的建议,最简单的方法是

var o = Observable.Return(ProgressManager.InProgressMessage("progressing"))
                          .Repeat()
                          .Timeout(TimeSpan.FromSeconds(2))
                          .Retry(100)
                          .Finally(() => job.AddMessage<ProgressCompleted>(ProgressManager.CompletedMessage("Completed")));

var s = o.Subscribe(m => job.AddMessage(ProgressManager.InProgressMessage("progressing")));

但最后,因为我想玩 Observables,所以我得到了

var s = Observable.Defer(()=>Observable.Return(nextMsg()))
                  .Delay(TimeSpan.FromSeconds(2))
                  .Repeat(100)
                  .Finally(() => job.AddMessage<ProgressCompleted>(ProgressManager.CompletedMessage("Completed")))
                  .SubscribeOn(Scheduler.ThreadPool)
                  .Subscribe(m => job.AddMessage(m));

每当您需要在每次订阅您的 Observable 时重新评估某些内容(包括 Repeat 的重复订阅),Defer 就是您想要的:

Func<JobProgressMessage> nextMsg = () => ProgressManager.InProgressMessage("progressing");

var o = Observable
    .Defer(() => Observable
        .Return(nextMsg()))
    .Repeat()
    .Timeout(TimeSpan.FromSeconds(2))
    .Retry(100)
    .Finally(() => job.AddMessage<ProgressCompleted>ProgressManager.CompletedMessage("Completed")));

基本上,在订阅时,Defer 评估传入的函数以获得每个订阅的新观察值。然后,当然nextMsg也会被重新评估。