ReceiveAsync interrupting/breaking message passing
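This problem came up while trying to implement the solution suggested for another problem.
Problem summary
Calling ReceiveAsync() on a WriteOnceBlock from inside a TransformBlock causes the TransformBlock to essentially remove itself from the flow. It stops propagating any kind of message, whether data or a completion signal.
System design
The system is intended to parse large CSV files through a series of steps.
The problematic part of the flow can be (unprofessionally) visualized as follows:
The parallelograms are BufferBlocks, the diamonds are BroadcastBlocks, the triangles are WriteOnceBlocks, and the arrows are TransformBlocks. A solid line denotes a link created with LinkTo(), and the dashed line denotes the ReceiveAsync() call from the ParsedHeaderAndRecordJoiner to the ParsedHeaderContainer block. I am aware that this flow is somewhat suboptimal, but that is not the main reason for the question.
Code
Application root
This is the part of my class that creates the necessary blocks and links them together using PropagateCompletion.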
using (var cancellationSource = new CancellationTokenSource())
{
    var cancellationToken = cancellationSource.Token;
    var temporaryEntityInstance = new Card(); // Just as an example

    var producerQueue = queueFactory.CreateQueue<string>(new DataflowBlockOptions { CancellationToken = cancellationToken });
    var recordDistributor = distributorFactory.CreateDistributor<string>(s => (string)s.Clone(),
        new DataflowBlockOptions { CancellationToken = cancellationToken });
    var headerRowContainer = containerFactory.CreateContainer<string>(s => (string)s.Clone(),
        new DataflowBlockOptions { CancellationToken = cancellationToken });
    var headerRowParser = new HeaderRowParserFactory().CreateHeaderRowParser(temporaryEntityInstance.GetType(), ';',
        new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
    var parsedHeaderContainer = containerFactory.CreateContainer<HeaderParsingResult>(HeaderParsingResult.Clone,
        new DataflowBlockOptions { CancellationToken = cancellationToken });
    var parsedHeaderAndRecordJoiner = new ParsedHeaderAndRecordJoinerFactory().CreateParsedHeaderAndRecordJoiner(parsedHeaderContainer,
        new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
    var entityParser = new entityParserFactory().CreateEntityParser(temporaryEntityInstance.GetType(), ';',
        dataflowBlockOptions: new ExecutionDataflowBlockOptions { CancellationToken = cancellationToken });
    var entityDistributor = distributorFactory.CreateDistributor<EntityParsingResult>(EntityParsingResult.Clone,
        new DataflowBlockOptions { CancellationToken = cancellationToken });

    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    // Producer subprocess
    producerQueue.LinkTo(recordDistributor, linkOptions);

    // Header subprocess
    recordDistributor.LinkTo(headerRowContainer, linkOptions);
    headerRowContainer.LinkTo(headerRowParser, linkOptions);
    headerRowParser.LinkTo(parsedHeaderContainer, linkOptions);
    parsedHeaderContainer.LinkTo(errorQueue, new DataflowLinkOptions { MaxMessages = 1, PropagateCompletion = true }, dataflowResult => !dataflowResult.WasSuccessful);

    // Parsing subprocess
    recordDistributor.LinkTo(parsedHeaderAndRecordJoiner, linkOptions);
    parsedHeaderAndRecordJoiner.LinkTo(entityParser, linkOptions, joiningResult => joiningResult.WasSuccessful);
    entityParser.LinkTo(entityDistributor, linkOptions);
    entityDistributor.LinkTo(errorQueue, linkOptions, dataflowResult => !dataflowResult.WasSuccessful);
}
HeaderRowParser
This block parses the header row of the CSV file and performs some validation.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks.Dataflow;

public class HeaderRowParserFactory
{
    public TransformBlock<string, HeaderParsingResult> CreateHeaderRowParser(Type entityType,
        char delimiter,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        return new TransformBlock<string, HeaderParsingResult>(headerRow =>
        {
            // Set up some containers
            var result = new HeaderParsingResult(identifier: "N/A", wasSuccessful: true);
            var fieldIndexesByPropertyName = new Dictionary<string, int>();

            // Get all serializable properties on the chosen entity type
            var serializableProperties = entityType.GetProperties()
                .Where(prop => prop.IsDefined(typeof(CsvFieldNameAttribute), false))
                .ToList();

            // Add their CSV fieldnames to the result
            var entityFieldNames = serializableProperties.Select(prop => prop.GetCustomAttribute<CsvFieldNameAttribute>().FieldName);
            result.SetEntityFieldNames(entityFieldNames);

            // Create the dictionary of properties by field name
            var serializablePropertiesByFieldName = serializableProperties.ToDictionary(prop => prop.GetCustomAttribute<CsvFieldNameAttribute>().FieldName, prop => prop, StringComparer.OrdinalIgnoreCase);

            var fields = headerRow.Split(delimiter);
            for (var i = 0; i < fields.Length; i++)
            {
                // If any field in the CSV is unknown as a serializable property, we return a failed result
                if (!serializablePropertiesByFieldName.TryGetValue(fields[i], out var foundProperty))
                {
                    result.Invalidate($"The header row contains a field that does not match any of the serializable properties - {fields[i]}.",
                        DataflowErrorSeverity.Critical);
                    return result;
                }

                // Perform a bunch more validation

                fieldIndexesByPropertyName.Add(foundProperty.Name, i);
            }

            result.SetFieldIndexesByName(fieldIndexesByPropertyName);
            return result;
        }, dataflowBlockOptions ?? new ExecutionDataflowBlockOptions());
    }
}
ParsedHeaderAndRecordJoiner
For every subsequent record that comes through the pipeline, this block is intended to retrieve the parsed header data and add it to the record.
using System.Threading.Tasks.Dataflow;

public class ParsedHeaderAndRecordJoinerFactory
{
    public TransformBlock<string, HeaderAndRecordJoiningResult> CreateParsedHeaderAndRecordJoiner(WriteOnceBlock<HeaderParsingResult> parsedHeaderContainer,
        ExecutionDataflowBlockOptions dataflowBlockOptions = null)
    {
        return new TransformBlock<string, HeaderAndRecordJoiningResult>(async csvRecord =>
        {
            var headerParsingResult = await parsedHeaderContainer.ReceiveAsync();

            // If the header couldn't be parsed, a critical error is already on its way to the failure logger so we don't need to continue
            if (!headerParsingResult.WasSuccessful) return new HeaderAndRecordJoiningResult(identifier: "N.A.", wasSuccessful: false, null, null);

            // The entity parser can't do anything with the header record, so we send a message with wasSuccessful false
            var isHeaderRecord = true;
            foreach (var entityFieldName in headerParsingResult.EntityFieldNames)
            {
                isHeaderRecord &= csvRecord.Contains(entityFieldName);
            }
            if (isHeaderRecord) return new HeaderAndRecordJoiningResult(identifier: "N.A.", wasSuccessful: false, null, null);

            return new HeaderAndRecordJoiningResult(identifier: "N.A.", wasSuccessful: true, headerParsingResult, csvRecord);
        }, dataflowBlockOptions ?? new ExecutionDataflowBlockOptions());
    }
}
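For reference, here is a minimal sketch of how a WriteOnceBlock behaves when it is read more than once, which is what the joiner relies on for every record. It is not taken from the application: the class and method names (WriteOnceDemo, ReadTwiceAsync) and the string values are made up for illustration, and it assumes the System.Threading.Tasks.Dataflow package is available.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class WriteOnceDemo
{
    // Writes one value into a WriteOnceBlock and then reads it twice.
    public static async Task<string[]> ReadTwiceAsync()
    {
        // The cloning function here just hands back the same string instance.
        var container = new WriteOnceBlock<string>(s => s);

        container.Post("header");   // the first write is stored
        container.Post("ignored");  // later writes are declined

        // Reading does not consume the stored value, so both reads see it.
        var first = await container.ReceiveAsync();
        var second = await container.ReceiveAsync();

        return new[] { first, second };
    }
}
```

If this holds in the real pipeline as well, repeated ReceiveAsync() calls on the container should keep returning the parsed header once it has been written.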
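Problem details
In the current implementation, the ParsedHeaderAndRecordJoiner correctly receives the data from the ReceiveAsync() call to the ParsedHeaderContainer and returns as expected, however no message arrives at the EntityParser.
In addition, when a Complete signal is sent to the front of the flow (the ProducerQueue), it propagates to the RecordDistributor but then stops at the ParsedHeaderAndRecordJoiner (it does continue onward from the HeaderRowContainer, so the RecordDistributor is passing it on).
If I remove the ReceiveAsync() call and mock the data myself, the block behaves as expected.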
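I think this part is key:
"however no message arrives at the EntityParser."
Based on the sample, the only way the EntityParser would not receive a message output by the ParsedHeaderAndRecordJoiner is when WasSuccessful returns false. The predicate used in the link excludes the failed messages, but those messages have nowhere to go, so they accumulate in the ParsedHeaderAndRecordJoiner output buffer and also prevent Completion from propagating. Because a TransformBlock offers its output in order, a rejected message at the head of the buffer also holds back every message behind it, including successful ones. In the posted code the header record itself is returned with wasSuccessful false, so at least one such message is produced whenever the header row reaches the joiner. You need to link a null target to dump the failed messages, after the filtered link so that it only receives what the predicate rejected: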
parsedHeaderAndRecordJoiner.LinkTo(DataflowBlock.NullTarget<HeaderAndRecordJoiningResult>());
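To see the effect in isolation, here is a minimal sketch that reproduces the stall with plain integers instead of the real result types. Odd numbers stand in for failed results. The names FilteredLinkDemo and CompletesAsync and the one-second timeout are assumptions made for the example, not part of the original code.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FilteredLinkDemo
{
    // Returns true if completion reaches the downstream block within the timeout.
    public static async Task<bool> CompletesAsync(bool linkNullTarget)
    {
        var joiner = new TransformBlock<int, int>(x => x);
        var parser = new ActionBlock<int>(x => { });
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        // Only even numbers ("successful" results) pass the predicate.
        joiner.LinkTo(parser, linkOptions, x => x % 2 == 0);

        if (linkNullTarget)
        {
            // Linked second, so it only takes what the filtered link rejected.
            joiner.LinkTo(DataflowBlock.NullTarget<int>());
        }

        joiner.Post(1); // rejected by the predicate
        joiner.Post(2); // accepted, but queued behind the rejected message
        joiner.Complete();

        var timeout = Task.Delay(TimeSpan.FromSeconds(1));
        var finished = await Task.WhenAny(parser.Completion, timeout);
        return finished == parser.Completion;
    }
}
```

Calling CompletesAsync(false) should time out because the rejected message never leaves the output buffer, while CompletesAsync(true) should let both the data and the completion signal through.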
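Also, if your mocked data always returns WasSuccessful true, that may be what is pointing you at the await ...ReceiveAsync(). It is not necessarily a smoking gun, but it is a good place to start. Can you confirm the state of all messages in the output buffer of the ParsedHeaderAndRecordJoiner when the pipeline gets stuck?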
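One way to check this while debugging is to look at OutputCount on the block and, if needed, pull the waiting messages out with TryReceiveAll. The sketch below again uses integers in place of the real result types. OutputBufferProbe, Drain and StuckMessagesAsync are hypothetical names, and the polling loop is only there to give the demo block time to process its input.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OutputBufferProbe
{
    // Removes and returns whatever is currently waiting in the output buffer.
    // Debugging aid only: the drained messages never reach the linked targets.
    public static IList<T> Drain<T>(IReceivableSourceBlock<T> block)
    {
        return block.TryReceiveAll(out var items) ? items : new List<T>();
    }

    public static async Task<IList<int>> StuckMessagesAsync()
    {
        var joiner = new TransformBlock<int, int>(x => x);
        var parser = new ActionBlock<int>(x => { });

        // Odd numbers stand in for failed results that the link filters out.
        joiner.LinkTo(parser, x => x % 2 == 0);

        joiner.Post(1);
        joiner.Post(2);
        joiner.Post(3);

        // Wait (up to about a second) until all three results are buffered.
        for (var i = 0; i < 100 && joiner.OutputCount < 3; i++)
        {
            await Task.Delay(10);
        }

        return Drain(joiner);
    }
}
```

In the real pipeline the equivalent check would be to read parsedHeaderAndRecordJoiner.OutputCount once the flow has stalled and inspect WasSuccessful on the first buffered message.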