在 USQL 中编写自定义提取器以跳过有编码问题的行
Writing custom extractor in USQL to skip rows with encoding problems
我有一大组跨越几百个文件的数据。显然,它有一些编码问题(主要是 UTF-8,但显然有些字符无效)。根据 https://msdn.microsoft.com/en-us/library/azure/mt764098.aspx 如果存在编码错误,无论将静默标志设置为真(目的只是跳过错误行),都会发生 运行 时间错误。
因此,我需要编写一个自定义提取器。我写了一个主要做 https://blogs.msdn.microsoft.com/data_otaku/2016/10/27/a-fixed-width-extractor-for-azure-data-lake-analytics/ 示例的简化版本,因为它只需要一行,用定界符将其拆分,并且只是 returns try 块中的值。如果有任何异常,我只是处理它们然后继续。
不幸的是,我在 USQL 脚本本身中引用这个提取器时遇到了问题。当我遵循上述 link 的指导时,它建议在另一个程序集中编写逻辑,构建它,在 ADLS database/assemblies 中注册它,然后通过 REFERENCE ASSEMBLY MyExtractors;
将其包含在顶部脚本(因为这是使用的命名空间)。在下面的 Using 语句中,我用 USING new SimpleExtractor();
调用它 如果我这样做,当 运行 将脚本针对 type or namespace cannot be found
的 ADLS 服务时,我会收到错误消息。此外,如果我尝试更精确并在 using 语句中使用 USING new MyExtractors.SimpleExtractor();
,它会产生相同的错误,引用上面的 USING 语句。
然后我在 https://azure.microsoft.com/en-us/documentation/articles/data-lake-analytics-u-sql-develop-user-defined-operators/ 的旧源中找到了其他文档,该文档描述了在代码隐藏文件中做同样的事情。我删除了单独的程序集并将逻辑复制到该文件中的 class 中。步骤 #6 中的示例没有显示任何 REFERENCE ASSEMBLY
语句,但是当我 运行 它时,我又得到一个错误 type or namespace name cannot be found
。
查看最新的发行说明,希望这里有些东西已经过时了,我唯一看到的是,如果我使用 USING
语句,我需要引用自定义代码的程序集(就像第一次尝试一样)在实际使用它之前,我就是。
谁能提供一些关于如何在 USQL 中正确引用 UDO 的指导,或者说明如何让 运行time 静默处理编码异常(并跳过它们)?
这是我的逻辑在提取器本身中的样子:
using System.Collections.Generic;
using System.IO;
using System.Text;
using Microsoft.Analytics.Interfaces;
namespace Utilities
{
[SqlUserDefinedExtractor(AtomicFileProcessing = true)]
public class ModifiedTextExtractor : IExtractor
{
//Contains the row
private readonly Encoding _encoding;
private readonly byte[] _row_delim;
private readonly char _col_delim;
public ModifiedTextExtractor()
{
_encoding = Encoding.UTF8;
_row_delim = _encoding.GetBytes("\r\n");
_col_delim = '\t';
}
public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
{
//Read the input line by line
foreach (var current in input.Split(_row_delim))
{
using (var reader = new StreamReader(current, this._encoding))
{
var line = reader.ReadToEnd().Trim();
//If there are any single or double quotes in the line, escape them
line = line.Replace(@"""", @"\""");
var count = 0;
//Split the input by the column delimiter
var parts = line.Split(_col_delim);
foreach (var part in parts)
{
output.Set<string>(count, part);
count += 1;
}
}
yield return output.AsReadOnly();
}
}
}
}
以及我如何尝试在 USQL 语句中使用它的片段(在将其注册为程序集之后):
REFERENCE ASSEMBLY [Utilities];
CREATE VIEW MyView AS ...
USING new Utilities.ModifiedTextExtractor();
谢谢!
解决此问题的另一种方法是使用 Azure SQL 数据仓库和支持拒绝行的 Polybase。
1) 在您的外部文件的 ADW 中创建外部 table:
CREATE EXTERNAL TABLE ext.mycsv (
colA INT NOT NULL,
colB INT NOT NULL,
colC INT NOT NULL
)
WITH (
DATA_SOURCE = eds_mycsv,
LOCATION = N'/myblobstorage/',
FILE_FORMAT = eff_csv,
REJECT_TYPE = VALUE,
REJECT_VALUE = 1
)
外部table可以指向单个文件或目录(多个文件与我的示例中的结构相同)。 reject_value 为 1 将允许一行失败而不会使整个作业失败。这也可以是一个百分比,即“允许 3% 的行失败而不会使整个加载失败。该语句还将为您提供有关失败行的信息。阅读更多关于 REJECT_TYPE
和 REJECT_VALUE
here.
ADW 的另一个优点是可以在不使用时暂停它。
2) 在ADW中创建一个内部table来实现它,例如
CREATE TABLE dbo.mycsv
WITH
(
CLUSTERED COLUMNSTORE INDEX,
DISTRIBUTION = ROUND_ROBIN
)
AS
SELECT * FROM ext.mycsv;
3) 使用 U-SQL 在 Azure Data Lake Analytics (ADLA) 中创建外部 table,使用外部数据源到 "query data where it lives",即在仓库中。
// Create external table which is in SQL DW
CREATE EXTERNAL TABLE IF NOT EXISTS adlaExt.mycsv
(
colA int,
colB int,
colC int
)
FROM ds_adw LOCATION "dbo.mycsv";
4) 在U-SQL中查询你的外部table,eg:
// Query external table
@e =
SELECT *
FROM dbo.mycsv;
// Join with internal table
@q =
SELECT a.*, b.someColumn
FROM @e AS a
INNER JOIN
dbo.someOtherTable AS b
ON a.colA == b.n_colA;
// Output it
OUTPUT @q TO "/output/output.csv"
USING Outputters.Csv();
可选择将其导入 ADLA。 Jorg Klein 在 ADLA here.
中的联合查询设置上有一篇很棒的博客 post
恕我直言,这比使用本机 Azure 组件创建自定义提取器要安全得多。 Polybase 尚不支持 ADLA,但几乎肯定会在未来的某个时候支持,届时可以简化设计。
您 运行 遇到了 VIEW 无法引用自定义代码的问题。在 U-SQL 中,所有对象都需要包含它们的上下文规范(例如在它们的主体中引用的程序集(这使对象更加独立,并避免了拉出潜在的长线依赖关系不为人所知的问题对象的用户)。
您需要做的是将 VIEW 转换为 Table 值函数:
CREATE FUNCTION MyFunct(/* optional parameters */) RETURNS @res AS
BEGIN
REFERENCE ASSEMBLY [Utilities];
@res = EXTRACT ... USING new Utilities.ModifiedTextExtractor();
END;
然后调用函数如下(注意你需要在SELECT
语句中提供行集别名):
@data = SELECT ... FROM MyFunct() AS f WHERE ...;
或者如果您不想应用投影或过滤器:
@data = MyFunct();
与视图一样,table 值函数将被内联。
我有一大组跨越几百个文件的数据。显然,它有一些编码问题(主要是 UTF-8,但显然有些字符无效)。根据 https://msdn.microsoft.com/en-us/library/azure/mt764098.aspx 如果存在编码错误,无论将静默标志设置为真(目的只是跳过错误行),都会发生 运行 时间错误。
因此,我需要编写一个自定义提取器。我写了一个主要做 https://blogs.msdn.microsoft.com/data_otaku/2016/10/27/a-fixed-width-extractor-for-azure-data-lake-analytics/ 示例的简化版本,因为它只需要一行,用定界符将其拆分,并且只是 returns try 块中的值。如果有任何异常,我只是处理它们然后继续。
不幸的是,我在 USQL 脚本本身中引用这个提取器时遇到了问题。当我遵循上述 link 的指导时,它建议在另一个程序集中编写逻辑,构建它,在 ADLS database/assemblies 中注册它,然后通过 REFERENCE ASSEMBLY MyExtractors;
将其包含在顶部脚本(因为这是使用的命名空间)。在下面的 Using 语句中,我用 USING new SimpleExtractor();
调用它 如果我这样做,当 运行 将脚本针对 type or namespace cannot be found
的 ADLS 服务时,我会收到错误消息。此外,如果我尝试更精确并在 using 语句中使用 USING new MyExtractors.SimpleExtractor();
,它会产生相同的错误,引用上面的 USING 语句。
然后我在 https://azure.microsoft.com/en-us/documentation/articles/data-lake-analytics-u-sql-develop-user-defined-operators/ 的旧源中找到了其他文档,该文档描述了在代码隐藏文件中做同样的事情。我删除了单独的程序集并将逻辑复制到该文件中的 class 中。步骤 #6 中的示例没有显示任何 REFERENCE ASSEMBLY
语句,但是当我 运行 它时,我又得到一个错误 type or namespace name cannot be found
。
查看最新的发行说明,希望这里有些东西已经过时了,我唯一看到的是,如果我使用 USING
语句,我需要引用自定义代码的程序集(就像第一次尝试一样)在实际使用它之前,我就是。
谁能提供一些关于如何在 USQL 中正确引用 UDO 的指导,或者说明如何让 运行time 静默处理编码异常(并跳过它们)?
这是我的逻辑在提取器本身中的样子:
using System.Collections.Generic;
using System.IO;
using System.Text;
using Microsoft.Analytics.Interfaces;
namespace Utilities
{
[SqlUserDefinedExtractor(AtomicFileProcessing = true)]
public class ModifiedTextExtractor : IExtractor
{
//Contains the row
private readonly Encoding _encoding;
private readonly byte[] _row_delim;
private readonly char _col_delim;
public ModifiedTextExtractor()
{
_encoding = Encoding.UTF8;
_row_delim = _encoding.GetBytes("\r\n");
_col_delim = '\t';
}
public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
{
//Read the input line by line
foreach (var current in input.Split(_row_delim))
{
using (var reader = new StreamReader(current, this._encoding))
{
var line = reader.ReadToEnd().Trim();
//If there are any single or double quotes in the line, escape them
line = line.Replace(@"""", @"\""");
var count = 0;
//Split the input by the column delimiter
var parts = line.Split(_col_delim);
foreach (var part in parts)
{
output.Set<string>(count, part);
count += 1;
}
}
yield return output.AsReadOnly();
}
}
}
}
以及我如何尝试在 USQL 语句中使用它的片段(在将其注册为程序集之后):
REFERENCE ASSEMBLY [Utilities];
CREATE VIEW MyView AS ...
USING new Utilities.ModifiedTextExtractor();
谢谢!
解决此问题的另一种方法是使用 Azure SQL 数据仓库和支持拒绝行的 Polybase。
1) 在您的外部文件的 ADW 中创建外部 table:
CREATE EXTERNAL TABLE ext.mycsv (
colA INT NOT NULL,
colB INT NOT NULL,
colC INT NOT NULL
)
WITH (
DATA_SOURCE = eds_mycsv,
LOCATION = N'/myblobstorage/',
FILE_FORMAT = eff_csv,
REJECT_TYPE = VALUE,
REJECT_VALUE = 1
)
外部table可以指向单个文件或目录(多个文件与我的示例中的结构相同)。 reject_value 为 1 将允许一行失败而不会使整个作业失败。这也可以是一个百分比,即“允许 3% 的行失败而不会使整个加载失败。该语句还将为您提供有关失败行的信息。阅读更多关于 REJECT_TYPE
和 REJECT_VALUE
here.
ADW 的另一个优点是可以在不使用时暂停它。
2) 在ADW中创建一个内部table来实现它,例如
CREATE TABLE dbo.mycsv
WITH
(
CLUSTERED COLUMNSTORE INDEX,
DISTRIBUTION = ROUND_ROBIN
)
AS
SELECT * FROM ext.mycsv;
3) 使用 U-SQL 在 Azure Data Lake Analytics (ADLA) 中创建外部 table,使用外部数据源到 "query data where it lives",即在仓库中。
// Create external table which is in SQL DW
CREATE EXTERNAL TABLE IF NOT EXISTS adlaExt.mycsv
(
colA int,
colB int,
colC int
)
FROM ds_adw LOCATION "dbo.mycsv";
4) 在U-SQL中查询你的外部table,eg:
// Query external table
@e =
SELECT *
FROM dbo.mycsv;
// Join with internal table
@q =
SELECT a.*, b.someColumn
FROM @e AS a
INNER JOIN
dbo.someOtherTable AS b
ON a.colA == b.n_colA;
// Output it
OUTPUT @q TO "/output/output.csv"
USING Outputters.Csv();
可选择将其导入 ADLA。 Jorg Klein 在 ADLA here.
中的联合查询设置上有一篇很棒的博客 post恕我直言,这比使用本机 Azure 组件创建自定义提取器要安全得多。 Polybase 尚不支持 ADLA,但几乎肯定会在未来的某个时候支持,届时可以简化设计。
您 运行 遇到了 VIEW 无法引用自定义代码的问题。在 U-SQL 中,所有对象都需要包含它们的上下文规范(例如在它们的主体中引用的程序集(这使对象更加独立,并避免了拉出潜在的长线依赖关系不为人所知的问题对象的用户)。
您需要做的是将 VIEW 转换为 Table 值函数:
CREATE FUNCTION MyFunct(/* optional parameters */) RETURNS @res AS
BEGIN
REFERENCE ASSEMBLY [Utilities];
@res = EXTRACT ... USING new Utilities.ModifiedTextExtractor();
END;
然后调用函数如下(注意你需要在SELECT
语句中提供行集别名):
@data = SELECT ... FROM MyFunct() AS f WHERE ...;
或者如果您不想应用投影或过滤器:
@data = MyFunct();
与视图一样,table 值函数将被内联。