How to merge two TPL DataFlow pipelines in synchronized fashion?
I want to write an application that evaluates sensor data coming from two sensors. Both sensors send their data in Package objects, which are split into Frame objects. A Package is essentially a Tuple<Timestamp, Data[]>, and a Frame is a Tuple<Timestamp, Data>. I then always need to process the Frame with the earliest timestamp from both sources.
So basically my object flow is:
Package -(1:n)-> Frame \
}-pair synchronized-> Tuple<Frame, Frame>
Package -(1:n)-> Frame /
Example
Assume each Package holds 2 or 3 values (reality: 5-7) with integer timestamps incrementing by 1 (reality: ~200 Hz => ~5 ms increments). For simplicity, the "data" is just timestamp * 100.
Packages (timestamp, values[])
Source 1:
{(19, [1700, 1800, 1900]), (22, [2000, 2100, 2200]), (26, [2500, 2600]),
(29, [2700, 2800, 2900]), ...}
Source 2:
{(17, [1500, 1600, 1700]), (19, [1800, 1900]), (21, [2000, 2100]),
(26, [2400, 2500, 2600]), ...}
After the (1:n) step:
Frames (timestamp, value)
Source 1:
{(17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100),
(22, 2200), (25, 2500), (26, 2600), (27, 2700), (28, 2800),
(29, 2900), ...}
Source 2:
{(15, 1500), (16, 1600), (17, 1700), (18, 1800), (19, 1900),
(20, 2000), (21, 2100), (24, 2400), (25, 2500), (26, 2600), ...}
After the pair synchronized step:
Merged tuples (timestamp, source1, source2)
{(15, null, 1500), (16, null, 1600), (17, 1700, 1700), (18, 1800, 1800),
(19, 1900, 1900), (20, 2000, 2000), (21, 2100, 2100), (22, 2200, null),
(24, null, 2400), (25, 2500, 2500), (26, 2600, 2600), ...}
Note that timestamp 23 is missing because neither source sent a value for it. That is just a side effect; I could emit an empty tuple there or not, it doesn't matter. It also doesn't matter whether the tuple is (27, 2700, 2700) or ((27, 2700), (27, 2700)), i.e. Tuple<Timestamp, Data, Data> or Tuple<Frame, Frame>.
If I read the documentation correctly, I'm fairly sure the (1:n) part should be a TransformManyBlock<Package, Frame>.
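For illustration, the splitting step could look something like this (a minimal sketch only; the Package and Frame record types are assumptions based on the description above, and the package timestamp is taken to belong to its last value, as in the example data):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

public record Package(int Timestamp, int[] Data);
public record Frame(int Timestamp, int Data);

public static class PipelineParts
{
    public static TransformManyBlock<Package, Frame> CreateSplitter()
    {
        // In the example data the package timestamp belongs to its last value,
        // so frame i of n values gets timestamp Package.Timestamp - (n - 1 - i).
        return new TransformManyBlock<Package, Frame>(p =>
            p.Data.Select((value, i) =>
                new Frame(p.Timestamp - (p.Data.Length - 1 - i), value)));
    }
}
```

Posting the package (19, [1700, 1800, 1900]) from the example then yields the frames (17, 1700), (18, 1800), (19, 1900).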
But which block do I use for the pair synchronized part? At first I thought JoinBlock<Frame, Frame> would be what I was looking for, but it seems to simply pair two elements by index. Since it can be guaranteed neither that both pipelines start with the same timestamp nor that both pipelines always produce a steady stream of consecutive timestamps (because packages with a few frames may occasionally be lost in transmission), that is not an option. So what I need instead is more of a "MergeBlock" that can decide which element of the two input streams to propagate to the output next, if any.
I figure I have to write something like that myself, but I'm having trouble writing code that correctly handles the two ISourceBlock variables and the one ITargetBlock variable. I basically get stuck as early as this:
private void MergeSynchronized(
    ISourceBlock<Frame> source1,
    ISourceBlock<Frame> source2,
    ITargetBlock<Tuple<Frame, Frame>> target)
{
    var frame1 = source1.Receive();
    var frame2 = source2.Receive();
    //Loop {
    //    Depending on the timestamp [mis]match,
    //    either pair frame1+frame2 or frame1+null or null+frame2, and
    //    replace whichever frame(s) was/were propagated already
    //    with the next frame from the respective pipeline
    //}
}
I'm not even sure about this draft: should the method be async so I can use var frame1 = await source1.ReceiveAsync();? What is the condition of the loop? Where and how do I check for completion? And how do I solve the obvious problem that my code implies I have to wait until a gap in the stream is over before I can even realize there was a gap?
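For what it's worth, one way such a loop could be written around ReceiveAsync is sketched below. This is only an illustration of the pairing logic, not a finished solution; the Frame record and the null-on-completion helper are assumptions, and it still has the gap problem described above (a gap is only detected once a later frame arrives):

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public record Frame(int Timestamp, int Data);

public static class ManualMerge
{
    public static async Task MergeSynchronizedAsync(
        ISourceBlock<Frame> source1,
        ISourceBlock<Frame> source2,
        ITargetBlock<Tuple<Frame, Frame>> target)
    {
        Frame frame1 = await TryReceiveNextAsync(source1);
        Frame frame2 = await TryReceiveNextAsync(source2);
        while (frame1 != null || frame2 != null)
        {
            // A depleted side always "loses" so the other side drains out.
            int cmp = frame1 == null ? 1
                    : frame2 == null ? -1
                    : frame1.Timestamp.CompareTo(frame2.Timestamp);
            if (cmp == 0)
            {
                target.Post(Tuple.Create(frame1, frame2));
                frame1 = await TryReceiveNextAsync(source1);
                frame2 = await TryReceiveNextAsync(source2);
            }
            else if (cmp < 0)
            {
                target.Post(Tuple.Create(frame1, (Frame)null));
                frame1 = await TryReceiveNextAsync(source1);
            }
            else
            {
                target.Post(Tuple.Create((Frame)null, frame2));
                frame2 = await TryReceiveNextAsync(source2);
            }
        }
        target.Complete();
    }

    // Returns null when the source has completed: ReceiveAsync faults with
    // InvalidOperationException once a completed source has no more output.
    private static async Task<Frame> TryReceiveNextAsync(ISourceBlock<Frame> source)
    {
        try { return await source.ReceiveAsync(); }
        catch (InvalidOperationException) { return null; }
    }
}
```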
The alternative I considered was adding an extra block to each pipeline that makes sure enough "sentinel frames" are inserted per sensor, so that aligning the first frames of both pipelines always pairs up the right two. I guess that would be some kind of TransformManyBlock that reads a frame, compares the "expected" timestamp with the actual one, and inserts marker frames for the missing timestamps until the frame's timestamp is correct again.
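That sentinel idea could be sketched roughly like this (illustrative only; MissingData is a made-up marker convention, and mutating the captured lastTimestamp is only safe because the block runs with the default MaxDegreeOfParallelism of 1):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public record Frame(int Timestamp, int Data);

public static class GapFiller
{
    // Hypothetical sentinel value marking a frame inserted for a missing
    // timestamp; any out-of-band marker would do.
    public const int MissingData = -1;

    public static TransformManyBlock<Frame, Frame> Create()
    {
        int? lastTimestamp = null;
        return new TransformManyBlock<Frame, Frame>(frame =>
        {
            var output = new List<Frame>();
            if (lastTimestamp is int last)
            {
                // Emit a sentinel for every timestamp skipped since the last frame.
                for (int t = last + 1; t < frame.Timestamp; t++)
                    output.Add(new Frame(t, MissingData));
            }
            output.Add(frame);
            lastTimestamp = frame.Timestamp;
            return output;
        });
    }
}
```

Feeding it the frames (19, 1900) and (22, 2200) would then produce (19, 1900), (20, -1), (21, -1), (22, 2200).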
Or is the pair synchronized part the point where I should stop using TPL Dataflow objects and start the actual code that already works with the Data part?
The problem with the TPL DataFlow API is that everything is internal/private and/or sealed, which gives you little room to extend the API. Anyway, for your problem it might be a good idea to implement a new SynchronizedJoinBlock class. The actual business logic is located in the GetMessagesRecursive method:
public sealed class SynchronizedJoinBlock<T1, T2>
    : IReceivableSourceBlock<Tuple<T1, T2>>
{
    private readonly object _syncObject = new object();
    private readonly Func<T1, T2, int> _compareFunction;
    private readonly Queue<T1> _target1Messages;
    private readonly Queue<T2> _target2Messages;
    private readonly TransformManyBlock<T1, Tuple<T1, T2>> _target1;
    private readonly TransformManyBlock<T2, Tuple<T1, T2>> _target2;
    private readonly BatchedJoinBlock<Tuple<T1, T2>, Tuple<T1, T2>> _batchedJoinBlock;
    private readonly TransformManyBlock<Tuple<IList<Tuple<T1, T2>>, IList<Tuple<T1, T2>>>, Tuple<T1, T2>> _transformManyBlock;

    public ITargetBlock<T1> Target1 => _target1;
    public ITargetBlock<T2> Target2 => _target2;
    public Task Completion => _transformManyBlock.Completion;

    public SynchronizedJoinBlock(Func<T1, T2, int> compareFunction)
    {
        _compareFunction = compareFunction
            ?? throw new ArgumentNullException(nameof(compareFunction));
        _batchedJoinBlock = new BatchedJoinBlock<Tuple<T1, T2>, Tuple<T1, T2>>(1);
        _target1Messages = new Queue<T1>();
        _target2Messages = new Queue<T2>();

        Func<ICollection<Tuple<T1, T2>>> getMessagesFunction = () =>
        {
            lock (_syncObject)
            {
                if (_target1Messages.Count > 0 && _target2Messages.Count > 0)
                {
                    return GetMessagesRecursive(
                        _target1Messages.Peek(), _target2Messages.Peek()).ToArray();
                }
                else
                {
                    return new Tuple<T1, T2>[0];
                }
            }
        };

        _target1 = new TransformManyBlock<T1, Tuple<T1, T2>>((element) =>
        {
            _target1Messages.Enqueue(element);
            return getMessagesFunction();
        });
        _target1.LinkTo(_batchedJoinBlock.Target1,
            new DataflowLinkOptions() { PropagateCompletion = true });

        _target2 = new TransformManyBlock<T2, Tuple<T1, T2>>((element) =>
        {
            _target2Messages.Enqueue(element);
            return getMessagesFunction();
        });
        _target2.LinkTo(_batchedJoinBlock.Target2,
            new DataflowLinkOptions() { PropagateCompletion = true });

        _transformManyBlock = new TransformManyBlock<Tuple<IList<Tuple<T1, T2>>, IList<Tuple<T1, T2>>>, Tuple<T1, T2>>(
            element => element.Item1.Concat(element.Item2));
        _batchedJoinBlock.LinkTo(_transformManyBlock,
            new DataflowLinkOptions() { PropagateCompletion = true });
    }

    private IEnumerable<Tuple<T1, T2>> GetMessagesRecursive(T1 value1, T2 value2)
    {
        int result = _compareFunction(value1, value2);
        if (result == 0)
        {
            yield return Tuple.Create(_target1Messages.Dequeue(), _target2Messages.Dequeue());
        }
        else if (result < 0)
        {
            yield return Tuple.Create(_target1Messages.Dequeue(), default(T2));
            if (_target1Messages.Count > 0)
            {
                foreach (var item in GetMessagesRecursive(_target1Messages.Peek(), value2))
                {
                    yield return item;
                }
            }
        }
        else
        {
            yield return Tuple.Create(default(T1), _target2Messages.Dequeue());
            if (_target2Messages.Count > 0)
            {
                foreach (var item in GetMessagesRecursive(value1, _target2Messages.Peek()))
                {
                    yield return item;
                }
            }
        }
    }

    public void Complete()
    {
        _target1.Complete();
        _target2.Complete();
    }

    Tuple<T1, T2> ISourceBlock<Tuple<T1, T2>>.ConsumeMessage(
        DataflowMessageHeader messageHeader,
        ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
    {
        return ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)
            .ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    void IDataflowBlock.Fault(Exception exception)
    {
        ((IDataflowBlock)_transformManyBlock).Fault(exception);
    }

    public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target,
        DataflowLinkOptions linkOptions)
    {
        return _transformManyBlock.LinkTo(target, linkOptions);
    }

    void ISourceBlock<Tuple<T1, T2>>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)
            .ReleaseReservation(messageHeader, target);
    }

    bool ISourceBlock<Tuple<T1, T2>>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        return ((ISourceBlock<Tuple<T1, T2>>)_transformManyBlock)
            .ReserveMessage(messageHeader, target);
    }

    public bool TryReceive(Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)
    {
        return _transformManyBlock.TryReceive(filter, out item);
    }

    public bool TryReceiveAll(out IList<Tuple<T1, T2>> items)
    {
        return _transformManyBlock.TryReceiveAll(out items);
    }
}
Here is an implementation of a SynchronizedJoinBlock block, similar to the one presented in Hardy Hobeck's answer. This one takes care of some minor details, such as cancellation, dealing with exceptions, and propagating any remaining items when the input blocks Target1 and Target2 are marked as completed. Also, the merging logic does not involve recursion, which should make it perform better (hopefully; I didn't measure it) and less susceptible to stack overflow exceptions. A small deviation: the output is a ValueTuple<T1, T2> instead of Tuple<T1, T2>, with the intention of reducing allocations.
public sealed class SynchronizedJoinBlock<T1, T2> : IReceivableSourceBlock<(T1, T2)>
{
    private readonly Func<T1, T2, int> _comparison;
    private readonly Queue<T1> _queue1 = new Queue<T1>();
    private readonly Queue<T2> _queue2 = new Queue<T2>();
    private readonly ActionBlock<T1> _input1;
    private readonly ActionBlock<T2> _input2;
    private readonly BufferBlock<(T1, T2)> _output;
    private readonly object _locker = new object();

    public SynchronizedJoinBlock(Func<T1, T2, int> comparison,
        CancellationToken cancellationToken = default)
    {
        _comparison = comparison ?? throw new ArgumentNullException(nameof(comparison));

        // Create the three internal blocks
        var options = new ExecutionDataflowBlockOptions()
        {
            CancellationToken = cancellationToken
        };
        _input1 = new ActionBlock<T1>(Add1, options);
        _input2 = new ActionBlock<T2>(Add2, options);
        _output = new BufferBlock<(T1, T2)>(options);

        // Link the input blocks with the output block
        var inputTasks = new Task[] { _input1.Completion, _input2.Completion };
        Task.WhenAny(inputTasks).Unwrap().ContinueWith(t =>
        {
            // If ANY input block fails, then the whole block has failed
            ((IDataflowBlock)_output).Fault(t.Exception.InnerException);
            if (!_input1.Completion.IsCompleted) _input1.Complete();
            if (!_input2.Completion.IsCompleted) _input2.Complete();
            ClearQueues();
        }, default, TaskContinuationOptions.OnlyOnFaulted |
            TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default);
        Task.WhenAll(inputTasks).ContinueWith(t =>
        {
            // If ALL input blocks succeeded, then the whole block has succeeded
            try
            {
                if (!t.IsCanceled) PostRemaining(); // Post what's left
            }
            catch (Exception ex)
            {
                ((IDataflowBlock)_output).Fault(ex);
            }
            _output.Complete();
            ClearQueues();
        }, default, TaskContinuationOptions.NotOnFaulted |
            TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default);
    }

    public ITargetBlock<T1> Target1 => _input1;
    public ITargetBlock<T2> Target2 => _input2;
    public Task Completion => _output.Completion;

    private void Add1(T1 value1)
    {
        lock (_locker)
        {
            _queue1.Enqueue(value1);
            FindAndPostMatched_Unsafe();
        }
    }

    private void Add2(T2 value2)
    {
        lock (_locker)
        {
            _queue2.Enqueue(value2);
            FindAndPostMatched_Unsafe();
        }
    }

    private void FindAndPostMatched_Unsafe()
    {
        while (_queue1.Count > 0 && _queue2.Count > 0)
        {
            var result = _comparison(_queue1.Peek(), _queue2.Peek());
            if (result < 0)
            {
                _output.Post((_queue1.Dequeue(), default));
            }
            else if (result > 0)
            {
                _output.Post((default, _queue2.Dequeue()));
            }
            else // result == 0
            {
                _output.Post((_queue1.Dequeue(), _queue2.Dequeue()));
            }
        }
    }

    private void PostRemaining()
    {
        lock (_locker)
        {
            while (_queue1.Count > 0)
            {
                _output.Post((_queue1.Dequeue(), default));
            }
            while (_queue2.Count > 0)
            {
                _output.Post((default, _queue2.Dequeue()));
            }
        }
    }

    private void ClearQueues()
    {
        lock (_locker)
        {
            _queue1.Clear();
            _queue2.Clear();
        }
    }

    public void Complete() => _output.Complete();

    public void Fault(Exception exception)
        => ((IDataflowBlock)_output).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<(T1, T2)> target,
        DataflowLinkOptions linkOptions)
        => _output.LinkTo(target, linkOptions);

    public bool TryReceive(Predicate<(T1, T2)> filter, out (T1, T2) item)
        => _output.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<(T1, T2)> items)
        => _output.TryReceiveAll(out items);

    (T1, T2) ISourceBlock<(T1, T2)>.ConsumeMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target,
        out bool messageConsumed)
        => ((ISourceBlock<(T1, T2)>)_output).ConsumeMessage(
            messageHeader, target, out messageConsumed);

    void ISourceBlock<(T1, T2)>.ReleaseReservation(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)
        => ((ISourceBlock<(T1, T2)>)_output).ReleaseReservation(
            messageHeader, target);

    bool ISourceBlock<(T1, T2)>.ReserveMessage(
        DataflowMessageHeader messageHeader, ITargetBlock<(T1, T2)> target)
        => ((ISourceBlock<(T1, T2)>)_output).ReserveMessage(
            messageHeader, target);
}
Usage example:
var joinBlock = new SynchronizedJoinBlock<(int, int), (int, int)>(
    (x, y) => Comparer<int>.Default.Compare(x.Item1, y.Item1));

var source1 = new (int, int)[] {(17, 1700), (18, 1800), (19, 1900),
    (20, 2000), (21, 2100), (22, 2200), (25, 2500), (26, 2600),
    (27, 2700), (28, 2800), (29, 2900)};
var source2 = new (int, int)[] {(15, 1500), (16, 1600), (17, 1700),
    (18, 1800), (19, 1900), (20, 2000), (21, 2100), (24, 2400),
    (25, 2500), (26, 2600)};

Array.ForEach(source1, x => joinBlock.Target1.Post(x));
Array.ForEach(source2, x => joinBlock.Target2.Post(x));
joinBlock.Target1.Complete();
joinBlock.Target2.Complete();

while (joinBlock.OutputAvailableAsync().Result)
{
    Console.WriteLine($"Received: {joinBlock.Receive()}");
}
Output:
Received: ((0, 0), (15, 1500))
Received: ((0, 0), (16, 1600))
Received: ((17, 1700), (17, 1700))
Received: ((18, 1800), (18, 1800))
Received: ((19, 1900), (19, 1900))
Received: ((20, 2000), (20, 2000))
Received: ((21, 2100), (21, 2100))
Received: ((22, 2200), (0, 0))
Received: ((0, 0), (24, 2400))
Received: ((25, 2500), (25, 2500))
Received: ((26, 2600), (26, 2600))
Received: ((27, 2700), (0, 0))
Received: ((28, 2800), (0, 0))
Received: ((29, 2900), (0, 0))
This implementation assumes that the incoming data is ordered.
This class has a similar structure to the JoinDependencyBlock class that I posted a while ago in a somewhat related question.