重试 tpl 数据流 with/without Rx
Retry tpl dataflow with/without Rx
我正在使用 .net 响应式扩展和 TPL 数据流。这是我的管道:
我正在从一些外部源获取数据点作为流,然后我正在使用数据流 TransformBlocks 转换数据点。在此之后,我使用 Rx 缓冲区缓冲流式传输点 1 秒,最后我使用数据流 Actionblock 来 post REST 端点上的这些缓冲数据点。
我想在出现暂时性错误时重试 REST post 操作。我应该在哪里重试:
- 缓冲后?
- 在操作块内?
- 重试时连续流怎么办?我不想错过任何数据。
只是从提供的高级概述开始,我认为在最后的 ActionBlock
中重试是最容易的。您的 ActionBlock
会 post 并在结束前确认成功。根据数据量的不同,这种方法可以让您尽可能多地处理 ActionBlock
,而不必太担心掉落的项目,实际上不应该有任何掉落的项目。如果单个或多个 ActionBlock
实例失败 post,您的项目仍将根据您设置缓冲区和输入队列的方式进行流式传输和分发,等待它们在线上的机会。
编辑
只是一些伪代码,但这需要一批数据点,IEnumerable,并尝试 post 它们五次。有界容量将导致处理程序的每个实例排队 1000 个批次,并行性将在操作块之间分配批次。可选地,可以在 ActionBlock
之前添加一个无限制的 Buffer
来保存所有传入的批次。您需要注意您的生产者(流)不会大大超过 运行 您的消费者 REST 服务。
public void ConfigureFinalActionBlock() {
var dataPointBuffer = new BufferBlock<IEnumerable<Datapoint>>(new DataflowBlockOptions() {
BoundedCapacity = DataflowBlockOptions.Unbounded
});
var options = new ExecutionDataflowBlockOptions() {
BoundedCapacity = 1000,
MaxDegreeOfParallelism = Environment.ProcessorCount
};
var restBlock = new ActionBlock<IEnumerable<Datapoint>>(async (data) => {
var success = false;
var attempts = 0;
while (!success && attempts < 5) {
attempts++;
success = await MyApiPostAsync(data);
}
}, options);
dataPointBuffer.LinkTo(restBlock, new DataflowLinkOptions() {
PropagateCompletion = true
});
我正在使用 .net 响应式扩展和 TPL 数据流。这是我的管道:
我正在从一些外部源获取数据点作为流,然后我正在使用数据流 TransformBlocks 转换数据点。在此之后,我使用 Rx 缓冲区缓冲流式传输点 1 秒,最后我使用数据流 Actionblock 来 post REST 端点上的这些缓冲数据点。
我想在出现暂时性错误时重试 REST post 操作。我应该在哪里重试:
- 缓冲后?
- 在操作块内?
- 重试时连续流怎么办?我不想错过任何数据。
只是从提供的高级概述开始,我认为在最后的 ActionBlock
中重试是最容易的。您的 ActionBlock
会 post 并在结束前确认成功。根据数据量的不同,这种方法可以让您尽可能多地处理 ActionBlock
,而不必太担心掉落的项目,实际上不应该有任何掉落的项目。如果单个或多个 ActionBlock
实例失败 post,您的项目仍将根据您设置缓冲区和输入队列的方式进行流式传输和分发,等待它们在线上的机会。
编辑
只是一些伪代码,但这需要一批数据点,IEnumerable,并尝试 post 它们五次。有界容量将导致处理程序的每个实例排队 1000 个批次,并行性将在操作块之间分配批次。可选地,可以在 ActionBlock
之前添加一个无限制的 Buffer
来保存所有传入的批次。您需要注意您的生产者(流)不会大大超过 运行 您的消费者 REST 服务。
public void ConfigureFinalActionBlock() {
var dataPointBuffer = new BufferBlock<IEnumerable<Datapoint>>(new DataflowBlockOptions() {
BoundedCapacity = DataflowBlockOptions.Unbounded
});
var options = new ExecutionDataflowBlockOptions() {
BoundedCapacity = 1000,
MaxDegreeOfParallelism = Environment.ProcessorCount
};
var restBlock = new ActionBlock<IEnumerable<Datapoint>>(async (data) => {
var success = false;
var attempts = 0;
while (!success && attempts < 5) {
attempts++;
success = await MyApiPostAsync(data);
}
}, options);
dataPointBuffer.LinkTo(restBlock, new DataflowLinkOptions() {
PropagateCompletion = true
});