在 TPL 数据流循环中完成
Completion in TPL Dataflow Loops
我在确定如何检测循环 TPL 数据流中的完成时遇到问题。
我在数据流的一部分有一个反馈循环,它正在向远程服务器发出 GET
请求并处理数据响应(用更多数据流转换这些响应,然后提交结果)。
数据源将其结果分成 1000 条记录的页面,并且不会告诉我它有多少页可供我使用。我必须继续阅读,直到我得到不到一整页的数据。
通常页数是1页,经常是10页,偶尔有1000页。
一开始我有很多请求要获取。
我希望能够使用一个线程池来处理这个,这一切都很好,我可以对多个数据请求进行排队并同时请求它们。如果我偶然发现需要获取大量页面的实例,我想为此使用我的所有线程。我不想留下一个线程在其他线程完成时被搅动。
我遇到的问题是当我将此逻辑放入数据流时,例如:
//generate initial requests for activity
var request = new TransformManyBlock<int, DataRequest>(cmp => QueueRequests(cmp));
//fetch the initial requests and feedback more requests to our input buffer if we need to
TransformBlock<DataRequest, DataResponse> fetch = null;
fetch = new TransformBlock<DataRequest, DataResponse>(async req =>
{
var resp = await Fetch(req);
if (resp.Results.Count == 1000)
await fetch.SendAsync(QueueAnotherRequest(req));
return resp;
}
, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });
//commit each type of request
var commit = new ActionBlock<DataResponse>(async resp => await Commit(resp));
request.LinkTo(fetch);
fetch.LinkTo(commit);
//when are we complete?
QueueRequests
生成 IEnumerable<DataRequest>
。我立即将下 N 个页面请求排队,接受这意味着我发送的呼叫比我需要的略多。 DataRequest 实例共享一个 LastPage 计数器,以避免不必要地发出我们知道在最后一页之后的请求。这一切都很好。
问题:
如果我通过将更多请求反馈到 fetch 的输入缓冲区来循环,如我在本例中所示,那么我会遇到如何发出(甚至检测)完成信号的问题。我无法设置从请求中提取完成,因为一旦设置完成我就不能再反馈了。
我可以在获取时监视输入和输出缓冲区是否为空,但我认为当我设置完成时我冒着获取仍然忙于请求的风险,从而防止对额外页面的请求排队。
我可以通过某种方式知道 fetch 正忙(有输入或正忙于处理输入)。
我是否缺少解决此问题的 obvious/straightforward 方法?
我可以在提取中循环,而不是排队更多的请求。问题是我希望能够使用设定的最大线程数来限制我对远程服务器所做的事情。块内的并行循环能否与块本身共享一个调度程序,并通过调度程序控制生成的线程数?
我可以为 fetch 创建一个自定义转换块来处理完成信号。对于这样一个简单的场景,似乎需要做很多工作。
非常感谢您提供的任何帮助!
现在我已经在获取块中添加了一个简单的忙碌状态计数器:-
int fetch_busy = 0;
TransformBlock<DataRequest, DataResponse> fetch_activity=null;
fetch = new TransformBlock<DataRequest, ActivityResponse>(async req =>
{
try
{
Interlocked.Increment(ref fetch_busy);
var resp = await Fetch(req);
if (resp.Results.Count == 1000)
{
await fetch.SendAsync( QueueAnotherRequest(req) );
}
Interlocked.Decrement(ref fetch_busy);
return resp;
}
catch (Exception ex)
{
Interlocked.Decrement(ref fetch_busy);
throw ex;
}
}
, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });
然后我用它来表示完成如下:-
request.Completion.ContinueWith(async _ =>
{
while ( fetch.InputCount > 0 || fetch_busy > 0 )
{
await Task.Delay(100);
}
fetch.Complete();
});
这看起来不太优雅,但我认为应该可行。
在 TPL 数据流中,您可以 link the blocks with DataflowLinkOptions
with specifying the propagation of completion of the block:
request.LinkTo(fetch, new DataflowLinkOptions { PropagateCompletion = true });
fetch.LinkTo(commit, new DataflowLinkOptions { PropagateCompletion = true });
之后,您只需为 request
块调用 Complete()
方法,就大功告成了!
// the completion will be propagated to all the blocks
request.Complete();
你应该使用的最后一件事是最后一个块的Completion
任务属性:
commit.Completion.ContinueWith(t =>
{
/* check the status of the task and correctness of the requests handling */
});
我在确定如何检测循环 TPL 数据流中的完成时遇到问题。
我在数据流的一部分有一个反馈循环,它正在向远程服务器发出 GET
请求并处理数据响应(用更多数据流转换这些响应,然后提交结果)。
数据源将其结果分成 1000 条记录的页面,并且不会告诉我它有多少页可供我使用。我必须继续阅读,直到我得到不到一整页的数据。
通常页数是1页,经常是10页,偶尔有1000页。
一开始我有很多请求要获取。
我希望能够使用一个线程池来处理这个,这一切都很好,我可以对多个数据请求进行排队并同时请求它们。如果我偶然发现需要获取大量页面的实例,我想为此使用我的所有线程。我不想留下一个线程在其他线程完成时被搅动。
我遇到的问题是当我将此逻辑放入数据流时,例如:
//generate initial requests for activity
var request = new TransformManyBlock<int, DataRequest>(cmp => QueueRequests(cmp));
//fetch the initial requests and feedback more requests to our input buffer if we need to
TransformBlock<DataRequest, DataResponse> fetch = null;
fetch = new TransformBlock<DataRequest, DataResponse>(async req =>
{
var resp = await Fetch(req);
if (resp.Results.Count == 1000)
await fetch.SendAsync(QueueAnotherRequest(req));
return resp;
}
, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });
//commit each type of request
var commit = new ActionBlock<DataResponse>(async resp => await Commit(resp));
request.LinkTo(fetch);
fetch.LinkTo(commit);
//when are we complete?
QueueRequests
生成 IEnumerable<DataRequest>
。我立即将下 N 个页面请求排队,接受这意味着我发送的呼叫比我需要的略多。 DataRequest 实例共享一个 LastPage 计数器,以避免不必要地发出我们知道在最后一页之后的请求。这一切都很好。
问题:
如果我通过将更多请求反馈到 fetch 的输入缓冲区来循环,如我在本例中所示,那么我会遇到如何发出(甚至检测)完成信号的问题。我无法设置从请求中提取完成,因为一旦设置完成我就不能再反馈了。
我可以在获取时监视输入和输出缓冲区是否为空,但我认为当我设置完成时我冒着获取仍然忙于请求的风险,从而防止对额外页面的请求排队。
我可以通过某种方式知道 fetch 正忙(有输入或正忙于处理输入)。
我是否缺少解决此问题的 obvious/straightforward 方法?
我可以在提取中循环,而不是排队更多的请求。问题是我希望能够使用设定的最大线程数来限制我对远程服务器所做的事情。块内的并行循环能否与块本身共享一个调度程序,并通过调度程序控制生成的线程数?
我可以为 fetch 创建一个自定义转换块来处理完成信号。对于这样一个简单的场景,似乎需要做很多工作。
非常感谢您提供的任何帮助!
现在我已经在获取块中添加了一个简单的忙碌状态计数器:-
int fetch_busy = 0;
TransformBlock<DataRequest, DataResponse> fetch_activity=null;
fetch = new TransformBlock<DataRequest, ActivityResponse>(async req =>
{
try
{
Interlocked.Increment(ref fetch_busy);
var resp = await Fetch(req);
if (resp.Results.Count == 1000)
{
await fetch.SendAsync( QueueAnotherRequest(req) );
}
Interlocked.Decrement(ref fetch_busy);
return resp;
}
catch (Exception ex)
{
Interlocked.Decrement(ref fetch_busy);
throw ex;
}
}
, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });
然后我用它来表示完成如下:-
request.Completion.ContinueWith(async _ =>
{
while ( fetch.InputCount > 0 || fetch_busy > 0 )
{
await Task.Delay(100);
}
fetch.Complete();
});
这看起来不太优雅,但我认为应该可行。
在 TPL 数据流中,您可以 link the blocks with DataflowLinkOptions
with specifying the propagation of completion of the block:
request.LinkTo(fetch, new DataflowLinkOptions { PropagateCompletion = true });
fetch.LinkTo(commit, new DataflowLinkOptions { PropagateCompletion = true });
之后,您只需为 request
块调用 Complete()
方法,就大功告成了!
// the completion will be propagated to all the blocks
request.Complete();
你应该使用的最后一件事是最后一个块的Completion
任务属性:
commit.Completion.ContinueWith(t =>
{
/* check the status of the task and correctness of the requests handling */
});