如何仅通过 BroadcastBlock 和 ActionBlock 使用数据流
How To Using Dataflow With Only BroadcastBlock And ActionBlock
这是我在 SO 中的第一个问题,我是使用 DataFlow 与 BroadcastBlock 和 ActionBlock 的新手,我希望我能在这里得到解决方案。
这是结构。
型号
class SampleModel
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public bool Success { get; set; } = true;
public object UniqueData { get; set; }
public override string ToString()
{
StringBuilder sb = new StringBuilder();
sb.AppendLine($"Id - {Id}");
sb.AppendLine($"Success - {Success}");
sb.AppendLine($"UniqueData - {UniqueData}");
string tmp = sb.ToString();
sb.Clear();
return tmp;
}
}
数据流逻辑
class CreateDownloadTask
{
public async Task VeryLongProcess()
{
await Task.Run(async () =>
{
Console.WriteLine("Long Process Working..");
await Task.Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("Long Process Done..");
});
}
public async Task CreateSimpleBroadcastX<T>(T data)
{
Action<T> process = async model =>
{
Console.WriteLine("Working..");
await VeryLongProcess();
Console.WriteLine("Done");
};
var broad = new BroadcastBlock<T>(null);
var action = new ActionBlock<T>(process);
var dflo = new DataflowLinkOptions { PropagateCompletion = true };
broad.LinkTo(action, dflo);
await broad.SendAsync(data);
broad.Complete();
await action.Completion.ContinueWith(async tsk =>
{
Console.WriteLine("Continue data");
}).ConfigureAwait(false);
Console.WriteLine("All Done");
}
}
来电者
var task = cdt.CreateSimpleBroadcastX<SampleModel>(new SampleModel
{
UniqueData = cdt.GetHashCode()
});
task.GetAwaiter().GetResult();
Console.WriteLin("Completed");
我希望结果应该是
Working..
Long Process Working..
Long Process Done..
Done
Continue data
All Done
Completed
但我得到的是
Working..
Long Process Working..
Continue data
All Done
Completed
Long Process Done..
Done
当 ActionBlock 中有 async-await
时会发生这种情况。
现在,问题是,如果没有 WaitHandle
是否有可能得到我预期的结果?
也就是说,ActionBlock.Completion会等到ActionBlock里面的Action
或Delegate
执行完毕?
还是我说错了?
提前致谢,抱歉我的英语不好。
您的问题在这里:
Action<T> process = async model => ...
该代码创建了一个 async void
方法,该方法 should be avoided。您应该避免 async void
的原因之一是因为很难知道该方法何时完成。这正是正在发生的事情:ActionBlock<T>
无法知道您的委托何时完成,因为它是 async void
.
正确的 delegate type for an asynchronous method without a return value that takes a single argument 是 Func<T, Task>
:
Func<T, Task> process = async model => ...
现在异步方法returns一个Task
,ActionBlock
可以知道它什么时候完成。
这是我在 SO 中的第一个问题,我是使用 DataFlow 与 BroadcastBlock 和 ActionBlock 的新手,我希望我能在这里得到解决方案。 这是结构。
型号
class SampleModel
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public bool Success { get; set; } = true;
public object UniqueData { get; set; }
public override string ToString()
{
StringBuilder sb = new StringBuilder();
sb.AppendLine($"Id - {Id}");
sb.AppendLine($"Success - {Success}");
sb.AppendLine($"UniqueData - {UniqueData}");
string tmp = sb.ToString();
sb.Clear();
return tmp;
}
}
数据流逻辑
class CreateDownloadTask
{
public async Task VeryLongProcess()
{
await Task.Run(async () =>
{
Console.WriteLine("Long Process Working..");
await Task.Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("Long Process Done..");
});
}
public async Task CreateSimpleBroadcastX<T>(T data)
{
Action<T> process = async model =>
{
Console.WriteLine("Working..");
await VeryLongProcess();
Console.WriteLine("Done");
};
var broad = new BroadcastBlock<T>(null);
var action = new ActionBlock<T>(process);
var dflo = new DataflowLinkOptions { PropagateCompletion = true };
broad.LinkTo(action, dflo);
await broad.SendAsync(data);
broad.Complete();
await action.Completion.ContinueWith(async tsk =>
{
Console.WriteLine("Continue data");
}).ConfigureAwait(false);
Console.WriteLine("All Done");
}
}
来电者
var task = cdt.CreateSimpleBroadcastX<SampleModel>(new SampleModel
{
UniqueData = cdt.GetHashCode()
});
task.GetAwaiter().GetResult();
Console.WriteLin("Completed");
我希望结果应该是
Working..
Long Process Working..
Long Process Done..
Done
Continue data
All Done
Completed
但我得到的是
Working..
Long Process Working..
Continue data
All Done
Completed
Long Process Done..
Done
当 ActionBlock 中有 async-await
时会发生这种情况。
现在,问题是,如果没有 WaitHandle
是否有可能得到我预期的结果?
也就是说,ActionBlock.Completion会等到ActionBlock里面的Action
或Delegate
执行完毕?
还是我说错了?
提前致谢,抱歉我的英语不好。
您的问题在这里:
Action<T> process = async model => ...
该代码创建了一个 async void
方法,该方法 should be avoided。您应该避免 async void
的原因之一是因为很难知道该方法何时完成。这正是正在发生的事情:ActionBlock<T>
无法知道您的委托何时完成,因为它是 async void
.
正确的 delegate type for an asynchronous method without a return value that takes a single argument 是 Func<T, Task>
:
Func<T, Task> process = async model => ...
现在异步方法returns一个Task
,ActionBlock
可以知道它什么时候完成。