如何在 TPL/Dataflow 中发出笛卡尔积?
How to emit a cartesian product in TPL/Dataflow?
我正在尝试实现以下行为:
// Verifies that every value posted to the left input is paired with every value
// posted to the right input, regardless of the order in which the pairs are emitted.
[TestMethod]
public async Task ProducesCartesianProductOfInputs()
{
    // Arrange
    var block = new CartesianProductBlock<int, string>();
    var target = new BufferBlock<Tuple<int, string>>();
    var left = block.Left;
    var right = block.Right;

    // Ask for completion propagation explicitly. The one-argument LinkTo extension
    // passes the default link options, which do not propagate completion; without
    // propagation the receive loop below only terminates if the block ignores the options.
    block.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });

    // Act
    Assert.IsTrue(left.Post(1));
    Assert.IsTrue(right.Post("a"));
    Assert.IsTrue(left.Post(2));
    Assert.IsTrue(right.Post("b"));

    // Complete() only tells the inputs to stop accepting new messages; messages that
    // were already posted are still processed before the blocks finish.
    left.Complete();
    right.Complete();

    var actual = new List<Tuple<int, string>>();
    while (await target.OutputAvailableAsync())
    {
        // Drain with the non-blocking TryReceive instead of the blocking Receive().
        Tuple<int, string> item;
        while (target.TryReceive(out item))
        {
            actual.Add(item);
        }
    }

    // Assert
    var expected = new List<Tuple<int, string>>()
    {
        Tuple.Create(1, "a"),
        Tuple.Create(2, "a"),
        Tuple.Create(1, "b"),
        Tuple.Create(2, "b"),
    };
    CollectionAssert.AreEquivalent(expected, actual);
}
我当前的(部分)实现不起作用,我不明白为什么:
// A block that remembers every message it receives on two channels, and pairs every
// message on a channel with every message already received on the other channel.
//
// Left and Right are the two inputs. Linked targets receive the resulting pairs
// directly from the two internal TransformManyBlocks.
public class CartesianProductBlock<T1, T2> : ISourceBlock<Tuple<T1, T2>>
{
    private readonly TransformManyBlock<T1, Tuple<T1, T2>> left;
    private readonly TransformManyBlock<T2, Tuple<T1, T2>> right;
    private readonly List<T1> leftReceived = new List<T1>();
    private readonly List<T2> rightReceived = new List<T2>();
    private readonly List<ITargetBlock<Tuple<T1, T2>>> targets = new List<ITargetBlock<Tuple<T1, T2>>>();

    // Guards leftReceived, rightReceived and targets.
    private readonly object lockObject = new object();

    private readonly TaskCompletionSource<int> completion = new TaskCompletionSource<int>();

    public ITargetBlock<T1> Left { get { return left; } }
    public ITargetBlock<T2> Right { get { return right; } }

    public CartesianProductBlock()
    {
        left = new TransformManyBlock<T1, Tuple<T1, T2>>(l =>
        {
            lock (lockObject)
            {
                leftReceived.Add(l);
                // Pair this input up with all received alternatives.
                // ToList() is essential: a lazy Select would be enumerated by the block
                // after the lock is released, while the other input may be appending to
                // the list, producing duplicate pairs or a "collection was modified" fault.
                return rightReceived.Select(r => Tuple.Create(l, r)).ToList();
            }
        });
        right = new TransformManyBlock<T2, Tuple<T1, T2>>(r =>
        {
            lock (lockObject)
            {
                rightReceived.Add(r);
                // Pair this input up with all received alternatives (materialized under
                // the lock for the same reason as above).
                return leftReceived.Select(l => Tuple.Create(l, r)).ToList();
            }
        });

        Task.WhenAll(left.Completion, right.Completion).ContinueWith(_ =>
        {
            // TODO: Respect propagate completion linkOptions. Defaulting to propagation for now.
            // Snapshot under the lock so a concurrent LinkTo/unlink cannot break the enumeration.
            ITargetBlock<Tuple<T1, T2>>[] linked;
            lock (lockObject)
            {
                linked = targets.ToArray();
            }

            foreach (var target in linked)
            {
                target.Complete();
            }

            // Signal this block's own completion once both inputs have finished.
            completion.TrySetResult(0);
        });
    }

    // Completes once both inputs have completed and the linked targets have been told to complete.
    public Task Completion => completion.Task;

    // Stops both inputs from accepting further messages; already queued messages are still processed.
    public void Complete()
    {
        left.Complete();
        right.Complete();
    }

    public void Fault(Exception exception) { throw new NotImplementedException(); }

    // Links both internal blocks to the target. The link options are currently ignored
    // (completion is always propagated). Disposing the returned object removes the link.
    public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)
    {
        if (target == null)
        {
            throw new ArgumentNullException(nameof(target));
        }

        var leftLink = left.LinkTo(target);
        var rightLink = right.LinkTo(target);
        lock (lockObject)
        {
            targets.Add(target);
        }

        return new Unlinker(this, target, leftLink, rightLink);
    }

    // The reservation/consumption callbacks below are not implemented on the wrapper:
    // LinkTo connects targets to the internal blocks rather than to this object.
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    public Tuple<T1, T2> ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
    {
        throw new NotImplementedException();
    }

    // Undoes a single LinkTo call: removes both internal links and stops completion
    // from being propagated to the target. Safe to dispose more than once.
    private sealed class Unlinker : IDisposable
    {
        private readonly CartesianProductBlock<T1, T2> owner;
        private readonly ITargetBlock<Tuple<T1, T2>> target;
        private readonly IDisposable leftLink;
        private readonly IDisposable rightLink;
        private bool disposed;

        public Unlinker(CartesianProductBlock<T1, T2> owner, ITargetBlock<Tuple<T1, T2>> target, IDisposable leftLink, IDisposable rightLink)
        {
            this.owner = owner;
            this.target = target;
            this.leftLink = leftLink;
            this.rightLink = rightLink;
        }

        public void Dispose()
        {
            lock (owner.lockObject)
            {
                if (disposed)
                {
                    return;
                }

                disposed = true;
                owner.targets.Remove(target);
            }

            leftLink.Dispose();
            rightLink.Dispose();
        }
    }
}
我遇到了以下(可能相关的)问题:
- 它是不确定的。测试以不同的方式失败。
- 似乎(通过添加日志记录,以及因为我得到 3 到 6 条输出消息)对两个输入调用 `Complete` 会导致部分消息未被处理,尽管按我的理解,它应该先让所有队列排空。(如果不是这样,那我就不知道该如何正确编写这个测试了。)
- 我的锁定方案很可能是错误的或不够理想的,不过我的目标是先得到一个粗糙但能工作的版本,然后再去改进它。
- 我对单个 `TransformManyBlock` 所做的实验没有出现任何意外,所以我无法弄清楚在这种情况下有什么不同。
正如所怀疑的那样,这与完成状态的传播(completion propagation)有关。下面是一个可以工作的版本,它正确地支持释放链接(link disposal),并遵循链接选项(link options);另外请注意,转换结果在锁内通过 `ToList()` 立即求值:
// A block that remembers every message it receives on two channels, and pairs every
// message on a channel with every message already received on the other channel.
//
// Left and Right are the two inputs. Linked targets receive the resulting pairs
// directly from the two internal TransformManyBlocks.
public class CartesianProductBlock<T1, T2> : ISourceBlock<Tuple<T1, T2>>
{
    private readonly TransformManyBlock<T1, Tuple<T1, T2>> left;
    private readonly TransformManyBlock<T2, Tuple<T1, T2>> right;
    private readonly List<T1> leftReceived = new List<T1>();
    private readonly List<T2> rightReceived = new List<T2>();

    // Guards leftReceived and rightReceived.
    private readonly object lockObject = new object();

    private readonly TaskCompletionSource<VoidResult> completion = new TaskCompletionSource<VoidResult>();

    public ITargetBlock<T1> Left { get { return left; } }
    public ITargetBlock<T2> Right { get { return right; } }

    public CartesianProductBlock()
    {
        left = new TransformManyBlock<T1, Tuple<T1, T2>>(l =>
        {
            lock (lockObject)
            {
                leftReceived.Add(l);
                // Materialize inside the lock so the result is a stable snapshot; a lazy
                // query would be enumerated later, while the other input may be appending.
                return rightReceived.Select(r => Tuple.Create(l, r)).ToList();
            }
        });
        right = new TransformManyBlock<T2, Tuple<T1, T2>>(r =>
        {
            lock (lockObject)
            {
                rightReceived.Add(r);
                return leftReceived.Select(l => Tuple.Create(l, r)).ToList();
            }
        });

        // Mirror the combined outcome of both inputs instead of always reporting success.
        Task.WhenAll(left.Completion, right.Completion).ContinueWith(inputs =>
        {
            if (inputs.IsFaulted)
            {
                completion.TrySetException(inputs.Exception.InnerExceptions);
            }
            else if (inputs.IsCanceled)
            {
                completion.TrySetCanceled();
            }
            else
            {
                completion.TrySetResult(VoidResult.Instance);
            }
        }, TaskScheduler.Default);
    }

    // Completes (or faults) once both inputs have finished.
    public Task Completion => completion.Task;

    // Stops both inputs from accepting further messages; already queued messages are still processed.
    public void Complete()
    {
        left.Complete();
        right.Complete();
    }

    // Faults both inputs with the given exception.
    public void Fault(Exception exception)
    {
        if (exception == null)
        {
            throw new ArgumentNullException(nameof(exception));
        }

        ((IDataflowBlock)left).Fault(exception);
        ((IDataflowBlock)right).Fault(exception);
    }

    // Links both internal blocks to the target. When linkOptions.PropagateCompletion is
    // set, the target is completed (or faulted) after BOTH inputs have finished, unless
    // the returned link has been disposed first. Other link options are not applied.
    public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)
    {
        if (target == null)
        {
            throw new ArgumentNullException(nameof(target));
        }
        if (linkOptions == null)
        {
            throw new ArgumentNullException(nameof(linkOptions));
        }

        // The inner links deliberately use the default options: letting either inner
        // block propagate completion would complete the target while the other input
        // is still producing.
        var leftLink = left.LinkTo(target);
        var rightLink = right.LinkTo(target);
        var link = new Link(leftLink, rightLink);

        if (linkOptions.PropagateCompletion)
        {
            var inputsDone = Task.WhenAll(left.Completion, right.Completion);
            Task.WhenAny(inputsDone, link.Completion).ContinueWith(_ =>
            {
                // If the link has been disposed of, we should no longer propagate completion.
                if (link.Completion.IsCompleted)
                {
                    return;
                }

                if (inputsDone.IsFaulted)
                {
                    target.Fault(inputsDone.Exception);
                }
                else
                {
                    target.Complete();
                }
            }, TaskScheduler.Default);
        }

        return link;
    }

    // The reservation/consumption callbacks below are not implemented on the wrapper:
    // LinkTo connects targets to the internal blocks rather than to this object.
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    public Tuple<T1, T2> ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
    {
        throw new NotImplementedException();
    }

    // Combines the two inner links into one disposable and exposes a task that
    // completes when the combined link has been disposed.
    private sealed class Link : IDisposable
    {
        private readonly IDisposable leftLink;
        private readonly IDisposable rightLink;
        private readonly TaskCompletionSource<VoidResult> completionSource = new TaskCompletionSource<VoidResult>();

        public Link(IDisposable leftLink, IDisposable rightLink)
        {
            this.leftLink = leftLink;
            this.rightLink = rightLink;
        }

        public Task Completion { get { return completionSource.Task; } }

        public void Dispose()
        {
            leftLink.Dispose();
            rightLink.Dispose();
            // TrySetResult rather than SetResult: Dispose must not throw when called twice.
            completionSource.TrySetResult(VoidResult.Instance);
        }
    }

    // Placeholder result type for TaskCompletionSource when no value is needed.
    private sealed class VoidResult
    {
        private static readonly VoidResult instance = new VoidResult();

        public static VoidResult Instance { get { return instance; } }

        private VoidResult() { }
    }
}
我正在尝试实现以下行为:
// Verifies that every value posted to the left input is paired with every value
// posted to the right input, regardless of the order in which the pairs are emitted.
[TestMethod]
public async Task ProducesCartesianProductOfInputs()
{
    // Arrange
    var block = new CartesianProductBlock<int, string>();
    var target = new BufferBlock<Tuple<int, string>>();
    var left = block.Left;
    var right = block.Right;

    // Ask for completion propagation explicitly. The one-argument LinkTo extension
    // passes the default link options, which do not propagate completion; without
    // propagation the receive loop below only terminates if the block ignores the options.
    block.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });

    // Act
    Assert.IsTrue(left.Post(1));
    Assert.IsTrue(right.Post("a"));
    Assert.IsTrue(left.Post(2));
    Assert.IsTrue(right.Post("b"));

    // Complete() only tells the inputs to stop accepting new messages; messages that
    // were already posted are still processed before the blocks finish.
    left.Complete();
    right.Complete();

    var actual = new List<Tuple<int, string>>();
    while (await target.OutputAvailableAsync())
    {
        // Drain with the non-blocking TryReceive instead of the blocking Receive().
        Tuple<int, string> item;
        while (target.TryReceive(out item))
        {
            actual.Add(item);
        }
    }

    // Assert
    var expected = new List<Tuple<int, string>>()
    {
        Tuple.Create(1, "a"),
        Tuple.Create(2, "a"),
        Tuple.Create(1, "b"),
        Tuple.Create(2, "b"),
    };
    CollectionAssert.AreEquivalent(expected, actual);
}
我当前的(部分)实现不起作用,我不明白为什么:
// A block that remembers every message it receives on two channels, and pairs every
// message on a channel with every message already received on the other channel.
//
// Left and Right are the two inputs. Linked targets receive the resulting pairs
// directly from the two internal TransformManyBlocks.
public class CartesianProductBlock<T1, T2> : ISourceBlock<Tuple<T1, T2>>
{
    private readonly TransformManyBlock<T1, Tuple<T1, T2>> left;
    private readonly TransformManyBlock<T2, Tuple<T1, T2>> right;
    private readonly List<T1> leftReceived = new List<T1>();
    private readonly List<T2> rightReceived = new List<T2>();
    private readonly List<ITargetBlock<Tuple<T1, T2>>> targets = new List<ITargetBlock<Tuple<T1, T2>>>();

    // Guards leftReceived, rightReceived and targets.
    private readonly object lockObject = new object();

    private readonly TaskCompletionSource<int> completion = new TaskCompletionSource<int>();

    public ITargetBlock<T1> Left { get { return left; } }
    public ITargetBlock<T2> Right { get { return right; } }

    public CartesianProductBlock()
    {
        left = new TransformManyBlock<T1, Tuple<T1, T2>>(l =>
        {
            lock (lockObject)
            {
                leftReceived.Add(l);
                // Pair this input up with all received alternatives.
                // ToList() is essential: a lazy Select would be enumerated by the block
                // after the lock is released, while the other input may be appending to
                // the list, producing duplicate pairs or a "collection was modified" fault.
                return rightReceived.Select(r => Tuple.Create(l, r)).ToList();
            }
        });
        right = new TransformManyBlock<T2, Tuple<T1, T2>>(r =>
        {
            lock (lockObject)
            {
                rightReceived.Add(r);
                // Pair this input up with all received alternatives (materialized under
                // the lock for the same reason as above).
                return leftReceived.Select(l => Tuple.Create(l, r)).ToList();
            }
        });

        Task.WhenAll(left.Completion, right.Completion).ContinueWith(_ =>
        {
            // TODO: Respect propagate completion linkOptions. Defaulting to propagation for now.
            // Snapshot under the lock so a concurrent LinkTo/unlink cannot break the enumeration.
            ITargetBlock<Tuple<T1, T2>>[] linked;
            lock (lockObject)
            {
                linked = targets.ToArray();
            }

            foreach (var target in linked)
            {
                target.Complete();
            }

            // Signal this block's own completion once both inputs have finished.
            completion.TrySetResult(0);
        });
    }

    // Completes once both inputs have completed and the linked targets have been told to complete.
    public Task Completion => completion.Task;

    // Stops both inputs from accepting further messages; already queued messages are still processed.
    public void Complete()
    {
        left.Complete();
        right.Complete();
    }

    public void Fault(Exception exception) { throw new NotImplementedException(); }

    // Links both internal blocks to the target. The link options are currently ignored
    // (completion is always propagated). Disposing the returned object removes the link.
    public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)
    {
        if (target == null)
        {
            throw new ArgumentNullException(nameof(target));
        }

        var leftLink = left.LinkTo(target);
        var rightLink = right.LinkTo(target);
        lock (lockObject)
        {
            targets.Add(target);
        }

        return new Unlinker(this, target, leftLink, rightLink);
    }

    // The reservation/consumption callbacks below are not implemented on the wrapper:
    // LinkTo connects targets to the internal blocks rather than to this object.
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    public Tuple<T1, T2> ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
    {
        throw new NotImplementedException();
    }

    // Undoes a single LinkTo call: removes both internal links and stops completion
    // from being propagated to the target. Safe to dispose more than once.
    private sealed class Unlinker : IDisposable
    {
        private readonly CartesianProductBlock<T1, T2> owner;
        private readonly ITargetBlock<Tuple<T1, T2>> target;
        private readonly IDisposable leftLink;
        private readonly IDisposable rightLink;
        private bool disposed;

        public Unlinker(CartesianProductBlock<T1, T2> owner, ITargetBlock<Tuple<T1, T2>> target, IDisposable leftLink, IDisposable rightLink)
        {
            this.owner = owner;
            this.target = target;
            this.leftLink = leftLink;
            this.rightLink = rightLink;
        }

        public void Dispose()
        {
            lock (owner.lockObject)
            {
                if (disposed)
                {
                    return;
                }

                disposed = true;
                owner.targets.Remove(target);
            }

            leftLink.Dispose();
            rightLink.Dispose();
        }
    }
}
我遇到了以下(可能相关的)问题:
- 它是不确定的。测试以不同的方式失败。
- 似乎(通过添加日志记录,以及因为我得到 3 到 6 条输出消息)对两个输入调用 `Complete` 会导致部分消息未被处理,尽管按我的理解,它应该先让所有队列排空。(如果不是这样,那我就不知道该如何正确编写这个测试了。)
- 我的锁定方案很可能是错误的或不够理想的,不过我的目标是先得到一个粗糙但能工作的版本,然后再去改进它。
- 我对单个 `TransformManyBlock` 所做的实验没有出现任何意外,所以我无法弄清楚在这种情况下有什么不同。
正如所怀疑的那样,这与完成状态的传播(completion propagation)有关。下面是一个可以工作的版本,它正确地支持释放链接(link disposal),并遵循链接选项(link options);另外请注意,转换结果在锁内通过 `ToList()` 立即求值:
// A block that remembers every message it receives on two channels, and pairs every
// message on a channel with every message already received on the other channel.
//
// Left and Right are the two inputs. Linked targets receive the resulting pairs
// directly from the two internal TransformManyBlocks.
public class CartesianProductBlock<T1, T2> : ISourceBlock<Tuple<T1, T2>>
{
    private readonly TransformManyBlock<T1, Tuple<T1, T2>> left;
    private readonly TransformManyBlock<T2, Tuple<T1, T2>> right;
    private readonly List<T1> leftReceived = new List<T1>();
    private readonly List<T2> rightReceived = new List<T2>();

    // Guards leftReceived and rightReceived.
    private readonly object lockObject = new object();

    private readonly TaskCompletionSource<VoidResult> completion = new TaskCompletionSource<VoidResult>();

    public ITargetBlock<T1> Left { get { return left; } }
    public ITargetBlock<T2> Right { get { return right; } }

    public CartesianProductBlock()
    {
        left = new TransformManyBlock<T1, Tuple<T1, T2>>(l =>
        {
            lock (lockObject)
            {
                leftReceived.Add(l);
                // Materialize inside the lock so the result is a stable snapshot; a lazy
                // query would be enumerated later, while the other input may be appending.
                return rightReceived.Select(r => Tuple.Create(l, r)).ToList();
            }
        });
        right = new TransformManyBlock<T2, Tuple<T1, T2>>(r =>
        {
            lock (lockObject)
            {
                rightReceived.Add(r);
                return leftReceived.Select(l => Tuple.Create(l, r)).ToList();
            }
        });

        // Mirror the combined outcome of both inputs instead of always reporting success.
        Task.WhenAll(left.Completion, right.Completion).ContinueWith(inputs =>
        {
            if (inputs.IsFaulted)
            {
                completion.TrySetException(inputs.Exception.InnerExceptions);
            }
            else if (inputs.IsCanceled)
            {
                completion.TrySetCanceled();
            }
            else
            {
                completion.TrySetResult(VoidResult.Instance);
            }
        }, TaskScheduler.Default);
    }

    // Completes (or faults) once both inputs have finished.
    public Task Completion => completion.Task;

    // Stops both inputs from accepting further messages; already queued messages are still processed.
    public void Complete()
    {
        left.Complete();
        right.Complete();
    }

    // Faults both inputs with the given exception.
    public void Fault(Exception exception)
    {
        if (exception == null)
        {
            throw new ArgumentNullException(nameof(exception));
        }

        ((IDataflowBlock)left).Fault(exception);
        ((IDataflowBlock)right).Fault(exception);
    }

    // Links both internal blocks to the target. When linkOptions.PropagateCompletion is
    // set, the target is completed (or faulted) after BOTH inputs have finished, unless
    // the returned link has been disposed first. Other link options are not applied.
    public IDisposable LinkTo(ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)
    {
        if (target == null)
        {
            throw new ArgumentNullException(nameof(target));
        }
        if (linkOptions == null)
        {
            throw new ArgumentNullException(nameof(linkOptions));
        }

        // The inner links deliberately use the default options: letting either inner
        // block propagate completion would complete the target while the other input
        // is still producing.
        var leftLink = left.LinkTo(target);
        var rightLink = right.LinkTo(target);
        var link = new Link(leftLink, rightLink);

        if (linkOptions.PropagateCompletion)
        {
            var inputsDone = Task.WhenAll(left.Completion, right.Completion);
            Task.WhenAny(inputsDone, link.Completion).ContinueWith(_ =>
            {
                // If the link has been disposed of, we should no longer propagate completion.
                if (link.Completion.IsCompleted)
                {
                    return;
                }

                if (inputsDone.IsFaulted)
                {
                    target.Fault(inputsDone.Exception);
                }
                else
                {
                    target.Complete();
                }
            }, TaskScheduler.Default);
        }

        return link;
    }

    // The reservation/consumption callbacks below are not implemented on the wrapper:
    // LinkTo connects targets to the internal blocks rather than to this object.
    public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
    {
        throw new NotImplementedException();
    }

    public Tuple<T1, T2> ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
    {
        throw new NotImplementedException();
    }

    // Combines the two inner links into one disposable and exposes a task that
    // completes when the combined link has been disposed.
    private sealed class Link : IDisposable
    {
        private readonly IDisposable leftLink;
        private readonly IDisposable rightLink;
        private readonly TaskCompletionSource<VoidResult> completionSource = new TaskCompletionSource<VoidResult>();

        public Link(IDisposable leftLink, IDisposable rightLink)
        {
            this.leftLink = leftLink;
            this.rightLink = rightLink;
        }

        public Task Completion { get { return completionSource.Task; } }

        public void Dispose()
        {
            leftLink.Dispose();
            rightLink.Dispose();
            // TrySetResult rather than SetResult: Dispose must not throw when called twice.
            completionSource.TrySetResult(VoidResult.Instance);
        }
    }

    // Placeholder result type for TaskCompletionSource when no value is needed.
    private sealed class VoidResult
    {
        private static readonly VoidResult instance = new VoidResult();

        public static VoidResult Instance { get { return instance; } }

        private VoidResult() { }
    }
}