如果订阅了 IObservable,控制台应用程序不会正常终止
Console application does not gracefully terminate if subscribed to IObservable
有一个控制台应用程序并使用 Nito.AsyncEx 我有以下入口点。
static int Main(string[] args)
{
var result = -1;
try
{
result = AsyncContext.Run(() => MainAsync(args));
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
#if DEBUG
Console.WriteLine("Press any key to terminate...");
Console.ReadKey();
#endif
return result;
}
我IDisposable
class暴露了一个IObservable<T>
属性。
private readonly Subject<Tuple<IReadOnlyCollection<dynamic>, string>> _onDataResultsSubject = new Subject<Tuple<IReadOnlyCollection<dynamic>, string>>();
public IObservable<Tuple<IReadOnlyCollection<dynamic>, string>> OnDataResults => _onDataResultsSubject;
在MainAsync
我订阅了OnDataResults
属性.
using (var processor = new Processor())
{
var eventDisposable = processor.OnDataResults
.Select(tuple => Tuple.Create(DataResultsToChunks(tuple.Item1, dataChunkSize), tuple.Item2))
.Select(tuple => DataChunksToMessageEnvelopes(tuple.Item1, tuple.Item2))
.SelectMany(messageEnvelope => PublishMessagesAsync(messageEnvelope, messagingService))
.Subscribe(messagesSent =>
{
var result = messagesSent.Select(p => p.ToString())
.Aggregate((p1, p2) => $"{p1}, {p2}");
Console.WriteLine($"Message sent {result}");
},
ex => Console.Error.WriteLine($"Error: {ex}"));
await processor.ProcessAsync(migrationConfigs).ConfigureAwait(false);
eventDisposable.Dispose();
}
当应用程序成功完成并打印出 Console.WriteLine("Press any key to terminate...");
时,按任意键不会终止应用程序。即使在发布模式下,控制台 window 仍然打开并挂起。
删除 IDisposable 订阅一切正常。
题中演示的代码和@clint抓到的疑点与.SelectMany(messageEnvelope => PublishMessagesAsync(messageEnvelope, messagingService))
有关。
似乎 PublishMessagesAsync
持有与 RabbitMQ 连接的 messagingService
实例。
因为 messagingService
是我自己的实现和实现 IDisposable
调用 Dispose()
并在内部清理资源 Reactive Extensions 将不再保留实例。
爱RX!总是推动你更好的编程:-)
有一个控制台应用程序并使用 Nito.AsyncEx 我有以下入口点。
static int Main(string[] args)
{
var result = -1;
try
{
result = AsyncContext.Run(() => MainAsync(args));
}
catch (Exception ex)
{
Console.Error.WriteLine(ex);
}
#if DEBUG
Console.WriteLine("Press any key to terminate...");
Console.ReadKey();
#endif
return result;
}
我IDisposable
class暴露了一个IObservable<T>
属性。
private readonly Subject<Tuple<IReadOnlyCollection<dynamic>, string>> _onDataResultsSubject = new Subject<Tuple<IReadOnlyCollection<dynamic>, string>>();
public IObservable<Tuple<IReadOnlyCollection<dynamic>, string>> OnDataResults => _onDataResultsSubject;
在MainAsync
我订阅了OnDataResults
属性.
using (var processor = new Processor())
{
var eventDisposable = processor.OnDataResults
.Select(tuple => Tuple.Create(DataResultsToChunks(tuple.Item1, dataChunkSize), tuple.Item2))
.Select(tuple => DataChunksToMessageEnvelopes(tuple.Item1, tuple.Item2))
.SelectMany(messageEnvelope => PublishMessagesAsync(messageEnvelope, messagingService))
.Subscribe(messagesSent =>
{
var result = messagesSent.Select(p => p.ToString())
.Aggregate((p1, p2) => $"{p1}, {p2}");
Console.WriteLine($"Message sent {result}");
},
ex => Console.Error.WriteLine($"Error: {ex}"));
await processor.ProcessAsync(migrationConfigs).ConfigureAwait(false);
eventDisposable.Dispose();
}
当应用程序成功完成并打印出 Console.WriteLine("Press any key to terminate...");
时,按任意键不会终止应用程序。即使在发布模式下,控制台 window 仍然打开并挂起。
删除 IDisposable 订阅一切正常。
题中演示的代码和@clint抓到的疑点与.SelectMany(messageEnvelope => PublishMessagesAsync(messageEnvelope, messagingService))
有关。
似乎 PublishMessagesAsync
持有与 RabbitMQ 连接的 messagingService
实例。
因为 messagingService
是我自己的实现和实现 IDisposable
调用 Dispose()
并在内部清理资源 Reactive Extensions 将不再保留实例。
爱RX!总是推动你更好的编程:-)