如何将 TPL 数据流 TranformBlock 或 ActionBlock 放在单独的文件中?

How to put a TPL Dataflow TranformBlock or ActionBlock in a separate file?

我想为我的 .NET Core 应用程序使用 TPL 数据流并遵循 the example from the docs.

我不想将所有逻辑都放在一个文件中,而是想将每个 TransformBlockActionBlock(我还不需要其他的)放到它们自己的文件中。将整数转换为字符串的小 TransformBlock 示例

class IntToStringTransformer : TransformBlock<int, string>
{
    public IntToStringTransformer() : base(number => number.ToString()) { }
}

和一个小的 ActionBlock 示例将字符串写入控制台

class StringWriter : ActionBlock<string>
{
    public StringWriter() : base(Console.WriteLine) { }
}

不幸的是,这行不通,因为块 类 是密封的。有什么方法可以将这些块组织到它们自己的文件中吗?

正如@Panagiotis 所解释的,我认为您必须稍微抛开 OOP 思维模式。 DataFlow 是构建块,您可以配置这些构建块来执行所需的操作。我将尝试创建一个小例子来说明我的意思:

// Interface and impl. are in separate files. Actually, they could 
// even be in a different project ...
public interface IMyComplicatedTransform
{
     Task<string> TransformFunction(int input);
}

public class MyComplicatedTransform : IMyComplicatedTransform
{
     public Task<string> IMyComplicatedTransform.TransformFunction(int input)
     {
         // Some complex logic
     }
}

class DataFlowUsingClass{

     private readonly IMyComplicatedTransform myTransformer;
     private readonly TransformBlock<int , string> myTransform;
     // ... some more blocks ...

     public DataFlowUsingClass()
     {
          myTransformer = new MyComplicatedTransform(); // maybe use ctor injection?
          CreatePipeline();
     }

     private void CreatePipeline()
     {
          // create blocks
          myTransform = new TransformBlock<int, string>(myTransformer.TransformFunction);
          // ... init some more blocks

          // TODO link blocks
     }
}

我认为这最接近您的要求。

您最终得到的是一组可以独立测试的接口和实现。客户端基本上归结为“gluecode”。

编辑:正如@Panagiotis 正确指出的那样,界面甚至非常流畅。你可以没有。

数据流 steps/blocks/goroutines 本质上是功能性的,最好组织为工厂功能的模块,而不是单独的 classes。 TPL DataFlow 管道与 F# 或任何其他语言中的函数调用管道非常相似。事实上,可以将其视为一种 PowerShell 管道,只是它更易于编写。

无需创建 class 或实现接口来向该管道添加新函数,您只需添加它并将输出重定向到下一个函数。

TPL 数据流块已经提供了构建管道的原语,只需要一个转换函数。这就是为什么它们是密封的,以防止误用。

组织数据流的自然方式也与 F# 类似 - 使用执行每项工作的 函数 创建库,将它们放入相关函数的模块中。这些函数是无状态的,因此它们可以很容易地进入静态库,就像扩展方法一样。

例如,可以有一个模块用于执行批量插入或读取数据的数据库相关功能,另一个用于处理导出为各种文件格式,单独的 classes 调用外部 Web 服务,另一个用于解析特定的消息格式。

一个真实的例子

在过去的 7 年里,我一直在为在线旅行社 (OTA) 处理多个复杂的管道。其中一个调用多个 GDS(OTA 和航空公司之间的中介)来检索交易信息 - 机票问题、退款、取消等。下一步检索机票记录,详细的机票信息。最后将记录插入数据库。

GDS 太大而不必理会标准,因此他们的“SOAP”Web 服务甚至 SOAP-compliant,更不用说遵循 WS-* 标准了。因此每个 GDS 都需要一个单独的 class 库来调用服务和解析输出。那里还没有数据流,项目已经够复杂了

将数据写入数据库几乎总是一样,因此有一个单独的项目,其方法采用 IEnumerable<T> 并使用 SqlBulkCopy.[=16 将其写入数据库=]

虽然加载新数据还不够,但经常会出错,所以我需要能够加载已存储的票证信息。

组织

保持理智:

  • 每个管道都有自己的文件:
    • 用于加载新数据的每日管道,
    • 用于加载所有存储数据的重新加载管道
    • 使用现有数据的“重新运行”管道并且再次询问任何丢失的数据。
  • 静态 classes 用于保存 worker 函数和 separately 工厂方法,这些方法根据配置生成数据流块。例如,CreateLogger(path,level) 创建一个记录特定消息的 ActionBlock<Message>
  • 常见的数据流扩展方法——因为数据流块遵循相同的基本模式,所以很容易通过组合例如Func<TIn,TOut>和记录器块来创建记录块。或者创建一个 LinkTo 重载,将错误记录重定向到记录器或数据库。这些很常见,可以成为扩展方法。

如果它们在同一个文件中,则很难在不影响另一个管道的情况下编辑一个管道。此外,除了核心任务之外,管道还有很多其他内容,例如:

  • 日志记录
  • 处理错误记录和部分结果(无法停止 100K 导入 10 个错误)
  • 错误处理(与处理错误记录不同)
  • 监控 - 这个怪物在过去的 15 分钟里在做什么? DOP=10 是否提高了性能?

不创建父管道class

有些步骤很常见,所以一开始,我创建了一个父级 class,其中的常用步骤过载了,或者只是在子级 class 中替换了。 非常糟糕的想法。每个管道相似但不完全相同,继承意味着修改一个步骤或一个连接可能会破坏一切。大约 1 年后,事情变得难以忍受,所以我将父 class 拆分为单独的 classes.