TPL Complete vs Completion
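I need to import customer-related data from a legacy database and perform several transformations along the way. This means a single entry needs to perform additional "events" (synchronize products, create invoices, etc.).
My initial solution was a simple parallel approach. It works fine, but sometimes it has issues. If the customers currently being processed have to wait for the same type of event, their processing queues can get stuck and eventually time out, which causes every underlying event to fail as well (they depend on the one that failed). It doesn't happen all the time, but it's annoying.
So I got another idea: work in batches. I mean limiting not only the number of customers processed at the same time, but also the number of events broadcast to the queues. While looking around for ideas, I found this answer, which points to TPL Dataflow.
I made a skeleton to get familiar with it. I set up a simple pipeline, but I'm a bit confused about the usage of Complete() and waiting on Completion.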
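The steps are the following:
- Make a list of numbers (the IDs of the customers to import). This is outside the import logic; it is only there to trigger the rest of the logic.
- Create a BatchBlock (to limit the number of customers processed at the same time)
- Create a single MyClass1 item based on the ID (TransformBlock<int, MyClass1>)
- Perform some logic and generate a collection of MyClass2 (TransformManyBlock<MyClass1, MyClass2>), e.g. sleep for 1 second
- Perform some logic on every item of the collection (ActionBlock<MyClass2>), e.g. sleep for 1 second
Here's the full code: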
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    private static void Main(string[] args)
    {
        // Group the customer IDs into batches of two.
        var batchBlock = new BatchBlock<int>(2);
        for (var i = 1; i < 10; i++)
        {
            batchBlock.Post(i);
        }
        batchBlock.Complete();

        // Build a fresh pipeline for every batch.
        while (batchBlock.TryReceive(null, out var ids))
        {
            var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
            {
                Console.WriteLine($"TransformBlock(id: {id})");
                return new MyClass1(id, "Star Wars");
            });
            var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
            {
                Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
                Thread.Sleep(1000);
                return GetMyClass22Values(myClass1);
            });
            var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
            {
                Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
                Thread.Sleep(1000);
            });

            transformBlock.LinkTo(transformManyBlock);
            transformManyBlock.LinkTo(actionBlock);

            foreach (var id in ids)
            {
                transformBlock.Post(id);
            }

            // this is the point when I'm not 100% sure
            //transformBlock.Complete();
            //transformManyBlock.Complete();
            //transformManyBlock.Completion.Wait();
            actionBlock.Complete();
            actionBlock.Completion.Wait();
        }

        Console.WriteLine();
        Console.WriteLine("Press any key to continue...");
        Console.ReadKey();
    }

    private static IEnumerable<MyClass2> GetMyClass22Values(MyClass1 myClass1)
    {
        return new List<MyClass2>
        {
            new MyClass2(1, myClass1.Id + " did this"),
            new MyClass2(2, myClass1.Id + " did that"),
            new MyClass2(3, myClass1.Id + " did this again")
        };
    }
}
public class MyClass1
{
    public MyClass1(int id, string value)
    {
        Id = id;
        Value = value;
    }

    public int Id { get; set; }
    public string Value { get; set; }
}

public class MyClass2
{
    // The constructor name must match the class name (it was MyClass1 here).
    public MyClass2(int id, string value)
    {
        Id = id;
        Value = value;
    }

    public int Id { get; set; }
    public string Value { get; set; }
}
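So the point I'm struggling with is the end, where I need to call Complete() or wait on Completion. I can't seem to find the right combination. I would like to see output like the following: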
TransformBlock(id: 1)
TransformBlock(id: 2)
TransformManyBlock(myClass1: 1|Star Wars)
TransformManyBlock(myClass1: 2|Star Wars)
ActionBlock(myClass2: 1|1 did this)
ActionBlock(myClass2: 2|1 did that)
ActionBlock(myClass2: 3|1 did this again)
ActionBlock(myClass2: 1|2 did this)
ActionBlock(myClass2: 2|2 did that)
ActionBlock(myClass2: 3|2 did this again)
TransformBlock(id: 3)
TransformBlock(id: 4)
TransformManyBlock(myClass1: 3|Star Wars)
TransformManyBlock(myClass1: 4|Star Wars)
ActionBlock(myClass2: 1|3 did this)
ActionBlock(myClass2: 2|3 did that)
ActionBlock(myClass2: 3|3 did this again)
ActionBlock(myClass2: 1|4 did this)
ActionBlock(myClass2: 2|4 did that)
ActionBlock(myClass2: 3|4 did this again)
[the rest of the items]
Press any key to exit...
Can anyone point me in the right direction?
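You're almost there. You need to call Complete on the first block in the pipeline and then await Completion on the last block. Then, in your links, you need to propagate completion like this: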
// An async entry point must return Task (C# 7.1+); "async void Main" does not compile.
private static async Task Main(string[] args) {
    var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
    {
        Console.WriteLine($"TransformBlock(id: {id})");
        return new MyClass1(id, "Star Wars");
    });
    var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
    {
        Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
        Thread.Sleep(1000);
        return GetMyClass22Values(myClass1);
    });
    var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
    {
        Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
        Thread.Sleep(1000);
    });

    //propagate completion
    transformBlock.LinkTo(transformManyBlock, new DataflowLinkOptions() { PropagateCompletion = true });
    transformManyBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });

    // ids is the current batch received from batchBlock in the question's loop
    foreach (var id in ids) {
        transformBlock.Post(id);
    }

    //Complete the first block
    transformBlock.Complete();

    //wait for completion to flow to the last block
    await actionBlock.Completion;
}
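To show how this fits the per-batch loop from the question, here is a minimal self-contained sketch. It is not the original code: the items are plain strings instead of MyClass1/MyClass2, there are only two stages, and the names PerBatchPipelineSketch and ProcessBatchAsync are made up for illustration. Each call builds a fresh pipeline for one batch, completes the first block, and awaits the last one.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PerBatchPipelineSketch
{
    // Hypothetical helper: processes one batch of ids and returns the
    // results in the order the final block handled them.
    public static async Task<List<string>> ProcessBatchAsync(IEnumerable<int> ids)
    {
        var results = new List<string>();

        var transformBlock = new TransformBlock<int, string>(id => $"customer {id}");

        // Only this block touches 'results', and an ActionBlock handles one
        // item at a time by default, so no locking is needed.
        var actionBlock = new ActionBlock<string>(item => results.Add(item));

        transformBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in ids)
        {
            transformBlock.Post(id);
        }

        // Complete the first block, then wait for completion to reach the last one.
        transformBlock.Complete();
        await actionBlock.Completion;
        return results;
    }

    public static async Task Main()
    {
        var batches = new[] { new[] { 1, 2 }, new[] { 3, 4 } };
        foreach (var batch in batches)
        {
            // The next batch starts only after the previous one has fully drained.
            var results = await ProcessBatchAsync(batch);
            Console.WriteLine(string.Join(", ", results));
        }
    }
}
```

Because each batch is awaited before the next one is posted, the output stays grouped per batch, which is the ordering the question asks for.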
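You could also incorporate the batch block into your pipeline and remove the need for the TryReceive call, but that seems to be a separate part of your flow.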
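As a rough sketch of what linking the batch block into the pipeline could look like, assuming the per-batch work can live in a single ActionBlock<int[]>: the class name BatchedPipelineSketch, the method RunAsync and the placeholder batch handler are all made up for illustration, so treat this as a starting point and not a drop-in replacement.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BatchedPipelineSketch
{
    // Hypothetical helper: pushes ids through BatchBlock -> ActionBlock and
    // returns the batches in the order they were processed.
    public static async Task<List<int[]>> RunAsync(IEnumerable<int> ids, int batchSize)
    {
        var processed = new List<int[]>();
        var batchBlock = new BatchBlock<int>(batchSize);

        // Placeholder for the real per-batch work. The default degree of
        // parallelism is 1, so batches are handled one after another.
        var processBatch = new ActionBlock<int[]>(batch =>
        {
            Console.WriteLine($"Batch: {string.Join(", ", batch)}");
            processed.Add(batch);
        });

        // Linking replaces the manual TryReceive loop.
        batchBlock.LinkTo(processBatch, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in ids)
        {
            batchBlock.Post(id);
        }

        // Completing the batch block flushes any partial final batch.
        batchBlock.Complete();
        await processBatch.Completion;
        return processed;
    }

    public static async Task Main()
    {
        var batches = await RunAsync(Enumerable.Range(1, 9), 2);
        Console.WriteLine($"Processed {batches.Count} batches");
    }
}
```

With nine ids and a batch size of two, the last batch contains the single leftover id, because Complete() makes the BatchBlock emit whatever it is still holding.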
Edit
An example of propagating completion to multiple blocks:
// An async entry point must return Task (C# 7.1+); "async void Main" does not compile.
public static async Task Main(string[] args) {
    var sourceBlock = new BufferBlock<int>();
    var processBlock1 = new ActionBlock<int>(i => Console.WriteLine($"Block1 {i}"));
    var processBlock2 = new ActionBlock<int>(i => Console.WriteLine($"Block2 {i}"));

    sourceBlock.LinkTo(processBlock1);
    sourceBlock.LinkTo(processBlock2);

    // Manually forward completion (or the fault) from the source to both targets.
    var sourceBlockCompletion = sourceBlock.Completion.ContinueWith(tsk => {
        if (!tsk.IsFaulted) {
            processBlock1.Complete();
            processBlock2.Complete();
        } else {
            ((IDataflowBlock)processBlock1).Fault(tsk.Exception);
            ((IDataflowBlock)processBlock2).Fault(tsk.Exception);
        }
    });

    //Send some data...

    sourceBlock.Complete();
    await Task.WhenAll(sourceBlockCompletion, processBlock1.Completion, processBlock2.Completion);
}