如何为自定义 U-SQL 提取器启用并行性

How to enable parallelism for a custom U-SQL Extractor

我正在为我们的内部文件格式(二进制序列化)实施自定义 U-SQL 提取器。它在 "Atomic" 模式下运行良好:

[SqlUserDefinedExtractor(AtomicFileProcessing = true)]
public class BinaryExtractor : IExtractor

如果我关闭“原子”模式,看起来 U-SQL 会在随机位置拆分文件(我猜只有 250MB 块)。这对我来说是不能接受的。文件格式有一个特殊的行分隔符。我可以在我的提取器中定义自定义行定界符并为其启用并行性吗?从技术上讲,如果有帮助,我可以将行分隔符更改为新的。 谁能帮我解决这个问题?

文件确实被分成了块(我认为目前是 1 GB,但确切的值是实现定义的,并且可能会因性能原因而改变)。

如果文件确实是行分隔的,并且假设该行的原始输入数据小于 4MB,您可以在 UDO 中使用 input.Split() 函数来拆分成行。如果原始输入数据跨越块边界(假设小于 4MB),调用将自动处理这种情况。

Here是一个例子:

public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow outputrow)
{
   // this._row_delim = this._encoding.GetBytes(row_delim); in class ctor
   foreach (Stream current in input.Split(this._row_delim))
   {
       using (StreamReader streamReader = new StreamReader(current, this._encoding))
       {
           int num = 0;
           string[] array = streamReader.ReadToEnd().Split(new string[]{this._col_delim}, StringSplitOptions.None);
           for (int i = 0; i < array.Length; i++)
           {
              // DO YOUR PROCESSING
           }
       }
       yield return outputrow.AsReadOnly();
    }
}

请注意,您不能自己读取跨块边界,您应该确保您的数据确实可以拆分成行。