C# Rx Observable 产生随机结果
C# Rx Observable producing random results
考虑以下程序;
class Program
{
static IObservable<int> GetNumbers()
{
var observable = Observable.Empty<int>();
foreach (var i in Enumerable.Range(1, 10))
{
observable = observable.Concat(Observable.FromAsync(() => Task.Run(() =>
{
Console.WriteLine($"Producing {i}");
Thread.Sleep(1000);
return i;
})));
}
return observable;
}
static async Task LogNumbers(IObservable<int> observable)
{
var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
await observable;
subscription.Dispose();
}
static void Main(string[] args)
{
LogNumbers(GetNumbers()).Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
它产生以下输出
Producing 1
Producing 1
Producing 2
Consuming 1
Producing 2
Producing 3
Consuming 2
Producing 3
Producing 4
Consuming 3
Producing 4
Producing 5
Consuming 4
Producing 5
Producing 6
Consuming 5
Producing 6
Producing 7
Consuming 6
Producing 7
Producing 8
Consuming 7
Producing 8
Producing 9
Consuming 8
Producing 9
Producing 10
Consuming 9
Producing 10
Finished
它写出每两个“Producing x”语句和一个“Consuming x”语句。为什么要这样做?为什么它永远不会写出预期的最终“Consuming 10”语句?
您将获得两份 Producing 系列,因为您订阅了两次。最有可能的是,您没有得到消耗 10,因为当第二个订阅结束时第一个订阅被取消。如果您有时确实获得消耗 10,我不会感到惊讶,只是因为当时任务 运行 的顺序不同。
static async Task LogNumbers(IObservable<int> observable)
{
//This is the first subscription
var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
//This is the second subscription
await observable;
subscription.Dispose();
}
您的 GetNumbers
函数的编写方式,每个对 observable 的订阅都会触发它自己的一组 10 个任务到 运行,从而触发它自己的一组输出。第一个订阅还监视生成的值并输出一条 Consuming 行。第二个订阅对生成的值不做任何事情,因为您没有使用 await observable
的值,但会导致第二组任务 运行.
您可以通过在 LogNumbers 的参数上使用 Publish().RefCount()
或使用 TaskCompletionSource 并通过您目前在第一个订阅中未使用的 OnError 和 OnComplete 函数将其标记为完成来消除第二个订阅.那些看起来像这样:
static async Task LogNumbersWithRefCount(IObservable<int> observable)
{
observable = observable.Publish().RefCount();
var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
await observable;
subscription.Dispose();
}
static async Task LogNumbersTCS(IObservable<int> observable)
{
var t = new TaskCompletionSource<object>()
var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"),
ex => t.TrySetException(ex),
() => t.TrySetResult(null));
return t.Task;
}
Gideon 为您解决了这个问题,但当我开始在评论中添加一些提示时,我认为 post 一个完整的解决方案可能会很好。试试这个:
static IObservable<int> GetNumbers() =>
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(i => (int)i + 1)
.Do(i => Console.WriteLine($"Producing {i}"))
.Take(10);
static Task LogNumbers(IObservable<int> observable) =>
observable
.Do(i => Console.WriteLine($"Consuming {i}"))
.ToArray()
.ToTask();
static void Main(string[] args)
{
LogNumbers(GetNumbers()).Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
或者,更干净:
static IObservable<int> GetNumbers() =>
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(i => (int)i + 1)
.Do(i => Console.WriteLine($"Producing {i}"))
.Take(10);
static IObservable<int> LogNumbers(IObservable<int> observable) =>
observable
.Do(i => Console.WriteLine($"Consuming {i}"));
static async Task Main(string[] args)
{
await LogNumbers(GetNumbers());
Console.WriteLine("Finished");
Console.ReadLine();
}
您可以直接 await
observables。
考虑以下程序;
class Program
{
static IObservable<int> GetNumbers()
{
var observable = Observable.Empty<int>();
foreach (var i in Enumerable.Range(1, 10))
{
observable = observable.Concat(Observable.FromAsync(() => Task.Run(() =>
{
Console.WriteLine($"Producing {i}");
Thread.Sleep(1000);
return i;
})));
}
return observable;
}
static async Task LogNumbers(IObservable<int> observable)
{
var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
await observable;
subscription.Dispose();
}
static void Main(string[] args)
{
LogNumbers(GetNumbers()).Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
}
它产生以下输出
Producing 1
Producing 1
Producing 2
Consuming 1
Producing 2
Producing 3
Consuming 2
Producing 3
Producing 4
Consuming 3
Producing 4
Producing 5
Consuming 4
Producing 5
Producing 6
Consuming 5
Producing 6
Producing 7
Consuming 6
Producing 7
Producing 8
Consuming 7
Producing 8
Producing 9
Consuming 8
Producing 9
Producing 10
Consuming 9
Producing 10
Finished
它写出每两个“Producing x”语句和一个“Consuming x”语句。为什么要这样做?为什么它永远不会写出预期的最终“Consuming 10”语句?
您将获得两份 Producing 系列,因为您订阅了两次。最有可能的是,您没有得到消耗 10,因为当第二个订阅结束时第一个订阅被取消。如果您有时确实获得消耗 10,我不会感到惊讶,只是因为当时任务 运行 的顺序不同。
static async Task LogNumbers(IObservable<int> observable)
{
//This is the first subscription
var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
//This is the second subscription
await observable;
subscription.Dispose();
}
您的 GetNumbers
函数的编写方式,每个对 observable 的订阅都会触发它自己的一组 10 个任务到 运行,从而触发它自己的一组输出。第一个订阅还监视生成的值并输出一条 Consuming 行。第二个订阅对生成的值不做任何事情,因为您没有使用 await observable
的值,但会导致第二组任务 运行.
您可以通过在 LogNumbers 的参数上使用 Publish().RefCount()
或使用 TaskCompletionSource 并通过您目前在第一个订阅中未使用的 OnError 和 OnComplete 函数将其标记为完成来消除第二个订阅.那些看起来像这样:
static async Task LogNumbersWithRefCount(IObservable<int> observable)
{
observable = observable.Publish().RefCount();
var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
await observable;
subscription.Dispose();
}
static async Task LogNumbersTCS(IObservable<int> observable)
{
var t = new TaskCompletionSource<object>()
var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"),
ex => t.TrySetException(ex),
() => t.TrySetResult(null));
return t.Task;
}
Gideon 为您解决了这个问题,但当我开始在评论中添加一些提示时,我认为 post 一个完整的解决方案可能会很好。试试这个:
static IObservable<int> GetNumbers() =>
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(i => (int)i + 1)
.Do(i => Console.WriteLine($"Producing {i}"))
.Take(10);
static Task LogNumbers(IObservable<int> observable) =>
observable
.Do(i => Console.WriteLine($"Consuming {i}"))
.ToArray()
.ToTask();
static void Main(string[] args)
{
LogNumbers(GetNumbers()).Wait();
Console.WriteLine("Finished");
Console.ReadLine();
}
或者,更干净:
static IObservable<int> GetNumbers() =>
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(i => (int)i + 1)
.Do(i => Console.WriteLine($"Producing {i}"))
.Take(10);
static IObservable<int> LogNumbers(IObservable<int> observable) =>
observable
.Do(i => Console.WriteLine($"Consuming {i}"));
static async Task Main(string[] args)
{
await LogNumbers(GetNumbers());
Console.WriteLine("Finished");
Console.ReadLine();
}
您可以直接 await
observables。