合并多个 IAsyncEnumerable 流

Merge multiple IAsyncEnumerable streams

随着 MediatR 10 的发布,现在有一种模式允许开发人员创建由 IAsyncEnumerable 提供支持的流。我正在利用这种模式创建多个不同的文件系统观察器来监视多个文件夹。为了监控文件夹,我使用了两种不同的方法:轮询和 FileSystemWatcher。作为我的管道的一部分,所有不同的文件夹监视器都聚合到一个 IEnumerable<IAsyncEnumerable<FileRecord>> 中。在每种类型的观察器中,都有一个内部循环,该循环一直运行到通过 CancellationToken 请求取消为止。

这是轮询观察器:

/// <summary>
/// Produces a stream of <see cref="FileRecord"/>s by repeatedly listing a folder and emitting
/// every file that the <see cref="ISeenFileStore"/> has not recorded yet.
/// </summary>
public class PolledFileStreamHandler : 
    IStreamRequestHandler<PolledFileStream, FileRecord>
{
    private readonly ISeenFileStore _seenFileStore;
    private readonly IPublisher _publisher;
    private readonly ILogger<PolledFileStreamHandler> _logger;

    public PolledFileStreamHandler(
        ISeenFileStore seenFileStore, 
        IPublisher publisher, 
        ILogger<PolledFileStreamHandler> logger)
    {
        _seenFileStore = seenFileStore;
        _publisher = publisher;
        _logger = logger;
    }

    /// <summary>
    /// Polls <c>request.Folder</c>, waiting <c>request.Interval</c> between polls, until
    /// cancellation is requested. Each poll's new files are yielded only after the whole
    /// poll has finished publishing them.
    /// </summary>
    /// <param name="request">The folder to poll and the pause between polls.</param>
    /// <param name="cancellationToken">
    /// Ends the loop. The pause is cut short, but the cancellation exception is swallowed,
    /// so the sequence simply completes.
    /// </param>
    public async IAsyncEnumerable<FileRecord> Handle(
        PolledFileStream request, 
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var pending = new ConcurrentQueue<FileRecord>();

        while (!cancellationToken.IsCancellationRequested)
        {
            await CollectNewFilesAsync(request.Folder, pending);

            // The parallel pass above has completed, so nothing else is writing to the queue now.
            while (pending.TryDequeue(out var next))
            {
                yield return next;
            }

            _logger.LogInformation("PolledFileStreamHandler watching {Directory} at: {Time}", request.Folder, DateTimeOffset.Now);

            // The continuation turns a cancelled delay into a normal completion, so a stop
            // request ends the loop instead of throwing.
            var pause = Task.Delay(request.Interval, cancellationToken);
            await pause.ContinueWith(_ => { }, CancellationToken.None);
        }
    }

    /// <summary>
    /// Marks every not-yet-seen file in <paramref name="folder"/> as seen, publishes a
    /// <c>FileSeenNotification</c> for it and appends its record to <paramref name="sink"/>,
    /// processing files in parallel.
    /// </summary>
    /// <remarks>
    /// The pass is run with <see cref="CancellationToken.None"/>, so once started it is not
    /// interrupted by the stream's cancellation token.
    /// </remarks>
    private Task CollectNewFilesAsync(string folder, ConcurrentQueue<FileRecord> sink)
    {
        var unseen = Directory.EnumerateFiles(folder)
            .Where(path => !_seenFileStore.Contains(path));

        return Parallel.ForEachAsync(unseen, CancellationToken.None, async (path, token) =>
        {
            var record = new FileRecord(path);

            _seenFileStore.Add(path);
            await _publisher.Publish(new FileSeenNotification { FileInfo = record }, token);
            sink.Enqueue(record);
        });
    }
}

以及基于 FileSystemWatcher 的观察器:

/// <summary>
/// Streams <see cref="FileRecord"/>s for files created in a folder, detected through a
/// <see cref="FileSystemWatcher"/>.
/// </summary>
public class FileSystemStreamHandler : 
    IStreamRequestHandler<FileSystemStream, FileRecord>
{
    private readonly ISeenFileStore _seenFileStore;
    private readonly ILogger<FileSystemStreamHandler> _logger;
    private readonly IPublisher _publisher;

    // Written by the watcher callback, drained by Handle.
    private readonly ConcurrentQueue<FileRecord> _queue;

    // The delegate subscribed to Created; kept so TearDownWatcher can unsubscribe the same one.
    private Action<object, FileSystemEventArgs>? _tearDown;

    public FileSystemStreamHandler(
        ISeenFileStore seenFileStore, 
        ILogger<FileSystemStreamHandler> logger, 
        IPublisher publisher)
    {
        _seenFileStore = seenFileStore;
        _logger = logger;
        _publisher = publisher;
        _queue = new ConcurrentQueue<FileRecord>();
    }

    /// <summary>
    /// Watches <c>request.Folder</c> and yields a record for every newly created file until
    /// cancellation is requested. The queue is checked every 100 ms.
    /// </summary>
    /// <param name="request">The folder to watch.</param>
    /// <param name="cancellationToken">Ends the watch; the sequence then completes normally.</param>
    public async IAsyncEnumerable<FileRecord> Handle(
        FileSystemStream request, 
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var watcher = SetupWatcher(request.Folder, cancellationToken);

        // try/finally so the watcher is released even when the consumer stops enumerating
        // early (break, exception, DisposeAsync), not only when cancellation ends the loop.
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Drain everything queued since the last tick, so a burst of new files is not
                // throttled to one record per 100 ms.
                while (_queue.TryDequeue(out var record))
                    yield return record;

                await Task.Delay(100, cancellationToken)
                    .ContinueWith(_ => {}, CancellationToken.None);
            }
        }
        finally
        {
            TearDownWatcher(watcher);
        }
    }
    
    /// <summary>
    /// Creates a watcher for <paramref name="folder"/>, subscribes to its Created event and
    /// starts raising events.
    /// </summary>
    private FileSystemWatcher SetupWatcher(string folder, CancellationToken cancellation)
    {
        var watcher = new FileSystemWatcher(folder);
        watcher.NotifyFilter = NotifyFilters.Attributes
                               | NotifyFilters.CreationTime
                               | NotifyFilters.DirectoryName
                               | NotifyFilters.FileName
                               | NotifyFilters.LastAccess
                               | NotifyFilters.LastWrite
                               | NotifyFilters.Security
                               | NotifyFilters.Size;
        _tearDown = (_, args) => OnWatcherOnChanged(args, cancellation);
        watcher.Created += _tearDown.Invoke;

        // Enable only after subscribing, so no Created event is raised with no handler attached.
        watcher.EnableRaisingEvents = true;

        return watcher;
    }
    
    /// <summary>
    /// Handles a Created event. The path is skipped when it was already seen or is a directory.
    /// Otherwise it is recorded as seen, queued for <see cref="Handle"/>, and announced with a
    /// <c>FileSeenNotification</c>.
    /// </summary>
    /// <remarks>
    /// This is <c>async void</c> because it is invoked as an event handler. An exception
    /// escaping it cannot be observed by any caller, so publish failures are caught and logged
    /// here.
    /// </remarks>
    private async void OnWatcherOnChanged(FileSystemEventArgs args, CancellationToken cancellationToken)
    {
        var path = args.FullPath;

        if (_seenFileStore.Contains(path)) return;
            
        _seenFileStore.Add(path);

        try
        {
            if ((File.GetAttributes(path) & FileAttributes.Directory) != 0) return;
        }
        catch (FileNotFoundException)
        {
            _logger.LogWarning("File {File} was not found. During a routine check. Will not be broadcast", path);
            return;
        }
            
        var record = new FileRecord(path);
        _queue.Enqueue(record);

        try
        {
            await _publisher.Publish(new FileSeenNotification { FileInfo = record }, cancellationToken);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // The stream is shutting down. The record is already queued, so there is nothing else to do.
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish FileSeenNotification for {File}", path);
        }
    }

    /// <summary>
    /// Stops event delivery, unsubscribes the Created handler and disposes the watcher.
    /// </summary>
    private void TearDownWatcher(FileSystemWatcher watcher)
    {
        watcher.EnableRaisingEvents = false;

        if (_tearDown != null)
            watcher.Created -= _tearDown.Invoke;

        // FileSystemWatcher is IDisposable; unsubscribing alone does not release it.
        watcher.Dispose();
    }
}

最后,下面这个类将所有内容联系在一起,并尝试监视这些流(在 StartAsync 方法中)。您会注意到其中使用了来自 System.Interactive.Async 的 Merge 运算符,它目前未按预期运行。

/// <summary>
/// Opens one MediatR stream per registered <see cref="IFileStream"/> request, consumes all of
/// them concurrently and logs every incoming file.
/// </summary>
public class StreamedFolderWatcher : IDisposable
{
    // Factories rather than ready-made streams, so each StartAsync run binds the streams to the
    // same token that StopAsync cancels.
    private readonly ConcurrentBag<Func<CancellationToken, IAsyncEnumerable<FileRecord>>> _streams;
    private CancellationTokenSource? _cancellationTokenSource;
    private readonly IMediator _mediator;
    private readonly ILogger<StreamedFolderWatcher> _logger;

    public StreamedFolderWatcher(
        IMediator mediator,
        IEnumerable<IFileStream> fileStreams, 
        ILogger<StreamedFolderWatcher> logger)
    {
        _mediator = mediator;
        _logger = logger;
        _streams = new ConcurrentBag<Func<CancellationToken, IAsyncEnumerable<FileRecord>>>();

        foreach (var fileStream in fileStreams)
            AddStream(fileStream);
    }

    /// <summary>Registers a factory that creates the stream for <paramref name="request"/>.</summary>
    private void AddStream<T>(T request) 
        where T : IStreamRequest<FileRecord>
    {
        _streams.Add(token => _mediator.CreateStream(request, token));
    }

    /// <summary>
    /// Merges every registered stream and logs each file as soon as any stream produces it.
    /// Runs until <paramref name="cancellationToken"/> is cancelled or <see cref="StopAsync"/>
    /// is called. If the merged stream ends, it is re-enumerated after a one-second pause.
    /// </summary>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        // Link to the caller's token so either the host or StopAsync can end the watch.
        _cancellationTokenSource?.Dispose();
        _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        var token = _cancellationTokenSource.Token;

        // The handlers must receive the linked token; otherwise their inner loops never see StopAsync.
        var streams = _streams.Select(factory => factory(token)).ToArray();

        while (!token.IsCancellationRequested)
        {
            // Call the params-array overload explicitly. The IEnumerable<IAsyncEnumerable<T>>
            // extension overload consumes the sources one after another, so with endless sources
            // only the first would ever be read.
            await foreach (var file in AsyncEnumerableEx.Merge(streams).WithCancellation(token))
            {
                _logger.LogInformation("Incoming file {File}", file);
            }
            
            await Task.Delay(1000, token)
                .ContinueWith(_ => {}, CancellationToken.None);
        }
    }

    /// <summary>Requests cancellation of the watch started by <see cref="StartAsync"/>.</summary>
    public Task StopAsync()
    {
        _cancellationTokenSource?.Cancel();

        return Task.CompletedTask;
    }

    public void Dispose()
    {
        _cancellationTokenSource?.Dispose();
        GC.SuppressFinalize(this);
    }
}

我对 Merge 行为的期望是:如果我有 3 个 IAsyncEnumerable,那么每个项目都应在产生后立即发出。然而实际情况是,除非我在循环中的某处放置 yield break,否则第一个被获取的 IStreamRequestHandler 会一直执行,直到取消令牌强制其停止。

如何将多个输入 IAsyncEnumerables 合并到一个长期存在的输出流中,每次产生结果时都会发出?

最小可复现示例

/// <summary>
/// Endless demo sequence. Every 100–999 ms it yields this sequence's id together with a
/// random value in 0–9.
/// </summary>
/// <param name="cancellationToken">
/// Ends the sequence. A pending delay is interrupted and no further value is emitted.
/// </param>
static async IAsyncEnumerable<(Guid Id, int Value)> CreateSequence(
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    var random = new Random();
    var id = Guid.NewGuid();
    while (!cancellationToken.IsCancellationRequested)
    {
        // Pass the token so cancellation interrupts the wait, and swallow the resulting
        // exception so the sequence simply ends.
        await Task.Delay(TimeSpan.FromMilliseconds(random.Next(100, 1000)), cancellationToken)
            .ContinueWith(_ => { }, CancellationToken.None);

        // Without this check, a value would still be emitted after cancellation.
        if (cancellationToken.IsCancellationRequested)
            yield break;

        yield return (id, random.Next(0, 10));
    }
}

// Ten independent endless sequences sharing one cancellation source.
var cts = new CancellationTokenSource();
IEnumerable<IAsyncEnumerable<(Guid Id, int Value)>> sources = Enumerable
    .Range(0, 10)
    .Select(_ => CreateSequence(cts.Token));

// Extension-method syntax on an IEnumerable<IAsyncEnumerable<T>> binds to the Merge overload
// for IEnumerable sources, which is the behaviour this sample demonstrates.
IAsyncEnumerable<(Guid Id, int Value)> combined = sources.Merge();

await foreach ((Guid id, int value) in combined)
{
    string stamp = DateTime.Now.ToShortTimeString();
    Console.WriteLine($"[{stamp}] Value {value} Emitted from {id}");
}

我设法想出了一个可行的解决方案,但它可能效率低下,也可能存在错误。我将每个 IAsyncEnumerable 放入各自的后台任务中。这样每个任务都能把它产生的项目放入同一个线程安全队列,项目一旦可用就从队列中发出。

/// <summary>
/// Merges several async sequences into one. The sources are pumped concurrently on background
/// tasks into a shared queue, and each element is emitted once it is found in the queue.
/// </summary>
/// <param name="sources">The sequences to merge; enumerated once, eagerly.</param>
/// <param name="debounceTime">
/// Pause between polls of the shared queue. When <c>null</c>, the queue is polled without
/// pausing, which keeps a thread busy.
/// </param>
/// <param name="cancellationToken">Cancels the pump tasks and the polling pauses.</param>
/// <remarks>
/// If a source faults, its first exception is rethrown after all remaining items have been
/// emitted. Cancellation ends the sequence without an exception.
/// </remarks>
public static async IAsyncEnumerable<TSource> MergeAsyncEnumerable<TSource>(
    this IEnumerable<IAsyncEnumerable<TSource>> sources,
    TimeSpan? debounceTime = default,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var queue = new ConcurrentQueue<TSource>();
    var tasks = SetupCollections(sources, queue, cancellationToken);

    // Build the combined task once instead of once per polling iteration.
    var allDone = Task.WhenAll(tasks);

    while (!allDone.IsCompleted)
    {
        while (queue.TryDequeue(out var record))
            yield return record;
            
        // Small debounce to prevent an infinite loop from just spinning. 
        await WaitIfDebounce(debounceTime, cancellationToken);
    }

    // A source may enqueue its last items after the drain above but before it completes.
    // Drain once more now that no pump can write to the queue.
    while (queue.TryDequeue(out var leftover))
        yield return leftover;

    // Surface source failures instead of silently ending the merged sequence.
    if (allDone.IsFaulted)
        await allDone;
}

/// <summary>
/// Returns a task that completes after <paramref name="debounceTime"/>, or
/// <see cref="Task.CompletedTask"/> when no debounce is configured. Cancellation ends the wait
/// early but is never surfaced as an exception: the returned task runs to completion.
/// </summary>
private static Task WaitIfDebounce(
    TimeSpan? debounceTime,
    CancellationToken cancellationToken)
{
    if (!debounceTime.HasValue)
    {
        return Task.CompletedTask;
    }

    var delay = Task.Delay(debounceTime.Value, cancellationToken);

    // Continue unconditionally, so a cancelled delay is turned into a normal completion.
    return delay.ContinueWith(_ => { }, CancellationToken.None);
}

/// <summary>
/// Starts one background task per source. Each task copies every element of its source into
/// <paramref name="queue"/>. Returns the started tasks in source order.
/// </summary>
private static IList<Task> SetupCollections<TSource>(
    IEnumerable<IAsyncEnumerable<TSource>> sources,
    ConcurrentQueue<TSource> queue,
    CancellationToken cancellationToken)
{
    var pumps = new List<Task>();
    foreach (var source in sources)
    {
        pumps.Add(Task.Run(() => PumpAsync(source), cancellationToken));
    }

    return pumps;

    // Copies one source into the shared queue until the source ends or is cancelled.
    async Task PumpAsync(IAsyncEnumerable<TSource> source)
    {
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            queue.Enqueue(item);
        }
    }
}

Rx 团队似乎在 Merge 运算符的设计上出了差错:他们创建了行为不同的重载。此重载支持并发:

// Concurrent overload: all sources are consumed at the same time.
// It has no `this` modifier, so it is NOT an extension method. Call it through its declaring
// static class (AsyncEnumerableEx in System.Interactive.Async), e.g. AsyncEnumerableEx.Merge(array).
// Writing `array.Merge()` resolves to the IEnumerable<IAsyncEnumerable<TSource>> extension
// overload instead.
public static IAsyncEnumerable<TSource> Merge<TSource>(
    params IAsyncEnumerable<TSource>[] sources);

此重载不支持并发:

// Extension overload for any IEnumerable of sources. Per the library's own source comment,
// this implementation does not exploit concurrency: sources are consumed one after another,
// so an endless first source prevents the remaining sources from ever being read.
public static IAsyncEnumerable<TSource> Merge<TSource>(
    this IEnumerable<IAsyncEnumerable<TSource>> sources);

源代码中的注释:

// REVIEW:
// This implementation does not exploit concurrency. We should not introduce such
// behavior in order to avoid breaking changes, but we could introduce a parallel
// ConcurrentMerge implementation. It is unfortunate though that the Merge
// overload accepting an array has always been concurrent, so we can't change that
// either (in order to have consistency where Merge is non-concurrent, and
// ConcurrentMerge is).

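因此,您需要在调用 Merge 之前,先用 .ToArray() 将可枚举对象转换为数组。注意,接受数组的重载并不是扩展方法(它的参数没有 this 修饰符)。因此必须以静态方式调用它,例如 `var merged = AsyncEnumerableEx.Merge(sequences.ToArray());`。如果写成 `sequences.ToArray().Merge()`,调用仍会绑定到不支持并发的 IEnumerable 重载。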