ActionBlock B 从不接收 TransformBlock A 返回的项目

ActionBlock B never receives item returned by TransformBlock A

我的 C#/WPF 应用程序中的 TPL 数据流网格出现问题。第一个输入项(称为 "Job")总是通过链一直到达最终的 TPL 块。但是剩余的作业永远不会到达最后一个块(#4),即使日志语句清楚地显示它们已成功从块 #3

返回

这是网格。设置一次并存储在我的 View-Model class.

的私有成员中
// 1. _meshStartBlock:  On UI thread.   This block always works fine.

_meshStartBlock = new TransformBlock<Job, Job>(job =>
{
    Jobs.Add(job);
    Fire(_scanCapturedTrigger, job);  // Notify sstate machine.
    Log.Debug("Started: " + job.Name);
    return job;
},
new ExecutionDataflowBlockOptions
{
    CancellationToken = TokenSource.Token,
    TaskScheduler = UiTaskScheduler   // Run on UI thread (because it edits
                                      // our ObservableCollection)
});

// 2. createBlock:  This block also always works fine.

var createBlock = new TransformBlock<Job, Job>(job =>
{
    job.CreateScan();          // Saves some disk files
    job.CreateThumbnail(true); // Creates and saves a thumbnail image.
    Log.Debug("Created: " + job.Name);
    return job;
},
new ExecutionDataflowBlockOptions
{ CancellationToken = TokenSource.Token, MaxDegreeOfParallelism = 1 });


// 3. processBlock - do heavy work in parallel
// This block succeeds for all 3 jobs but 2nd and 3rd returned jobs never
// reach the next block.

var processBlock = new TransformBlock<Job, Job>(job =>
{
    try
    {
        Log.Debug("Processing: " + job.Name);
        job.AlignImages();            // heavy image processing
        job.Generate3d();             // heavy 3d math
        job.FindShapes();             // more heavy match
        job.GetContext().Scan.Save(); // save disk files
        Log.Debug("Processing succeeded: " + job.Name
    }
    catch (Exception e)
    {
        Log.Error("Processing failed: " + job.Name);
    }

    // *** THIS LOG STATEMENT SHOWS UP FOR ALL 3 JOBS ***

    Log.Debug("Leaving process block: " + job.Name);

    return job;
},
new ExecutionDataflowBlockOptions
{ CancellationToken = TokenSource.Token, MaxDegreeOfParallelism = 3 });


// 4. doneBlock: Cleans up.
// Since we schedule this on the UI thread it should not be heavy.

var doneBlock = new ActionBlock<Job>(job =>
{
    // *** ONLY REACHED BY JOB 1 ***  

    Log.Debug("Done: " + job.Name);
    Fire(Trigger.ScanProcessed);    // Notify State Machine
},
new ExecutionDataflowBlockOptions
{ CancellationToken = TokenSource.Token,  TaskScheduler = UiTaskScheduler });

// Set up the mesh.  Link the blocks together to form a chain.

_meshStartBlock.LinkTo(createBlock);
createBlock.LinkTo(processBlock);
processBlock.LinkTo(doneBlock);

return _meshStartBlock;

这是我得到的日志输出

Started: Job1
Created: Job1
Started: Job2
Processing: Job1
Created: Job2
Processing: Job2
Started: Job3
Created: Job3
Processing: Job3
Processing succeeded: Job1
Leaving process block: Job1
Done: Job1
Processing succeeded: Job2
Leaving process block: Job2
Processing succeeded: Job3
Leaving process block: Job3

Debug window 在处理或转储任何类型的错误消息期间不报告任何异常。

我应该注意,我在发布版本中被迫 运行 这个。如果我 运行 调试构建,那么该过程块需要几个小时。此外,永远不会调用 CancellationToken

任何 TPL-Dataflow 专家都可以告诉我如何诊断 Job2 和 Job3 可能发生的情况吗?无论如何我可以让 TPL Dataflow 告诉我我的工作发生了什么?

将错误处理程序附加到块可能会有所帮助,以便在异常发生时立即将其记录下来。这是一个简单的通用错误处理程序的示例:

public static async void OnErrorLog(IDataflowBlock block)
{
    try
    {
        await block.Completion.ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        Log.Error($"{block.GetType().Name} failed", ex);
    }
}

您可以根据自己的喜好进行调整。

用法示例:

OnErrorLog(_meshStartBlock);
OnErrorLog(createBlock);
OnErrorLog(processBlock);
OnErrorLog(doneBlock);