使用异常处理将 IEnumerable<Task<T>> 转换为 IObservable<T>
Convert IEnumerable<Task<T>> to IObservable<T> with exceptions handling
我想将 IEnumerable<Task<T>>
转换为 IObservable<T>
。我找到了这个 here:
的解决方案
IObservable<T> ToObservable<T>(IEnumerable<Task<T>> source)
{
return source.Select(t => t.ToObservable()).Merge();
}
对于通常的情况来说完全没问题,但我需要处理可能会在该任务中引发的异常...所以 IObservable<T>
不应该在第一次异常后死掉。
据我了解,针对此用例的建议是使用一些包装器,它将携带实际值或错误。所以我的尝试是
IObservable<Either<T, Exception>> ToObservable<T>(IEnumerable<Task<T>> source)
{
var subject = new Subject<Either<T, Exception>>();
foreach (var observable in GetIntsIEnumerable().Select(t => t.ToObservable()))
{
observable.Subscribe(i => subject.OnNext(i), e => subject.OnNext(e));
}
return subject;
}
Either<T, Exception>
借自 this article。
但这也不行,因为OnCompleted()
没有被调用。我该如何解决?我对 Rx 概念还很陌生。
这里是完整的测试代码...
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace Test
{
class Program
{
static Task Main()
{
SemaphoreSlim signal = new SemaphoreSlim(0, 1);
//GetInts1().Subscribe(
// i => Console.WriteLine($"OK: {i}"),
// e => Console.WriteLine($"ERROR: {e.Message}"),
// () => signal.Release());
GetInts2().Subscribe(r => Console.WriteLine(r.Match(
i => $"OK: {i}",
e => $"ERROR: {e.Message}")),
() => signal.Release());
return signal.WaitAsync();
}
static IObservable<int> GetInts1()
{
return GetIntsIEnumerable().Select(t => t.ToObservable()).Merge();
}
static IObservable<Either<int, Exception>> GetInts2()
{
var subject = new Subject<Either<int, Exception>>();
foreach (var observable in GetIntsIEnumerable().Select(t => t.ToObservable()))
{
observable.Subscribe(i => subject.OnNext(i), e => subject.OnNext(e));
}
return subject;
}
static IEnumerable<Task<int>> GetIntsIEnumerable()
{
Random rnd = new Random();
foreach (int i in Enumerable.Range(1, 10))
{
yield return Task.Run(async () =>
{
await Task.Delay(rnd.Next(0, 5000));
if (i == 6)
throw new ArgumentException();
return i;
});
}
}
}
/// <summary>
/// Functional data data to represent a discriminated
/// union of two possible types.
/// </summary>
/// <typeparam name="TL">Type of "Left" item.</typeparam>
/// <typeparam name="TR">Type of "Right" item.</typeparam>
public class Either<TL, TR>
{
private readonly TL left;
private readonly TR right;
private readonly bool isLeft;
public Either(TL left)
{
this.left = left;
this.isLeft = true;
}
public Either(TR right)
{
this.right = right;
this.isLeft = false;
}
public T Match<T>(Func<TL, T> leftFunc, Func<TR, T> rightFunc)
{
if (leftFunc == null)
{
throw new ArgumentNullException(nameof(leftFunc));
}
if (rightFunc == null)
{
throw new ArgumentNullException(nameof(rightFunc));
}
return this.isLeft ? leftFunc(this.left) : rightFunc(this.right);
}
/// <summary>
/// If right value is assigned, execute an action on it.
/// </summary>
/// <param name="rightAction">Action to execute.</param>
public void DoRight(Action<TR> rightAction)
{
if (rightAction == null)
{
throw new ArgumentNullException(nameof(rightAction));
}
if (!this.isLeft)
{
rightAction(this.right);
}
}
public TL LeftOrDefault() => this.Match(l => l, r => default);
public TR RightOrDefault() => this.Match(l => default, r => r);
public static implicit operator Either<TL, TR>(TL left) => new Either<TL, TR>(left);
public static implicit operator Either<TL, TR>(TR right) => new Either<TL, TR>(right);
}
}
有一个内置的机制来处理这样的错误。只需使用 .Materialize()
运算符即可将 IObservable<T>
更改为 IObservable<Notification<T>>
并允许将错误和完成视为正常值。
因此,举个例子,Observable.Return<int>(42)
产生一个值 42
和一个完成,但是 Observable.Return<int>(42).Materialize()
产生一个值 Notification.CreateOnNext<int>(42)
,然后是一个值 [=19] =],后面正常补全。
如果您有一个产生错误的序列,那么您实际上会得到一个值 Notification.CreateOnError<T>(exception)
,然后正常完成。
这意味着您可以像这样更改代码:
IObservable<Notification<T>> ToObservable<T>(IEnumerable<Task<T>> source)
{
return source.Select(t => t.ToObservable().Materialize()).Merge();
}
你的测试代码对我来说有点复杂。您永远不需要像使用它们那样使用 SemaphoreSlim
或 Subject
。
我自己写了测试代码。
void Main()
{
var r = new Random();
IEnumerable<Task<int>> source =
Enumerable
.Range(0, 10).Select(x => Task.Factory.StartNew(() =>
{
Thread.Sleep(r.Next(10000));
if (x % 3 == 0) throw new NotSupportedException($"Failed on {x}");
return x;
}));
IObservable<Notification<int>> query = source.ToObservable();
query
.Do(x =>
{
if (x.Kind == NotificationKind.OnError)
{
Console.WriteLine(x.Exception.Message);
}
})
.Where(x => x.Kind == NotificationKind.OnNext) // Only care about vales
.Select(x => x.Value)
.Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("Done."));
}
public static class Ex
{
public static IObservable<Notification<T>> ToObservable<T>(this IEnumerable<Task<T>> source)
{
return source.Select(t => t.ToObservable().Materialize()).Merge();
}
}
该代码的典型 运行 生成:
Failed on 3
2
5
4
Failed on 0
Failed on 9
Failed on 6
7
1
8
Done.
Rx 库包含一个 Merge
重载,可以直接有效地合并任务,而不是将每个任务转换为中间的一次性可观察序列:
// Merges results from all source tasks into a single observable sequence.
public static IObservable<TSource> Merge<TSource>(
this IObservable<Task<TSource>> sources);
您可以使用此运算符来实现 ToObservable
方法,如下所示:
IObservable<Either<T, Exception>> ToObservable<T>(IEnumerable<Task<T>> source)
{
return source
.Select(async task =>
{
try { return new Either<T, Exception>(await task); }
catch (Exception ex) { return new Either<T, Exception>(ex); }
})
.ToObservable()
.Merge();
}
您可以将 ToObservable
运算符放在 Select
运算符之前或之后,这没有任何区别。
顺便说一句,有一个可用的简约库(Try by Stephen Cleary) that contains a Try<T>
type, which is similar in functionality to the Either
类型,但专门用于将 Exception
作为第二种类型(作为 Either<T, Exception>
)。使用此库,您可以像这样实现 ToObservable
方法:
IObservable<Try<T>> ToObservable<T>(IEnumerable<Task<T>> source)
{
return source
.Select(task => Try.Create(() => task))
.ToObservable()
.Merge();
}
这里是Try.Create
方法的定义:
// Executes the specified function, and wraps either the result or the exception.
public static Task<Try<T>> Create<T>(Func<Task<T>> func);
我想将 IEnumerable<Task<T>>
转换为 IObservable<T>
。我找到了这个 here:
IObservable<T> ToObservable<T>(IEnumerable<Task<T>> source)
{
return source.Select(t => t.ToObservable()).Merge();
}
对于通常的情况来说完全没问题,但我需要处理可能会在该任务中引发的异常...所以 IObservable<T>
不应该在第一次异常后死掉。
据我了解,针对此用例的建议是使用一些包装器,它将携带实际值或错误。所以我的尝试是
IObservable<Either<T, Exception>> ToObservable<T>(IEnumerable<Task<T>> source)
{
var subject = new Subject<Either<T, Exception>>();
foreach (var observable in GetIntsIEnumerable().Select(t => t.ToObservable()))
{
observable.Subscribe(i => subject.OnNext(i), e => subject.OnNext(e));
}
return subject;
}
Either<T, Exception>
借自 this article。
但这也不行,因为OnCompleted()
没有被调用。我该如何解决?我对 Rx 概念还很陌生。
这里是完整的测试代码...
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace Test
{
class Program
{
static Task Main()
{
SemaphoreSlim signal = new SemaphoreSlim(0, 1);
//GetInts1().Subscribe(
// i => Console.WriteLine($"OK: {i}"),
// e => Console.WriteLine($"ERROR: {e.Message}"),
// () => signal.Release());
GetInts2().Subscribe(r => Console.WriteLine(r.Match(
i => $"OK: {i}",
e => $"ERROR: {e.Message}")),
() => signal.Release());
return signal.WaitAsync();
}
static IObservable<int> GetInts1()
{
return GetIntsIEnumerable().Select(t => t.ToObservable()).Merge();
}
static IObservable<Either<int, Exception>> GetInts2()
{
var subject = new Subject<Either<int, Exception>>();
foreach (var observable in GetIntsIEnumerable().Select(t => t.ToObservable()))
{
observable.Subscribe(i => subject.OnNext(i), e => subject.OnNext(e));
}
return subject;
}
static IEnumerable<Task<int>> GetIntsIEnumerable()
{
Random rnd = new Random();
foreach (int i in Enumerable.Range(1, 10))
{
yield return Task.Run(async () =>
{
await Task.Delay(rnd.Next(0, 5000));
if (i == 6)
throw new ArgumentException();
return i;
});
}
}
}
/// <summary>
/// Functional data data to represent a discriminated
/// union of two possible types.
/// </summary>
/// <typeparam name="TL">Type of "Left" item.</typeparam>
/// <typeparam name="TR">Type of "Right" item.</typeparam>
public class Either<TL, TR>
{
private readonly TL left;
private readonly TR right;
private readonly bool isLeft;
public Either(TL left)
{
this.left = left;
this.isLeft = true;
}
public Either(TR right)
{
this.right = right;
this.isLeft = false;
}
public T Match<T>(Func<TL, T> leftFunc, Func<TR, T> rightFunc)
{
if (leftFunc == null)
{
throw new ArgumentNullException(nameof(leftFunc));
}
if (rightFunc == null)
{
throw new ArgumentNullException(nameof(rightFunc));
}
return this.isLeft ? leftFunc(this.left) : rightFunc(this.right);
}
/// <summary>
/// If right value is assigned, execute an action on it.
/// </summary>
/// <param name="rightAction">Action to execute.</param>
public void DoRight(Action<TR> rightAction)
{
if (rightAction == null)
{
throw new ArgumentNullException(nameof(rightAction));
}
if (!this.isLeft)
{
rightAction(this.right);
}
}
public TL LeftOrDefault() => this.Match(l => l, r => default);
public TR RightOrDefault() => this.Match(l => default, r => r);
public static implicit operator Either<TL, TR>(TL left) => new Either<TL, TR>(left);
public static implicit operator Either<TL, TR>(TR right) => new Either<TL, TR>(right);
}
}
有一个内置的机制来处理这样的错误。只需使用 .Materialize()
运算符即可将 IObservable<T>
更改为 IObservable<Notification<T>>
并允许将错误和完成视为正常值。
因此,举个例子,Observable.Return<int>(42)
产生一个值 42
和一个完成,但是 Observable.Return<int>(42).Materialize()
产生一个值 Notification.CreateOnNext<int>(42)
,然后是一个值 [=19] =],后面正常补全。
如果您有一个产生错误的序列,那么您实际上会得到一个值 Notification.CreateOnError<T>(exception)
,然后正常完成。
这意味着您可以像这样更改代码:
IObservable<Notification<T>> ToObservable<T>(IEnumerable<Task<T>> source)
{
return source.Select(t => t.ToObservable().Materialize()).Merge();
}
你的测试代码对我来说有点复杂。您永远不需要像使用它们那样使用 SemaphoreSlim
或 Subject
。
我自己写了测试代码。
void Main()
{
var r = new Random();
IEnumerable<Task<int>> source =
Enumerable
.Range(0, 10).Select(x => Task.Factory.StartNew(() =>
{
Thread.Sleep(r.Next(10000));
if (x % 3 == 0) throw new NotSupportedException($"Failed on {x}");
return x;
}));
IObservable<Notification<int>> query = source.ToObservable();
query
.Do(x =>
{
if (x.Kind == NotificationKind.OnError)
{
Console.WriteLine(x.Exception.Message);
}
})
.Where(x => x.Kind == NotificationKind.OnNext) // Only care about vales
.Select(x => x.Value)
.Subscribe(x => Console.WriteLine(x), () => Console.WriteLine("Done."));
}
public static class Ex
{
public static IObservable<Notification<T>> ToObservable<T>(this IEnumerable<Task<T>> source)
{
return source.Select(t => t.ToObservable().Materialize()).Merge();
}
}
该代码的典型 运行 生成:
Failed on 3 2 5 4 Failed on 0 Failed on 9 Failed on 6 7 1 8 Done.
Rx 库包含一个 Merge
重载,可以直接有效地合并任务,而不是将每个任务转换为中间的一次性可观察序列:
// Merges results from all source tasks into a single observable sequence.
public static IObservable<TSource> Merge<TSource>(
this IObservable<Task<TSource>> sources);
您可以使用此运算符来实现 ToObservable
方法,如下所示:
IObservable<Either<T, Exception>> ToObservable<T>(IEnumerable<Task<T>> source)
{
return source
.Select(async task =>
{
try { return new Either<T, Exception>(await task); }
catch (Exception ex) { return new Either<T, Exception>(ex); }
})
.ToObservable()
.Merge();
}
您可以将 ToObservable
运算符放在 Select
运算符之前或之后,这没有任何区别。
顺便说一句,有一个可用的简约库(Try by Stephen Cleary) that contains a Try<T>
type, which is similar in functionality to the Either
类型,但专门用于将 Exception
作为第二种类型(作为 Either<T, Exception>
)。使用此库,您可以像这样实现 ToObservable
方法:
IObservable<Try<T>> ToObservable<T>(IEnumerable<Task<T>> source)
{
return source
.Select(task => Try.Create(() => task))
.ToObservable()
.Merge();
}
这里是Try.Create
方法的定义:
// Executes the specified function, and wraps either the result or the exception.
public static Task<Try<T>> Create<T>(Func<Task<T>> func);