TPL 数据流:TransformBlock 将数据发送(Post)到 BatchBlock,再交给 ActionBlock 处理
TPL Dataflow Transform block post to batch block followed by actionblock
我有一个基于 TPL 数据流的应用程序,它仅使用批处理块和操作块就可以正常工作。
我添加了一个 TransformBlock,尝试在数据发送到批处理块之前先对源数据进行转换,但我的操作块从未被执行。没有抛出任何错误或异常。
我不确定是否需要对转换块调用 Complete(),因为它似乎只被执行了一次。
除了返回输出类型的对象之外,我是否需要在我的转换代码中添加一个步骤?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
// Console demo that wires a TransformBlock -> BatchBlock -> ActionBlock pipeline
// and pushes a fixed list of names through it.
class Program
{
// Batch size handed to the BatchBlock constructor.
public const int BATCH_SIZE = 10;
// Entry point: creates the three blocks, links them, posts the test names,
// then blocks until the final ActionBlock reports completion.
static void Main(string[] args)
{
Console.WriteLine("Application started");
//Create the pipeline of actions
// Stage 1: maps each input string through TransformString.
var transformBlock = new TransformBlock<string, string>(input => TransformString(input), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
// Stage 2: groups transformed strings into batches of BATCH_SIZE.
var batchBlock = new BatchBlock<string>(BATCH_SIZE);
// Stage 3: hands each batch to OutputStrings (despite the "upload" name, it only prints).
var uploadFilesToAzureBlock = new ActionBlock<IEnumerable<string>>(strings => OutputStrings(strings), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
Console.WriteLine("Blocks created");
//link the actions
// Both links use the default options, i.e. no DataflowLinkOptions are supplied here.
transformBlock.LinkTo(batchBlock);
batchBlock.LinkTo(uploadFilesToAzureBlock);
// Manually completes the action block once the batch block's Completion task finishes.
batchBlock.Completion.ContinueWith(obj => uploadFilesToAzureBlock.Complete());
Console.WriteLine("Blocks linked");
var testInputs = new List<string>
{
"Kyle",
"Stephen",
"Jon",
"Conor",
"Adrian",
"Marty",
"Richard",
"Norbert",
"Kerri",
"Mark",
"Declan",
"Ray",
"Paul",
"Andrew",
"Rachel",
"David",
"Darrell"
};
Console.WriteLine("Data created");
// i is only used for the log line below: the message shows the index, not the name.
var i = 0;
foreach (var name in testInputs)
{
Console.WriteLine("Posting name {0}", i);
transformBlock.Post(name);
i++;
}
// NOTE(review): only batchBlock is completed here; transformBlock.Complete() is never
// called and the links above do not request completion propagation. Completing
// batchBlock right after posting presumably makes it decline items that are still
// inside transformBlock, which would explain the action block never running --
// confirm against the TPL Dataflow documentation.
batchBlock.Complete();
// Synchronously waits for the last stage to finish.
uploadFilesToAzureBlock.Completion.Wait();
Console.WriteLine("Finishing");
// Keeps the console window open until a key is pressed.
Console.ReadKey();
}
// Prints one batch: a start marker, each string on its own line, then an end marker.
private static void OutputStrings(IEnumerable<string> strings)
{
Console.WriteLine("Beginning Batch...");
foreach (var s in strings)
{
Console.WriteLine(s);
}
Console.WriteLine("Completing Batch...");
}
// Returns the input with " has been processed" appended.
private static string TransformString(string input)
{
// The compound assignment only changes the local parameter; the caller sees just the returned value.
return input += " has been processed";
}
}
}
正如上面"usr"提到的,我没有在各个块之间传播完成状态(PropagateCompletion)。以下代码可以正常运行。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
    /// <summary>
    /// Console demo of a three-stage TPL Dataflow pipeline:
    /// TransformBlock (decorate each name) -> BatchBlock (group names) -> ActionBlock (print each batch).
    /// </summary>
    class Program
    {
        /// <summary>Batch size handed to the BatchBlock constructor.</summary>
        public const int BATCH_SIZE = 10;

        /// <summary>
        /// Builds and links the pipeline, posts the test names, signals completion at the head
        /// of the pipeline and waits for the last stage to finish.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            Console.WriteLine("Application started");

            // Create the pipeline stages.
            var transformBlock = new TransformBlock<string, string>(
                input => TransformString(input),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
            var batchBlock = new BatchBlock<string>(BATCH_SIZE);
            var outputStringsBlock = new ActionBlock<IEnumerable<string>>(
                strings => OutputStrings(strings),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
            Console.WriteLine("Blocks created");

            // Link the stages. PropagateCompletion on each link forwards completion (and faults)
            // downstream, so no manual Completion.ContinueWith hook is needed.
            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            transformBlock.LinkTo(batchBlock, linkOptions);
            batchBlock.LinkTo(outputStringsBlock, linkOptions);
            Console.WriteLine("Blocks linked");

            var testInputs = new List<string>
            {
                "Kyle",
                "Stephen",
                "Jon",
                "Conor",
                "Adrian",
                "Marty",
                "Richard",
                "Norbert",
                "Kerri",
                "Mark",
                "Declan",
                "Ray",
                "Paul",
                "Andrew",
                "Rachel",
                "David",
                "Darrell"
            };
            Console.WriteLine("Data created");

            // The log line reports the zero-based index of the name being posted.
            for (var i = 0; i < testInputs.Count; i++)
            {
                Console.WriteLine("Posting name {0}", i);
                transformBlock.Post(testInputs[i]);
            }

            // Complete only the head of the pipeline; the links carry completion to the later
            // stages once each one has drained, so the final partial batch is still emitted.
            transformBlock.Complete();
            outputStringsBlock.Completion.Wait();

            Console.WriteLine("Finishing");
            // Keep the console window open until a key is pressed.
            Console.ReadKey();
        }

        /// <summary>
        /// Prints one batch: a start marker, a blank line, each string on its own line,
        /// another blank line, then an end marker.
        /// </summary>
        /// <param name="strings">The strings of the batch to print.</param>
        private static void OutputStrings(IEnumerable<string> strings)
        {
            Console.WriteLine("Beginning Batch...");
            Console.WriteLine();
            foreach (var s in strings)
            {
                Console.WriteLine(s);
            }
            Console.WriteLine();
            Console.WriteLine("Completing Batch...");
        }

        /// <summary>Returns <paramref name="input"/> with " has been processed" appended.</summary>
        /// <param name="input">The string to decorate.</param>
        /// <returns>The decorated string.</returns>
        private static string TransformString(string input)
        {
            return input + " has been processed";
        }
    }
}
我有一个基于 TPL 数据流的应用程序,它仅使用批处理块和操作块就可以正常工作。
我添加了一个 TransformBlock,尝试在数据发送到批处理块之前先对源数据进行转换,但我的操作块从未被执行。没有抛出任何错误或异常。
我不确定是否需要对转换块调用 Complete(),因为它似乎只被执行了一次。
除了返回输出类型的对象之外,我是否需要在我的转换代码中添加一个步骤?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
// Console demo that wires a TransformBlock -> BatchBlock -> ActionBlock pipeline
// and pushes a fixed list of names through it.
class Program
{
// Batch size handed to the BatchBlock constructor.
public const int BATCH_SIZE = 10;
// Entry point: creates the three blocks, links them, posts the test names,
// then blocks until the final ActionBlock reports completion.
static void Main(string[] args)
{
Console.WriteLine("Application started");
//Create the pipeline of actions
// Stage 1: maps each input string through TransformString.
var transformBlock = new TransformBlock<string, string>(input => TransformString(input), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
// Stage 2: groups transformed strings into batches of BATCH_SIZE.
var batchBlock = new BatchBlock<string>(BATCH_SIZE);
// Stage 3: hands each batch to OutputStrings (despite the "upload" name, it only prints).
var uploadFilesToAzureBlock = new ActionBlock<IEnumerable<string>>(strings => OutputStrings(strings), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
Console.WriteLine("Blocks created");
//link the actions
// Both links use the default options, i.e. no DataflowLinkOptions are supplied here.
transformBlock.LinkTo(batchBlock);
batchBlock.LinkTo(uploadFilesToAzureBlock);
// Manually completes the action block once the batch block's Completion task finishes.
batchBlock.Completion.ContinueWith(obj => uploadFilesToAzureBlock.Complete());
Console.WriteLine("Blocks linked");
var testInputs = new List<string>
{
"Kyle",
"Stephen",
"Jon",
"Conor",
"Adrian",
"Marty",
"Richard",
"Norbert",
"Kerri",
"Mark",
"Declan",
"Ray",
"Paul",
"Andrew",
"Rachel",
"David",
"Darrell"
};
Console.WriteLine("Data created");
// i is only used for the log line below: the message shows the index, not the name.
var i = 0;
foreach (var name in testInputs)
{
Console.WriteLine("Posting name {0}", i);
transformBlock.Post(name);
i++;
}
// NOTE(review): only batchBlock is completed here; transformBlock.Complete() is never
// called and the links above do not request completion propagation. Completing
// batchBlock right after posting presumably makes it decline items that are still
// inside transformBlock, which would explain the action block never running --
// confirm against the TPL Dataflow documentation.
batchBlock.Complete();
// Synchronously waits for the last stage to finish.
uploadFilesToAzureBlock.Completion.Wait();
Console.WriteLine("Finishing");
// Keeps the console window open until a key is pressed.
Console.ReadKey();
}
// Prints one batch: a start marker, each string on its own line, then an end marker.
private static void OutputStrings(IEnumerable<string> strings)
{
Console.WriteLine("Beginning Batch...");
foreach (var s in strings)
{
Console.WriteLine(s);
}
Console.WriteLine("Completing Batch...");
}
// Returns the input with " has been processed" appended.
private static string TransformString(string input)
{
// The compound assignment only changes the local parameter; the caller sees just the returned value.
return input += " has been processed";
}
}
}
正如上面"usr"提到的,我没有在各个块之间传播完成状态(PropagateCompletion)。以下代码可以正常运行。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace DataflowTest
{
    /// <summary>
    /// Console demo of a three-stage TPL Dataflow pipeline:
    /// TransformBlock (decorate each name) -> BatchBlock (group names) -> ActionBlock (print each batch).
    /// </summary>
    class Program
    {
        /// <summary>Batch size handed to the BatchBlock constructor.</summary>
        public const int BATCH_SIZE = 10;

        /// <summary>
        /// Builds and links the pipeline, posts the test names, signals completion at the head
        /// of the pipeline and waits for the last stage to finish.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            Console.WriteLine("Application started");

            // Create the pipeline stages.
            var transformBlock = new TransformBlock<string, string>(
                input => TransformString(input),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
            var batchBlock = new BatchBlock<string>(BATCH_SIZE);
            var outputStringsBlock = new ActionBlock<IEnumerable<string>>(
                strings => OutputStrings(strings),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
            Console.WriteLine("Blocks created");

            // Link the stages. PropagateCompletion on each link forwards completion (and faults)
            // downstream, so no manual Completion.ContinueWith hook is needed.
            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            transformBlock.LinkTo(batchBlock, linkOptions);
            batchBlock.LinkTo(outputStringsBlock, linkOptions);
            Console.WriteLine("Blocks linked");

            var testInputs = new List<string>
            {
                "Kyle",
                "Stephen",
                "Jon",
                "Conor",
                "Adrian",
                "Marty",
                "Richard",
                "Norbert",
                "Kerri",
                "Mark",
                "Declan",
                "Ray",
                "Paul",
                "Andrew",
                "Rachel",
                "David",
                "Darrell"
            };
            Console.WriteLine("Data created");

            // The log line reports the zero-based index of the name being posted.
            for (var i = 0; i < testInputs.Count; i++)
            {
                Console.WriteLine("Posting name {0}", i);
                transformBlock.Post(testInputs[i]);
            }

            // Complete only the head of the pipeline; the links carry completion to the later
            // stages once each one has drained, so the final partial batch is still emitted.
            transformBlock.Complete();
            outputStringsBlock.Completion.Wait();

            Console.WriteLine("Finishing");
            // Keep the console window open until a key is pressed.
            Console.ReadKey();
        }

        /// <summary>
        /// Prints one batch: a start marker, a blank line, each string on its own line,
        /// another blank line, then an end marker.
        /// </summary>
        /// <param name="strings">The strings of the batch to print.</param>
        private static void OutputStrings(IEnumerable<string> strings)
        {
            Console.WriteLine("Beginning Batch...");
            Console.WriteLine();
            foreach (var s in strings)
            {
                Console.WriteLine(s);
            }
            Console.WriteLine();
            Console.WriteLine("Completing Batch...");
        }

        /// <summary>Returns <paramref name="input"/> with " has been processed" appended.</summary>
        /// <param name="input">The string to decorate.</param>
        /// <returns>The decorated string.</returns>
        private static string TransformString(string input)
        {
            return input + " has been processed";
        }
    }
}