如何仅通过 BroadcastBlock 和 ActionBlock 使用数据流

How To Using Dataflow With Only BroadcastBlock And ActionBlock

这是我在 SO 中的第一个问题,我是使用 DataFlow 与 BroadcastBlock 和 ActionBlock 的新手,我希望我能在这里得到解决方案。 这是结构。

型号

class SampleModel
{
    public string Id { get; set; } = Guid.NewGuid().ToString();
    public bool Success { get; set; } = true;
    public object UniqueData { get; set; }

    public override string ToString()
    {
        StringBuilder sb = new StringBuilder();
        sb.AppendLine($"Id - {Id}");
        sb.AppendLine($"Success - {Success}");
        sb.AppendLine($"UniqueData - {UniqueData}");
        string tmp = sb.ToString();
        sb.Clear();
        return tmp;
    }
}

数据流逻辑

class CreateDownloadTask
{
    public async Task VeryLongProcess()
    {
        await Task.Run(async () =>
        {
            Console.WriteLine("Long Process Working..");
            await Task.Delay(TimeSpan.FromSeconds(5));
            Console.WriteLine("Long Process Done..");
        });
    }

    public async Task CreateSimpleBroadcastX<T>(T data)
    {
        Action<T> process = async model =>
        {
            Console.WriteLine("Working..");
            await VeryLongProcess();
            Console.WriteLine("Done");
        };


        var broad = new BroadcastBlock<T>(null);

        var action = new ActionBlock<T>(process);

        var dflo = new DataflowLinkOptions { PropagateCompletion = true };

        broad.LinkTo(action, dflo);

        await broad.SendAsync(data);

        broad.Complete();

        await action.Completion.ContinueWith(async tsk =>
        {
            Console.WriteLine("Continue data");
        }).ConfigureAwait(false);

        Console.WriteLine("All Done");
    }
}

来电者

var task = cdt.CreateSimpleBroadcastX<SampleModel>(new SampleModel
{
    UniqueData = cdt.GetHashCode()
});
task.GetAwaiter().GetResult();
Console.WriteLin("Completed");

我希望结果应该是

Working..
Long Process Working..
Long Process Done..
Done
Continue data
All Done
Completed

但我得到的是

Working..
Long Process Working..
Continue data
All Done
Completed
Long Process Done..
Done

当 ActionBlock 中有 async-await 时会发生这种情况。 现在,问题是,如果没有 WaitHandle 是否有可能得到我预期的结果?

也就是说,ActionBlock.Completion会等到ActionBlock里面的ActionDelegate执行完毕?

还是我说错了?

提前致谢,抱歉我的英语不好。

您的问题在这里:

Action<T> process = async model => ...

该代码创建了一个 async void 方法,该方法 should be avoided。您应该避免 async void 的原因之一是因为很难知道该方法何时完成。这正是正在发生的事情:ActionBlock<T> 无法知道您的委托何时完成,因为它是 async void.

正确的 delegate type for an asynchronous method without a return value that takes a single argumentFunc<T, Task>:

Func<T, Task> process = async model => ...

现在异步方法returns一个TaskActionBlock可以知道它什么时候完成。