在 USQL 中编写自定义提取器以跳过有编码问题的行

Writing custom extractor in USQL to skip rows with encoding problems

我有一大组跨越几百个文件的数据。显然,它有一些编码问题(主要是 UTF-8,但显然有些字符无效)。根据 https://msdn.microsoft.com/en-us/library/azure/mt764098.aspx 如果存在编码错误,无论将静默标志设置为真(目的只是跳过错误行),都会发生 运行 时间错误。

因此,我需要编写一个自定义提取器。我写了一个主要做 https://blogs.msdn.microsoft.com/data_otaku/2016/10/27/a-fixed-width-extractor-for-azure-data-lake-analytics/ 示例的简化版本,因为它只需要一行,用定界符将其拆分,并且只是 returns try 块中的值。如果有任何异常,我只是处理它们然后继续。

不幸的是,我在 USQL 脚本本身中引用这个提取器时遇到了问题。当我遵循上述 link 的指导时,它建议在另一个程序集中编写逻辑,构建它,在 ADLS database/assemblies 中注册它,然后通过 REFERENCE ASSEMBLY MyExtractors; 将其包含在顶部脚本(因为这是使用的命名空间)。在下面的 Using 语句中,我用 USING new SimpleExtractor(); 调用它 如果我这样做,当 运行 将脚本针对 type or namespace cannot be found 的 ADLS 服务时,我会收到错误消息。此外,如果我尝试更精确并在 using 语句中使用 USING new MyExtractors.SimpleExtractor();,它会产生相同的错误,引用上面的 USING 语句。

然后我在 https://azure.microsoft.com/en-us/documentation/articles/data-lake-analytics-u-sql-develop-user-defined-operators/ 的旧源中找到了其他文档,该文档描述了在代码隐藏文件中做同样的事情。我删除了单独的程序集并将逻辑复制到该文件中的 class 中。步骤 #6 中的示例没有显示任何 REFERENCE ASSEMBLY 语句,但是当我 运行 它时,我又得到一个错误 type or namespace name cannot be found

查看最新的发行说明,希望这里有些东西已经过时了,我唯一看到的是,如果我使用 USING 语句,我需要引用自定义代码的程序集(就像第一次尝试一样)在实际使用它之前,我就是。

谁能提供一些关于如何在 USQL 中正确引用 UDO 的指导,或者说明如何让 运行time 静默处理编码异常(并跳过它们)?

这是我的逻辑在提取器本身中的样子:

using System.Collections.Generic;
using System.IO;
using System.Text;
using Microsoft.Analytics.Interfaces;

namespace Utilities
{
    [SqlUserDefinedExtractor(AtomicFileProcessing = true)]
    public class ModifiedTextExtractor : IExtractor
    {
        //Contains the row
        private readonly Encoding _encoding;
        private readonly byte[] _row_delim;
        private readonly char _col_delim;

        public ModifiedTextExtractor()
        {
            _encoding = Encoding.UTF8;
            _row_delim = _encoding.GetBytes("\r\n");
            _col_delim = '\t';
        }

        public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
        {
            //Read the input line by line
            foreach (var current in input.Split(_row_delim))
            {
                using (var reader = new StreamReader(current, this._encoding))
                {
                    var line = reader.ReadToEnd().Trim();

                    //If there are any single or double quotes in the line, escape them
                    line = line.Replace(@"""", @"\""");

                    var count = 0;

                    //Split the input by the column delimiter
                    var parts = line.Split(_col_delim);

                    foreach (var part in parts)
                    {
                        output.Set<string>(count, part);
                        count += 1;
                    }
                }
                yield return output.AsReadOnly();
            }
        }
    }
}

以及我如何尝试在 USQL 语句中使用它的片段(在将其注册为程序集之后):

REFERENCE ASSEMBLY [Utilities];

CREATE VIEW MyView AS ...
USING new Utilities.ModifiedTextExtractor();

谢谢!

解决此问题的另一种方法是使用 Azure SQL 数据仓库和支持拒绝行的 Polybase。

1) 在您的外部文件的 ADW 中创建外部 table:

CREATE EXTERNAL TABLE ext.mycsv (
    colA INT NOT NULL,
    colB INT NOT NULL,
    colC INT NOT NULL
)
WITH (
    DATA_SOURCE = eds_mycsv,
    LOCATION = N'/myblobstorage/',
    FILE_FORMAT = eff_csv,
    REJECT_TYPE = VALUE,
    REJECT_VALUE = 1
)

外部table可以指向单个文件或目录(多个文件与我的示例中的结构相同)。 reject_value 为 1 将允许一行失败而不会使整个作业失败。这也可以是一个百分比,即“允许 3% 的行失败而不会使整个加载失败。该语句还将为您提供有关失败行的信息。阅读更多关于 REJECT_TYPEREJECT_VALUE here.

ADW 的另一个优点是可以在不使用时暂停它。

2) 在ADW中创建一个内部table来实现它,例如

CREATE TABLE dbo.mycsv
WITH 
(   
    CLUSTERED COLUMNSTORE INDEX,
    DISTRIBUTION = ROUND_ROBIN
)
AS
SELECT * FROM ext.mycsv;

3) 使用 U-SQL 在 Azure Data Lake Analytics (ADLA) 中创建外部 table,使用外部数据源到 "query data where it lives",即在仓库中。

// Create external table which is in SQL DW
CREATE EXTERNAL TABLE IF NOT EXISTS adlaExt.mycsv
(
    colA        int,
    colB        int,
    colC        int
)
FROM ds_adw LOCATION "dbo.mycsv";

4) 在U-SQL中查询你的外部table,eg:

// Query external table
@e =
    SELECT *
    FROM dbo.mycsv;


// Join with internal table
@q =
    SELECT a.*, b.someColumn
    FROM @e AS a
            INNER JOIN
                dbo.someOtherTable AS b
            ON a.colA == b.n_colA;


// Output it
OUTPUT @q TO "/output/output.csv"
USING Outputters.Csv();

可选择将其导入 ADLA。 Jorg Klein 在 ADLA here.

中的联合查询设置上有一篇很棒的博客 post

恕我直言,这比使用本机 Azure 组件创建自定义提取器要安全得多。 Polybase 尚不支持 ADLA,但几乎肯定会在未来的某个时候支持,届时可以简化设计。

您 运行 遇到了 VIEW 无法引用自定义代码的问题。在 U-SQL 中,所有对象都需要包含它们的上下文规范(例如在它们的主体中引用的程序集(这使对象更加独立,并避免了拉出潜在的长线依赖关系不为人所知的问题对象的用户)。

您需要做的是将 VIEW 转换为 Table 值函数:

CREATE FUNCTION MyFunct(/* optional parameters */) RETURNS @res AS
BEGIN
  REFERENCE ASSEMBLY [Utilities];
  @res = EXTRACT ... USING new Utilities.ModifiedTextExtractor();
END;

然后调用函数如下(注意你需要在SELECT语句中提供行集别名):

@data = SELECT ... FROM MyFunct() AS f WHERE ...;

或者如果您不想应用投影或过滤器:

@data = MyFunct();

与视图一样,table 值函数将被内联。