如何使用任务并行库中的 Task.Run() 启动冷可观察对象?
How to start a cold observable using Task.Run() from the task parallel library?
我们有这样一种情况,我们想要在 C# 应用程序中启动一个后台 "polling" 操作,该应用程序 returns 定期使用反应式扩展值。我们要实施的流程如下:
- 一个调用者调用一个类似
Poll()
的方法,该方法 returns 一个 IObservable
- 调用者订阅所述可观察对象,并启动一个后台 thread/task 与硬件交互以在某个时间间隔检索值
- 当调用者完成后,它会处理订阅并自动停止后台 thread/task
尝试 #1
为了证明这一点,我编写了以下控制台应用程序,但这并不符合我的预期:
public class OutputParameters
{
public Guid Id { get; set; }
public int Value { get; set; }
}
public class Program
{
static void Main(string[] args)
{
Console.WriteLine("Requesting the polling operation");
var worker1 = Poll();
Console.WriteLine("Subscribing to start the polling operation");
var sub1 = worker1.Subscribe(
value => { Console.WriteLine($"Thread {value.Id} emitted {value.Value}"); },
ex => { Console.WriteLine($"Thread threw an exception: {ex.Message}"); },
() => { Console.WriteLine("Thread has completed"); });
Thread.Sleep(5000);
sub1.Dispose();
Console.ReadLine();
}
private static IObservable<OutputParameters> Poll()
{
return Observable.DeferAsync(Worker);
}
private static Task<IObservable<OutputParameters>> Worker(CancellationToken token)
{
var subject = new Subject<OutputParameters>();
Task.Run(async () =>
{
var id = Guid.NewGuid();
const int steps = 10;
try
{
for (var i = 1; i <= steps || token.IsCancellationRequested; i++)
{
Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}");
subject.OnNext(new OutputParameters { Id = id, Value = i });
// This will actually throw an exception if it's the active call when
// the token is cancelled.
//
await Task.Delay(1000, token);
}
}
catch (TaskCanceledException ex)
{
// Interestingly, if this is triggered because the caller unsibscribed then
// this is unneeded...the caller isn't listening for this error anymore
//
subject.OnError(ex);
}
if (token.IsCancellationRequested)
{
Console.WriteLine($"[IN THREAD] Thread {id} was cancelled");
}
else
{
Console.WriteLine($"[IN THREAD] Thread {id} exiting normally");
subject.OnCompleted();
}
}, token);
return Task.FromResult(subject.AsObservable());
}
}
上面的代码实际上似乎几乎立即取消了后台任务,因为这是输出:
Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de: Step 1 of 10
Thread threw an exception: A task was canceled.
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de was cancelled
尝试#2
然后我尝试对 Worker
方法做一个小改动,使其异步并等待 Task.Run
调用,如下所示:
private static async Task<IObservable<OutputParameters>> Worker(CancellationToken token)
{
var subject = new Subject<OutputParameters>();
await Task.Run(async () =>
{
...what happens in here is unchanged...
}, token);
return subject.AsObservable();
}
虽然这里的结果让后台任务看起来具有完全控制权,因为它在被取消之前 运行 持续了大约 5 秒,但是订阅回调没有输出。这是完整的输出:
Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 1 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 2 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 3 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 4 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 5 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 6 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a was cancelled
我的问题
所以很明显我没有完全理解这里发生的事情,或者在这种情况下使用 DeferAsync
是 observable 的正确创建方法。
有没有合适的方法来实现这种方法?
如果仅 RX 解决方案就足够了,这就可以了。如果你问我更干净...
static IObservable<OutputParameters> Poll()
{
const int steps = 10;
return Observable.Defer<Guid>(() => Observable.Return(Guid.NewGuid()))
.SelectMany(id =>
Observable.Generate(1, i => i <= steps, i => i + 1, i => i, _ => TimeSpan.FromMilliseconds(1000))
.ObserveOn(new EventLoopScheduler())
.Do(i => Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}"))
.Select(i => new OutputParameters { Id = id, Value = i })
);
}
解释:
Generate
就像 Rx 的 for 循环。最后一个参数控制何时发出项目。这相当于你的 for 循环 + Task.Delay
.
ObserveOn
控制where/whenobservable被观察到。在这种情况下,EventLoopScheduler
将为每个订阅者启动一个新线程,并且来自该可观察对象的所有项目都将在新线程上被观察到。
来自 Enigmativity:
static IObservable<OutputParameters> Poll()
{
const int steps = 10;
return Observable.Defer<OutputParameters>(() =>
{
var id = Guid.NewGuid();
return Observable.Generate(1, i => i <= steps, i => i + 1, i => i,
_ => TimeSpan.FromMilliseconds(1000), new EventLoopScheduler())
.Do(i => Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}"))
.Select(i => new OutputParameters { Id = id, Value = i });
});
}
我们有这样一种情况,我们想要在 C# 应用程序中启动一个后台 "polling" 操作,该应用程序 returns 定期使用反应式扩展值。我们要实施的流程如下:
- 一个调用者调用一个类似
Poll()
的方法,该方法 returns 一个IObservable
- 调用者订阅所述可观察对象,并启动一个后台 thread/task 与硬件交互以在某个时间间隔检索值
- 当调用者完成后,它会处理订阅并自动停止后台 thread/task
尝试 #1
为了证明这一点,我编写了以下控制台应用程序,但这并不符合我的预期:
public class OutputParameters
{
public Guid Id { get; set; }
public int Value { get; set; }
}
public class Program
{
static void Main(string[] args)
{
Console.WriteLine("Requesting the polling operation");
var worker1 = Poll();
Console.WriteLine("Subscribing to start the polling operation");
var sub1 = worker1.Subscribe(
value => { Console.WriteLine($"Thread {value.Id} emitted {value.Value}"); },
ex => { Console.WriteLine($"Thread threw an exception: {ex.Message}"); },
() => { Console.WriteLine("Thread has completed"); });
Thread.Sleep(5000);
sub1.Dispose();
Console.ReadLine();
}
private static IObservable<OutputParameters> Poll()
{
return Observable.DeferAsync(Worker);
}
private static Task<IObservable<OutputParameters>> Worker(CancellationToken token)
{
var subject = new Subject<OutputParameters>();
Task.Run(async () =>
{
var id = Guid.NewGuid();
const int steps = 10;
try
{
for (var i = 1; i <= steps || token.IsCancellationRequested; i++)
{
Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}");
subject.OnNext(new OutputParameters { Id = id, Value = i });
// This will actually throw an exception if it's the active call when
// the token is cancelled.
//
await Task.Delay(1000, token);
}
}
catch (TaskCanceledException ex)
{
// Interestingly, if this is triggered because the caller unsibscribed then
// this is unneeded...the caller isn't listening for this error anymore
//
subject.OnError(ex);
}
if (token.IsCancellationRequested)
{
Console.WriteLine($"[IN THREAD] Thread {id} was cancelled");
}
else
{
Console.WriteLine($"[IN THREAD] Thread {id} exiting normally");
subject.OnCompleted();
}
}, token);
return Task.FromResult(subject.AsObservable());
}
}
上面的代码实际上似乎几乎立即取消了后台任务,因为这是输出:
Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de: Step 1 of 10
Thread threw an exception: A task was canceled.
[IN THREAD] Thread a470e6f4-2e62-4a3c-abe6-670bce39b6de was cancelled
尝试#2
然后我尝试对 Worker
方法做一个小改动,使其异步并等待 Task.Run
调用,如下所示:
private static async Task<IObservable<OutputParameters>> Worker(CancellationToken token)
{
var subject = new Subject<OutputParameters>();
await Task.Run(async () =>
{
...what happens in here is unchanged...
}, token);
return subject.AsObservable();
}
虽然这里的结果让后台任务看起来具有完全控制权,因为它在被取消之前 运行 持续了大约 5 秒,但是订阅回调没有输出。这是完整的输出:
Requesting the polling operation
Subscribing to start the polling operation
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 1 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 2 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 3 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 4 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 5 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a: Step 6 of 10
[IN THREAD] Thread cf416d81-c3b7-41fe-8f5a-681da368452a was cancelled
我的问题
所以很明显我没有完全理解这里发生的事情,或者在这种情况下使用 DeferAsync
是 observable 的正确创建方法。
有没有合适的方法来实现这种方法?
如果仅 RX 解决方案就足够了,这就可以了。如果你问我更干净...
static IObservable<OutputParameters> Poll()
{
const int steps = 10;
return Observable.Defer<Guid>(() => Observable.Return(Guid.NewGuid()))
.SelectMany(id =>
Observable.Generate(1, i => i <= steps, i => i + 1, i => i, _ => TimeSpan.FromMilliseconds(1000))
.ObserveOn(new EventLoopScheduler())
.Do(i => Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}"))
.Select(i => new OutputParameters { Id = id, Value = i })
);
}
解释:
Generate
就像 Rx 的 for 循环。最后一个参数控制何时发出项目。这相当于你的 for 循环 +Task.Delay
.ObserveOn
控制where/whenobservable被观察到。在这种情况下,EventLoopScheduler
将为每个订阅者启动一个新线程,并且来自该可观察对象的所有项目都将在新线程上被观察到。
来自 Enigmativity:
static IObservable<OutputParameters> Poll()
{
const int steps = 10;
return Observable.Defer<OutputParameters>(() =>
{
var id = Guid.NewGuid();
return Observable.Generate(1, i => i <= steps, i => i + 1, i => i,
_ => TimeSpan.FromMilliseconds(1000), new EventLoopScheduler())
.Do(i => Console.WriteLine($"[IN THREAD] Thread {id}: Step {i} of {steps}"))
.Select(i => new OutputParameters { Id = id, Value = i });
});
}