使用 TPL Dataflow 封装以操作块结尾的管道
Use TPL Dataflow to encapsulate pipeline ending in an action block
TPL Dataflow 提供了一个非常有用的功能:
public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
ITargetBlock<TInput> target,
ISourceBlock<TOutput> source)
使您能够将多个块封装到一个变换块中。 return是一个
IPropagatorBlock<TInput, TOutput>
代表管道的开始和结束块。
但是,如果我的管道中的最后一个块是 ActionBlock,我不能使用它,因为 ActionBlock 不是 SourceBlock,并且函数的 return 类型将是 ITargetBlock,而不是一个 IPropagatorBlock。
基本上,我要找的是类似这个函数的东西:
public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
ITargetBlock<TStart> startBlock,
ActionBlock<TEnd> endBlock)
这样写是明智的,还是我遗漏了一些简单的东西?我不太确定 到 是如何写的 - 特别是连接完成。我需要创建自己的自定义块类型吗?
编辑:
好的,所以在阅读了@Panagiotis Kanavos 的回复并做了一些修改之后,我想出了这个。这是基于 EncapsulatingPropagator class,这是现有 DataflowBlock.Encapsulate 方法使用的:
internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
private readonly ITargetBlock<TStart> startBlock;
private readonly ActionBlock<TEnd> endBlock;
public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
{
this.startBlock = startBlock;
this.endBlock = endBlock;
}
public Task Completion
{
get { return this.endBlock.Completion; }
}
public void Complete()
{
this.startBlock.Complete();
}
void IDataflowBlock.Fault(Exception exception)
{
if (exception == null)
{
throw new ArgumentNullException("exception");
}
this.startBlock.Fault(exception);
}
public DataflowMessageStatus OfferMessage(
DataflowMessageHeader messageHeader,
TStart messageValue,
ISourceBlock<TStart> source,
bool consumeToAccept)
{
return this.startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
}
Encapsulate 不用于抽象现有管道,它用于创建 propagator 块,该块需要使用现有块无法实现的自定义行为,并且 link秒。
例如,Sliding Window 示例将所有传入消息 post 缓冲到其输入块,并在滑动 window 过期到其输出块时输出一批所有检索到的消息.
方法的名称会造成很多混淆,但当您理解它们的用途时,它们确实有意义:
- target 参数是 preceding 块将连接以发送消息的目标(输入)端点。在这种情况下,处理传入消息并决定是否 post 到输出(源)块的 ActionBlock 是有意义的。
- source 参数是 succeeding 步骤将连接到的源(输出)端点以接收消息。使用 ActionBlock 作为源没有意义,因为它没有任何输出。
接受 ActionBlock 方法作为 source
的 Encapsulate
变体没有用,因为您可以简单地 link 从任何前面的步骤到操作块。
编辑
如果你想模块化一个管道,即将它分解成可重用的、更易于管理的,你可以创建一个class那个构造,你可以使用一个普通的旧class。在那个 class 中,您正常构建管道片段,link 块(确保传播完成),然后将第一步和最后一步的完成任务公开为 public 属性,例如:
class MyFragment
{
public TransformationBlock<SomeMessage,SomeOther> Input {get;}
public Task Completion {get;}
ActionBlock<SomeOther> _finalBlock;
public MyFragment()
{
Input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
_finalBlock=new ActionBlock<SomeOther>(MyMethod);
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}
Input.LinkTo(_finalBlock,linkOptions);
}
private SomeOther MyFunction(SomeMessage msg)
{
...
}
private void MyMethod(SomeOther msg)
{
...
}
}
要将片段连接到管道,您只需 link 从管道块到公开的 Input
块。要等待完成,只需等待暴露的 Completion
任务。
如果你愿意,你可以停在这里,或者你可以实施 ITargetBlock 使片段看起来像一个目标块。您只需将所有方法委托给 Input 块,将 Completion 属性 委托给 final 块。
例如:
class MyFragment:ITargetBlock<SomeMessage>
{
....
public Task Completion {get;}
public void Complete()
{
Input.Complete()
};
public void Fault(Exception exc)
{
Input.Fault(exc);
}
DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
TInput messageValue,ISourceBlock<TInput> source,bool consumeToAccept)
{
return Input.OfferMessage(messageHeader,messageValue,source,consumeToAccept);
}
}
编辑 2
使用@bornfromanegg 的 class 可以将构建片段的行为与公开输入和完成的样板分开:
public ITargetBlock<SomeMessage> BuildMyFragment()
{
var input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
var step2=new TransformationBlock<SomeOther,SomeFinal>(MyFunction2);
var finalBlock=new ActionBlock<SomeFinal>(MyMethod);
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}
input.LinkTo(step2,linkOptions);
step2.LinkTo(finalBlock,linkOptions);
return new EncapsulatingTarget(input,finalBlock);
}
就我而言,我想封装一个包含多个最终 ActionBlock
的网络,并进行总结,因此编辑后的问题中概述的解决方案不起作用。
因为与 "final block" 的唯一交互围绕完成,所以只呈现封装的完成任务就足够了。 (根据建议添加了目标动作构造函数。)
public class EncapsulatingTarget<TInput> : ITargetBlock<TInput>
{
private readonly ITargetBlock<TInput> startBlock;
private readonly Task completion;
public EncapsulatingTarget(ITargetBlock<TInput> startBlock, Task completion)
{
this.startBlock = startBlock;
this.completion = completion;
}
public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
{
this.startBlock = startBlock;
completion = endBlock.Completion;
}
public Task Completion => completion;
public void Complete()
{
startBlock.Complete();
}
void IDataflowBlock.Fault(Exception exception)
{
if (exception == null)
{
throw new ArgumentNullException("exception");
}
startBlock.Fault(exception);
}
public DataflowMessageStatus OfferMessage(
DataflowMessageHeader messageHeader,
TInput messageValue,
ISourceBlock<TInput> source,
bool consumeToAccept)
{
return startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
}
用法示例:
public ITargetBlock<Client.InputRecord> BuildDefaultFinalActions()
{
var splitter = new BroadcastBlock<Client.InputRecord>(null);
var getresults = new TransformManyBlock(...); // propagator
var saveinput = new ActionBlock(...);
var saveresults = new ActionBlock(...);
splitter.LinkTo(saveinput, PropagateCompletion);
splitter.LinkTo(getresults, PropagateCompletion);
getresults.LinkTo(saveresults, PropagateCompletion);
return new Util.EncapsulatedTarget<Client.InputRecord>(splitter, Task.WhenAll(saveinput.Completion, saveresults.Completion));
}
我本可以制作签名 EncapsulatingTarget<T>(ITargetBlock<T> target, params Task[] completions)
并将 WhenAll(...)
移动到构造函数中,但不想对所需的完成通知做出假设。
TPL Dataflow 提供了一个非常有用的功能:
public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
ITargetBlock<TInput> target,
ISourceBlock<TOutput> source)
使您能够将多个块封装到一个变换块中。 return是一个
IPropagatorBlock<TInput, TOutput>
代表管道的开始和结束块。
但是,如果我的管道中的最后一个块是 ActionBlock,我不能使用它,因为 ActionBlock 不是 SourceBlock,并且函数的 return 类型将是 ITargetBlock,而不是一个 IPropagatorBlock。
基本上,我要找的是类似这个函数的东西:
public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
ITargetBlock<TStart> startBlock,
ActionBlock<TEnd> endBlock)
这样写是明智的,还是我遗漏了一些简单的东西?我不太确定 到 是如何写的 - 特别是连接完成。我需要创建自己的自定义块类型吗?
编辑:
好的,所以在阅读了@Panagiotis Kanavos 的回复并做了一些修改之后,我想出了这个。这是基于 EncapsulatingPropagator class,这是现有 DataflowBlock.Encapsulate 方法使用的:
internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
private readonly ITargetBlock<TStart> startBlock;
private readonly ActionBlock<TEnd> endBlock;
public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
{
this.startBlock = startBlock;
this.endBlock = endBlock;
}
public Task Completion
{
get { return this.endBlock.Completion; }
}
public void Complete()
{
this.startBlock.Complete();
}
void IDataflowBlock.Fault(Exception exception)
{
if (exception == null)
{
throw new ArgumentNullException("exception");
}
this.startBlock.Fault(exception);
}
public DataflowMessageStatus OfferMessage(
DataflowMessageHeader messageHeader,
TStart messageValue,
ISourceBlock<TStart> source,
bool consumeToAccept)
{
return this.startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
}
Encapsulate 不用于抽象现有管道,它用于创建 propagator 块,该块需要使用现有块无法实现的自定义行为,并且 link秒。
例如,Sliding Window 示例将所有传入消息 post 缓冲到其输入块,并在滑动 window 过期到其输出块时输出一批所有检索到的消息.
方法的名称会造成很多混淆,但当您理解它们的用途时,它们确实有意义:
- target 参数是 preceding 块将连接以发送消息的目标(输入)端点。在这种情况下,处理传入消息并决定是否 post 到输出(源)块的 ActionBlock 是有意义的。
- source 参数是 succeeding 步骤将连接到的源(输出)端点以接收消息。使用 ActionBlock 作为源没有意义,因为它没有任何输出。
接受 ActionBlock 方法作为 source
的 Encapsulate
变体没有用,因为您可以简单地 link 从任何前面的步骤到操作块。
编辑
如果你想模块化一个管道,即将它分解成可重用的、更易于管理的,你可以创建一个class那个构造,你可以使用一个普通的旧class。在那个 class 中,您正常构建管道片段,link 块(确保传播完成),然后将第一步和最后一步的完成任务公开为 public 属性,例如:
class MyFragment
{
public TransformationBlock<SomeMessage,SomeOther> Input {get;}
public Task Completion {get;}
ActionBlock<SomeOther> _finalBlock;
public MyFragment()
{
Input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
_finalBlock=new ActionBlock<SomeOther>(MyMethod);
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}
Input.LinkTo(_finalBlock,linkOptions);
}
private SomeOther MyFunction(SomeMessage msg)
{
...
}
private void MyMethod(SomeOther msg)
{
...
}
}
要将片段连接到管道,您只需 link 从管道块到公开的 Input
块。要等待完成,只需等待暴露的 Completion
任务。
如果你愿意,你可以停在这里,或者你可以实施 ITargetBlock 使片段看起来像一个目标块。您只需将所有方法委托给 Input 块,将 Completion 属性 委托给 final 块。
例如:
class MyFragment:ITargetBlock<SomeMessage>
{
....
public Task Completion {get;}
public void Complete()
{
Input.Complete()
};
public void Fault(Exception exc)
{
Input.Fault(exc);
}
DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
TInput messageValue,ISourceBlock<TInput> source,bool consumeToAccept)
{
return Input.OfferMessage(messageHeader,messageValue,source,consumeToAccept);
}
}
编辑 2
使用@bornfromanegg 的 class 可以将构建片段的行为与公开输入和完成的样板分开:
public ITargetBlock<SomeMessage> BuildMyFragment()
{
var input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
var step2=new TransformationBlock<SomeOther,SomeFinal>(MyFunction2);
var finalBlock=new ActionBlock<SomeFinal>(MyMethod);
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}
input.LinkTo(step2,linkOptions);
step2.LinkTo(finalBlock,linkOptions);
return new EncapsulatingTarget(input,finalBlock);
}
就我而言,我想封装一个包含多个最终 ActionBlock
的网络,并进行总结,因此编辑后的问题中概述的解决方案不起作用。
因为与 "final block" 的唯一交互围绕完成,所以只呈现封装的完成任务就足够了。 (根据建议添加了目标动作构造函数。)
public class EncapsulatingTarget<TInput> : ITargetBlock<TInput>
{
private readonly ITargetBlock<TInput> startBlock;
private readonly Task completion;
public EncapsulatingTarget(ITargetBlock<TInput> startBlock, Task completion)
{
this.startBlock = startBlock;
this.completion = completion;
}
public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
{
this.startBlock = startBlock;
completion = endBlock.Completion;
}
public Task Completion => completion;
public void Complete()
{
startBlock.Complete();
}
void IDataflowBlock.Fault(Exception exception)
{
if (exception == null)
{
throw new ArgumentNullException("exception");
}
startBlock.Fault(exception);
}
public DataflowMessageStatus OfferMessage(
DataflowMessageHeader messageHeader,
TInput messageValue,
ISourceBlock<TInput> source,
bool consumeToAccept)
{
return startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
}
用法示例:
public ITargetBlock<Client.InputRecord> BuildDefaultFinalActions()
{
var splitter = new BroadcastBlock<Client.InputRecord>(null);
var getresults = new TransformManyBlock(...); // propagator
var saveinput = new ActionBlock(...);
var saveresults = new ActionBlock(...);
splitter.LinkTo(saveinput, PropagateCompletion);
splitter.LinkTo(getresults, PropagateCompletion);
getresults.LinkTo(saveresults, PropagateCompletion);
return new Util.EncapsulatedTarget<Client.InputRecord>(splitter, Task.WhenAll(saveinput.Completion, saveresults.Completion));
}
我本可以制作签名 EncapsulatingTarget<T>(ITargetBlock<T> target, params Task[] completions)
并将 WhenAll(...)
移动到构造函数中,但不想对所需的完成通知做出假设。