如何处理observable的async方法抛出的异常?
How to handle the exception thrown by the async method with observable?
我有一个 observable,我想用异步方法订阅这个 observable,但是每次异步方法抛出异常时,即使我将 catch 代码放在 observable 定义中,订阅也会立即处理。伪代码如下来演示这种情况:
[Fact]
public async Task Test()
{
var observable = Observable.Create<int>(observer =>
{
try
{
Enumerable.Range(1, 10).ToList().ForEach(x =>
{
observer.OnNext(x);
});
}
catch (Exception ex)
{
// get called after the exception is thrown
_testOutputHelper.WriteLine($"The exception is catch:{ex.ToString()}");
}
return Disposable.Create(() =>
{
// also get called after exception is thrown
_testOutputHelper.WriteLine("Observable Dispose");
});
});
Func<int, Task> handler = async (i) =>
{
// simulate the handler logic
await Task.Delay(TimeSpan.FromSeconds(1));
// throw the exception to test
throw new Exception($"{i}");
};
observable.Subscribe(x=>handler(x).Wait());
await Task.Delay(TimeSpan.FromSeconds(10));
}
从上面的代码中,我不明白为什么即使捕获了异常也会调用 dispose 委托(出于某种原因,我必须在可观察定义中处理异常),有什么办法可以防止当异步方法抛出异常时订阅被处理?
您的代码完全按照您的指示执行。
捕获异常的目的是让你的程序可以继续运行而不会突然停止。这正是您的代码正在做的事情:异常被捕获,然后在 catch
块之后继续执行。
如果您想让它做其他事情,您有两个选择。
- 记录异常后重新抛出异常:
catch (Exception ex)
{
// get called after the exception is thrown
_testOutputHelper.WriteLine($"The exception is catch:{ex.ToString()}");
throw;
}
然后,任何调用 Test()
的代码都将负责捕获(或不捕获)该异常。
- 将 return 移到
try
块中,并在捕获到异常时将 return 移到其他地方:
try
{
Enumerable.Range(1, 10).ToList().ForEach(x =>
{
observer.OnNext(x);
});
return Disposable.Create(() =>
{
// also get called after exception is thrown
_testOutputHelper.WriteLine("Observable Dispose");
});
}
catch (Exception ex)
{
// get called after the exception is thrown
_testOutputHelper.WriteLine($"The exception is catch:{ex.ToString()}");
return //something else
}
您可能会受益于阅读 Microsoft 关于 Exception Handling 的文档。
您的代码中发生的事情是您使用 Observable.Create
并使用以下代码填充可观察对象的直接结果:
Enumerable.Range(1, 10).ToList().ForEach(x =>
{
observer.OnNext(x);
});
Observable.Create
使用当前线程创建可观察对象,因此 Enumerable.Range(1, 10).ToList().ForEach
会立即在当前线程上执行,而对 OnNext
的调用会立即执行 handler(x).Wait()
。
不过,您会注意到异常发生在传递给 Subscribe
的委托中。内部有这样的代码:
catch (Exception exception)
{
if (!autoDetachObserver.Fail(exception))
{
throw;
}
return autoDetachObserver;
}
在订阅中捕获异常,取消订阅 - 因此 "Observable Dispose"
消息 - 然后重新抛出异常,这就是你的代码捕获它的地方。
现在,如果您想在 Rx 中正确地执行此操作,则应避免 Observable.Create
。这是一种创建可观察对象的诱人方式,但它会带来麻烦。
改为这样做:
public async Task Test()
{
Func<int, Task> handler = async (i) =>
{
// simulate the handler logic
await Task.Delay(TimeSpan.FromSeconds(1));
// throw the exception to test
throw new Exception($"{i}");
};
await
Observable
.Range(1, 10)
.SelectMany(i => Observable.FromAsync(() => handler(i)))
.LastOrDefaultAsync();
}
但是,当然,我们要处理异常。简单的方法是这样的:
public async Task Test()
{
Func<int, Task> handler = async (i) =>
{
// simulate the handler logic
await Task.Delay(TimeSpan.FromSeconds(1));
// throw the exception to test
throw new Exception($"{i}");
};
await
Observable
.Range(1, 10)
.SelectMany(i =>
Observable
.FromAsync(() => handler(i))
.Catch<Unit, Exception>(ex =>
{
Console.WriteLine($"The exception is catch:{ex.ToString()}");
return Observable.Empty<Unit>();
}))
.LastOrDefaultAsync();
}
现在输出 10 个异常错误并正常完成。
我有一个 observable,我想用异步方法订阅这个 observable,但是每次异步方法抛出异常时,即使我将 catch 代码放在 observable 定义中,订阅也会立即处理。伪代码如下来演示这种情况:
[Fact]
public async Task Test()
{
var observable = Observable.Create<int>(observer =>
{
try
{
Enumerable.Range(1, 10).ToList().ForEach(x =>
{
observer.OnNext(x);
});
}
catch (Exception ex)
{
// get called after the exception is thrown
_testOutputHelper.WriteLine($"The exception is catch:{ex.ToString()}");
}
return Disposable.Create(() =>
{
// also get called after exception is thrown
_testOutputHelper.WriteLine("Observable Dispose");
});
});
Func<int, Task> handler = async (i) =>
{
// simulate the handler logic
await Task.Delay(TimeSpan.FromSeconds(1));
// throw the exception to test
throw new Exception($"{i}");
};
observable.Subscribe(x=>handler(x).Wait());
await Task.Delay(TimeSpan.FromSeconds(10));
}
从上面的代码中,我不明白为什么即使捕获了异常也会调用 dispose 委托(出于某种原因,我必须在可观察定义中处理异常),有什么办法可以防止当异步方法抛出异常时订阅被处理?
您的代码完全按照您的指示执行。
捕获异常的目的是让你的程序可以继续运行而不会突然停止。这正是您的代码正在做的事情:异常被捕获,然后在 catch
块之后继续执行。
如果您想让它做其他事情,您有两个选择。
- 记录异常后重新抛出异常:
catch (Exception ex)
{
// get called after the exception is thrown
_testOutputHelper.WriteLine($"The exception is catch:{ex.ToString()}");
throw;
}
然后,任何调用 Test()
的代码都将负责捕获(或不捕获)该异常。
- 将 return 移到
try
块中,并在捕获到异常时将 return 移到其他地方:
try
{
Enumerable.Range(1, 10).ToList().ForEach(x =>
{
observer.OnNext(x);
});
return Disposable.Create(() =>
{
// also get called after exception is thrown
_testOutputHelper.WriteLine("Observable Dispose");
});
}
catch (Exception ex)
{
// get called after the exception is thrown
_testOutputHelper.WriteLine($"The exception is catch:{ex.ToString()}");
return //something else
}
您可能会受益于阅读 Microsoft 关于 Exception Handling 的文档。
您的代码中发生的事情是您使用 Observable.Create
并使用以下代码填充可观察对象的直接结果:
Enumerable.Range(1, 10).ToList().ForEach(x =>
{
observer.OnNext(x);
});
Observable.Create
使用当前线程创建可观察对象,因此 Enumerable.Range(1, 10).ToList().ForEach
会立即在当前线程上执行,而对 OnNext
的调用会立即执行 handler(x).Wait()
。
不过,您会注意到异常发生在传递给 Subscribe
的委托中。内部有这样的代码:
catch (Exception exception)
{
if (!autoDetachObserver.Fail(exception))
{
throw;
}
return autoDetachObserver;
}
在订阅中捕获异常,取消订阅 - 因此 "Observable Dispose"
消息 - 然后重新抛出异常,这就是你的代码捕获它的地方。
现在,如果您想在 Rx 中正确地执行此操作,则应避免 Observable.Create
。这是一种创建可观察对象的诱人方式,但它会带来麻烦。
改为这样做:
public async Task Test()
{
Func<int, Task> handler = async (i) =>
{
// simulate the handler logic
await Task.Delay(TimeSpan.FromSeconds(1));
// throw the exception to test
throw new Exception($"{i}");
};
await
Observable
.Range(1, 10)
.SelectMany(i => Observable.FromAsync(() => handler(i)))
.LastOrDefaultAsync();
}
但是,当然,我们要处理异常。简单的方法是这样的:
public async Task Test()
{
Func<int, Task> handler = async (i) =>
{
// simulate the handler logic
await Task.Delay(TimeSpan.FromSeconds(1));
// throw the exception to test
throw new Exception($"{i}");
};
await
Observable
.Range(1, 10)
.SelectMany(i =>
Observable
.FromAsync(() => handler(i))
.Catch<Unit, Exception>(ex =>
{
Console.WriteLine($"The exception is catch:{ex.ToString()}");
return Observable.Empty<Unit>();
}))
.LastOrDefaultAsync();
}
现在输出 10 个异常错误并正常完成。