Azure 数据湖 u-sql 枢轴

azure data lake u-sql pivot

我喜欢 Azure Data Lake,但缺乏文档可能会减慢采用速度。我希望有人比我有更多关于 U-SQL 的经验。

尝试从 Microsoft.Analytics.Interfaces 下可用的内容和通过 U-SQL 解释器导出,但运气不佳。动态 sql 似乎不支持在 运行 时间定义行集的架构,并且 IUpdatableRow 的架构是只读的,因此处理器方法不可行。而且 U-SQL.

中没有开箱即用的 PIVOT 功能

我也想也许我可以一起处理行集并编写一个自定义输出器来进行数据透视但无法弄清楚。

可能有一种非常简单的方法可以做到这一点,因为它是标准的枢轴操作。对于不确定数量的 ColA 和 ColB 值,您将如何以高效的方式将行集从 I 重塑为 II?

|ColA |ColB |ColC|
|1    |A    |30  |
|1    |B    |70  |
|1    |ZA   |12  |
|2    |C    |22  |
|2    |A    |13  |

|ID   |A    |B    |C   |...... |ZA   |.....
|1    |30   |70   |0   |       |12   |
|2    |13   |0    |22  |...... |0    |.....

您有多种选择来执行这样的 PIVOT

这是一个使用 U-SQL MAP 数据类型(称为 SQL.MAP)。对于缺失值,它将 return null 而不是 0(使用空合并表达式将其变为 0)这将在以下条件下起作用:

  1. 生成的 MAP 保持在 4MB 的行大小限制内。如果不, 请参阅下一个解决方案。
  2. 你提前知道你有哪些专栏 (如果没有,只需将数据保留在地图列中并提取为 需要)。

地图解决方案:

@t = SELECT *
     FROM(
        VALUES
        ( 1, "A", 30 ),
        ( 1, "B", 70 ),
        ( 1, "ZA", 12 ),
        ( 2, "C", 22 ),
        ( 2, "A", 13 ),
        ( 2, "ABC", 42)
     ) AS T(ColA, ColB, ColC);

@m = SELECT ColA AS [ID],
            MAP_AGG(ColB, (int?) ColC) AS m
     FROM @t
     GROUP BY ColA;

@r =
    SELECT [ID],
           m["A"]AS A,
           m["B"]AS B,
           m["C"]AS C,
           m["ZA"]AS [ZA],
           m["ABC"]AS [ABC]
    FROM @m;

OUTPUT @r
TO "/output/pivot1.csv"
USING Outputters.Csv();

这是一个确实使用标准 SQL 枢轴变通模式的解决方案(一些 SQL 数据库实现实际上用于在内部将 PIVOT 表达式转换为这样的表达式,并且可能仍然去做)。同样,您必须提前了解所有列。如果不是这种情况,只需使用 MAP 数据类型。

@t =
    SELECT *
    FROM(
        VALUES
        ( 1, "A", 30 ),
        ( 1, "B", 70 ),
        ( 1, "ZA", 12 ),
        ( 2, "C", 22 ),
        ( 2, "A", 13 ),
        ( 2, "ABC", 42)
    ) AS T(ColA, ColB, ColC);

@r =
    SELECT ColA AS [ID],
           (ColB == "A") ? ColC : 0 AS A,
           (ColB == "B") ? ColC : 0 AS B,
           (ColB == "C") ? ColC : 0 AS C,
           (ColB == "ZA") ? ColC : 0 AS [ZA],
           (ColB == "ABC") ? ColC : 0 AS [ABC]
    FROM @t;

@r =
    SELECT DISTINCT [ID],
           LAST_VALUE(A) OVER(PARTITION BY [ID] ORDER BY A) AS A,
           LAST_VALUE(B) OVER(PARTITION BY [ID] ORDER BY B) AS B,
           LAST_VALUE(C) OVER(PARTITION BY [ID] ORDER BY C) AS C,
           LAST_VALUE([ZA]) OVER(PARTITION BY [ID] ORDER BY [ZA]) AS [ZA],
           LAST_VALUE([ABC]) OVER(PARTITION BY [ID] ORDER BY [ABC]) AS [ABC]
    FROM @r;

OUTPUT @r
TO "/output/pivot2.csv"
USING Outputters.Csv();

请注意 PIVOT / UNPIVOT 语法已添加到 U-SQL March 2017

使用上面的示例数据:

@t = SELECT *
     FROM(
        VALUES
        ( 1, "A", 30 ),
        ( 1, "B", 70 ),
        ( 1, "ZA", 12 ),
        ( 2, "C", 22 ),
        ( 2, "A", 13 ),
        ( 2, "ABC", 42)
     ) AS T(ColA, ColB, ColC);


@p =
    SELECT Column_0 AS id, Column_1 AS a
    FROM @t
      PIVOT (MAX(ColC) FOR ColB IN ("A" AS [A], "B" AS [B], "C" AS [C], "ZA" AS [ZA], "ABC" AS [ABC])
           ) AS pvt;


OUTPUT @p
TO "/output/pivot3.csv"
USING Outputters.Csv();

这是我的团队成员针对我们事先不知道列数的情况提出的一种解决方法。

@t = SELECT *
     FROM(
        VALUES
        ( 1, "A", 30 ),
        ( 1, "B", 70 ),
        ( 1, "ZA", 12 ),
        ( 2, "C", 22 ),
        ( 2, "A", 13 ),
        ( 2, "ABC", 42)
     ) AS T(ColA, ColB, ColC);

@t1 =
    SELECT DISTINCT ColB
    FROM @t
ORDER BY ColB DESC
OFFSET 0 ROW;

@t1 =
    SELECT ARRAY_AGG(ColB) AS ColBArray
    FROM @t1;

@result =
    SELECT ColA,
           MAP_AGG(ColB, (int?) ColC) AS ColCMap
    FROM @t
    GROUP BY ColA;

@result =
    SELECT a.ColA,
           DPivotNS.DPivot.FillGapsAndConvert(a.ColCMap, b.ColBArray) AS Values
    FROM @result AS a
         CROSS JOIN
             @t1 AS b;

@result =
    SELECT ColA,
           ArrayColumn
    FROM
    (
    SELECT 0 AS ColA,
           ColBArray AS ArrayColumn,
           0 AS Ord
    FROM @t1
    UNION ALL
    SELECT ColA AS ColA,
           Values AS ArrayColumn,
           1 AS Ord
    FROM @result
    ) AS rs1
ORDER BY rs1.Ord
OFFSET 0 ROWS;

@result =
    SELECT ColA,
           String.Join(",", ArrayColumn) AS Values
    FROM @result;


OUTPUT @result
TO "result.csv"
USING Outputters.Csv(quoting:false);

这是上面脚本的 UDF:

    public static SqlArray<string> FillGapsAndConvert (SqlMap<string, int?> ColCMap, SqlArray<string> ColDArray)
        {
        var list = new LinkedList<string> ();
        foreach ( string colD in ColDArray )
            {
            int? currentCount = ColCMap[colD];
            int newCount = currentCount.HasValue ? currentCount.Value : 0;
            list.AddLast (newCount.ToString ());
            }
        return new SqlArray<string> (list);
        }