Usage of JOIN in ASA caused mismatch

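I ran into a case in my ASA code. The idea is to match two inputs, Record1 and Record2. Both have key1 and key2.

In Record2, if key2 is not null, we group Record2 by key1 and key2 to form a new table, Record2WithKey2. If key2 is null, we group Record2 by key1 to form a new table, Record2WithoutKey2. If a record in Record2WithKey2 and a record in Record2WithoutKey2 have the same key1, they need to be merged, keeping the key2 from Record2WithKey2; otherwise the records stay unchanged. After these steps we have a new table, FinalRecord2.

Then we match Record1 against FinalRecord2. If we find a match in FinalRecord2 where both key1 and key2 match, the record from Record1 is marked as valid. If key1 matches and the key2 from FinalRecord2 is null, the record from Record1 is marked as invalid. The result is then output as a table. Here is my code: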

With Record2WithKey2 AS (
  SELECT [key1], [key2], COLLECT([values]) [values]
  FROM Record2
  WHERE key2 IS NOT NULL
  GROUP BY System.Timestamp(), key1, key2
),
Record2WithoutKey2 AS (
  SELECT [key1], '0' [key2], COLLECT([values]) [values]
  FROM Record2
  WHERE key2 IS NULL
  GROUP BY System.Timestamp(), key1
),
FinalRecord2 AS (
  SELECT e1.key1 [key1], e1.key2 [key2], UDF.ConcatArrays(e1.values, e2.values) [values]
  FROM Record2WithKey2 e1
  LEFT OUTER JOIN Record2WithoutKey2 e2
  ON DATEDIFF(second, e1, e2) = 0
  AND e1.key1 = e2.key1
  
  UNION ALL
  
  SELECT e2.key1 [key1], e2.key2 [key2], e2.values [values]
  FROM Record2WithoutKey2 e2
  LEFT OUTER JOIN Record2WithKey2 e1
  ON DATEDIFF(second, e1, e2) = 0
  AND e1.key1 = e2.key1
  WHERE e1.key1 IS NULL
),
JoinedRecord AS (
  SELECT r1.*, r2.values [values], 'Valid' [Result]
  FROM FinalRecord2 r2
  INNER JOIN Record1 r1
  ON DATEDIFF(second, r1, r2) = 0
  AND r1.key1 = r2.key1
  AND r1.key2 = r2.key2

  UNION ALL

  SELECT r1.*, r2.values [values], 'Invalid' [Result]
  FROM FinalRecord2 r2
  INNER JOIN Record1 r1
  ON DATEDIFF(second, r1, r2) = 0
  AND r1.key1 = r2.key1
  AND r2.key2 = '0'
)
SELECT * 
INTO Output
From JoinedRecord

Record1:

[
    {
        "key1": "value1",
        "key2": "value2"
    },
    {
        "key1": "value3",
        "key2": "value4"
    },
    {
        "key1": "value5",
        "key2": "value6"
    },
    {
        "key1": "value5",
        "key2": "value7"
    },
    {
        "key1": "value8",
        "key2": "value9"
    }
]

Record2:

[
    {
        "key1": "value1",
        "key2": "value2",
        "values": 1
    },
    {
        "key1": "value3",
        "key2": null,
        "values": 2
    },
    {
        "key1": "value5",
        "key2": "value6",
        "values": 3
    },
    {
        "key1": "value5",
        "key2": null,
        "values": 4
    },
    {
        "key1": "value5",
        "key2": "value7",
        "values": 5
    }
]

Expected output:

[
    {
        "key1": "value1",
        "key2": "value2",
        "values": [1],
        "Result": "Valid"
    },
    {
        "key1": "value3",
        "key2": "value4",
        "values": [2],
        "Result": "Invalid"
    },
    {
        "key1": "value5",
        "key2": "value6",
        "values": [3,4],
        "Result": "Valid"
    },
    {
        "key1": "value5",
        "key2": "value7",
        "values": [4,5],
        "Result": "Valid"
    }
]

But what I actually get is:

{
    "key1": "value5",
    "key2": "value6",
    "values": [3,4],
    "Result": "Valid"
},
{
    "key1": "value5",
    "key2": "value7",
    "values": [4,5],
    "Result": "Valid"
}

Does anyone know what is wrong with my ASA code? Any suggestions would be appreciated. Thanks!

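Updated to accommodate:

  • a key1 can have multiple key2 in record1
  • a key1 can have multiple key2 in record2, including null

Adding the constraints above forced me to use UNION again. That is how I was able to understand what was happening in your query.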

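ASA is a stream processor. This means that every step of a query happens on a timeline. Every blocking operation, such as GROUP BY or JOIN, needs a time component. For details, see the ASA documentation.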

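But on paper, when operating at a given timestamp, we should be able to forget about the timeline. If we aggregate with a snapshot window and set the DATEDIFF to 0 for joins, then the whole query should execute within the same time span and behave like a regular SQL query in a standard SQL database.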

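In reality, some operations can affect the timestamp of the records they process. This is the case with LEFT JOIN. In ASA, all joins are temporal. Let's say we are trying to join two data streams A and B with A LEFT JOIN B ON DATEDIFF(second,A,B) = 0. When we get a record from A, we start waiting for all the records from B for the expected duration, here 0 seconds. Let's say none arrives. When the clock moves to the next tick (here the next second), we declare the period over and can release that lonely record from A. But we need to update its timestamp: it is emitted at the new clock time, which has moved forward by one tick (1 second).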
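
To make the timestamp shift concrete, here is a toy model in plain JavaScript. It is only a sketch of the behavior described above, not how ASA is implemented: the function name temporalLeftJoin, the integer timestamps (whole seconds) and the sample rows are all assumptions made for illustration.

```javascript
// Toy model of A LEFT JOIN B ON DATEDIFF(second, A, B) = 0.
// Hypothetical helper: it only mimics the timestamp shift, nothing else.
function temporalLeftJoin(left, right, sameKey) {
    const out = [];
    for (const a of left) {
        // Candidates from B must share the key and fall in the same second as A.
        const matches = right.filter((b) => sameKey(a, b) && b.ts === a.ts);
        if (matches.length > 0) {
            // A match was found inside the window: emitted at the original timestamp.
            for (const b of matches) {
                out.push({ left: a, right: b, ts: a.ts });
            }
        } else {
            // No match: the row is only released once the clock reaches the next tick,
            // so it carries that later timestamp.
            out.push({ left: a, right: null, ts: a.ts + 1 });
        }
    }
    return out;
}

const A = [
    { key1: "value1", ts: 0 },
    { key1: "value5", ts: 0 },
];
const B = [{ key1: "value5", ts: 0 }];

const joined = temporalLeftJoin(A, B, (a, b) => a.key1 === b.key1);
for (const row of joined) {
    console.log(row.left.key1, row.right ? "matched" : "unmatched", "ts =", row.ts);
}
```

In this model the unmatched row for value1 comes out at ts = 1 while the matched row for value5 stays at ts = 0, which is the kind of drift that breaks a later join on DATEDIFF = 0.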

Here is a first query that works, even though it uses LEFT JOIN:


WITH
-- Cleaning and timestamp
Record1preprocessed AS (
    SELECT
        CAST([key1] AS NVARCHAR(MAX)) AS key1,
        CAST([key2] AS NVARCHAR(MAX)) AS key2
    FROM record1 TIMESTAMP BY appTimeStamp
),

-- Cleaning and timestamp
Record2preprocessed AS (
    SELECT
        CAST([key1] AS NVARCHAR(MAX)) AS key1,
        CAST([key2] AS NVARCHAR(MAX)) AS key2,
        CAST([values] AS BIGINT) AS [values]
    FROM record2 TIMESTAMP BY appTimeStamp
),

Record2WithKey2 AS (
    SELECT
        [key1],
        [key2],
        COLLECT([values]) AS [values],
        System.Timestamp() AS ts
    FROM Record2preprocessed
    WHERE key2 IS NOT NULL
    GROUP BY System.Timestamp(), key1, key2
),

Record2WithoutKey2 AS (
    SELECT
        [key1],
        '0' AS [key2],
        COLLECT([values]) [values],
        System.Timestamp() AS ts
    FROM Record2preprocessed
    WHERE key2 IS NULL
    GROUP BY System.Timestamp(), key1
),

FinalRecord2 AS (
    -- Records with key2 are joined with records without to merge their arrays
    SELECT
        e1.key1,
        e1.key2,
        UDF.ConcatArrays(e1.[values], e2.[values]) AS [values],
        System.Timestamp() AS ts
    FROM Record2WithKey2 e1
    LEFT JOIN Record2WithoutKey2 e2
    ON  DATEDIFF(second, e1, e2) = 0
        AND e1.key1 = e2.key1

    UNION ALL

    -- Records without key2 are joined with records with key2 to filter them out (they are already in the first subquery above)
    -- Here we only want the key1 that never have a key2
    SELECT
        e2.key1,
        e2.key2,
        e2.[values] [values],
        System.Timestamp() AS ts
    FROM Record2WithoutKey2 e2
    LEFT JOIN Record2WithKey2 e1
    ON  DATEDIFF(second, e1, e2) = 0
        AND e1.key1 = e2.key1
    WHERE e1.key1 IS NULL
)

,JoinedRecord AS (
    SELECT
        r1.key1,
        r1.key2,
        r2.[values],
        CASE WHEN r1.key2 = r2.key2 THEN 'Valid' ELSE 'Invalid' END AS [Result],
        System.Timestamp() AS ts
    FROM Record1preprocessed r1
    LEFT JOIN FinalRecord2 r2
    ON  DATEDIFF(second, r1, r2) BETWEEN 0 AND 1
        AND r1.key1 = r2.key1
    WHERE
        (r2.key2 = '0')
        OR (r1.key2 = r2.key2)
)

SELECT * INTO OutputFinal FROM JoinedRecord

It uses a JavaScript UDF (ConcatArrays in the query):

function main(arg1, arg2) {
    if (arg2 == null) {return arg1;}
    else {return arg1.concat(arg2);}
}
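
The null check matters because the LEFT JOIN leaves e2.[values] empty when no row without key2 exists for that key1. As a quick check outside ASA, the same body can be run with Node.js. The name concatArrays is only used here so the snippet is self-contained, and I'm assuming the missing side reaches the UDF as null.

```javascript
// Same body as the UDF above, renamed so it can run outside ASA.
function concatArrays(arg1, arg2) {
    // Loose equality: true for both null and undefined.
    if (arg2 == null) { return arg1; }
    return arg1.concat(arg2);
}

const merged = concatArrays([3], [4]);     // both sides present: arrays are concatenated
const leftOnly = concatArrays([1], null);  // right side missing: left array returned as is
console.log(merged, leftOnly);
```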

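You'll notice that the query looks a lot like yours. For the most part, it is. I added some pre-processing steps to add an explicit timestamp. This shouldn't be strictly necessary, but I wanted to be sure.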

The new input records are:

  • Record1
{"key1": "value1","key2": "value2","appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value3","key2": "value4","appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": "value6","appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": "value7","appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value8","key2": "value9","appTimeStamp":"2022-02-02T00:00:00"}
  • Record2
{"key1": "value1","key2": "value2","values": 1,"appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value3","key2": null,"values": 2,"appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": "value6","values": 3,"appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": null,"values": 4,"appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": "value7","values": 5,"appTimeStamp":"2022-02-02T00:00:00"}

I added System.Timestamp() to every CTE to be able to debug where the shift happens.

If you do that, you'll notice that FinalRecord2 gives you:

{"key1":"value5","key2":"value7","values":[5,4],"ts":"2022-02-02T00:00:00.0000000Z"}
{"key1":"value5","key2":"value6","values":[3,4],"ts":"2022-02-02T00:00:00.0000000Z"}
{"key1":"value1","key2":"value2","values":[1],"ts":"2022-02-02T00:00:01.0000000Z"}
{"key1":"value3","key2":"0","values":[2],"ts":"2022-02-02T00:00:01.0000000Z"}

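Notice how the timestamps of the records that had to time out in the LEFT JOIN were moved forward by 1 second.

This can be addressed by giving some leeway in the JOIN condition of JoinedRecord: DATEDIFF(second, r1, r2) BETWEEN 0 AND 1.

Which in turn gives you the expected result: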

{"key1":"value5","key2":"value7","values":[5,4],"Result":"Valid","ts":"2022-02-02T00:00:00.0000000Z"}
{"key1":"value5","key2":"value6","values":[3,4],"Result":"Valid","ts":"2022-02-02T00:00:00.0000000Z"}
{"key1":"value1","key2":"value2","values":[1],"Result":"Valid","ts":"2022-02-02T00:00:01.0000000Z"}
{"key1":"value3","key2":"value4","values":[2],"Result":"Invalid","ts":"2022-02-02T00:00:01.0000000Z"}

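Adding System.Timestamp() here is an absolute must when debugging this kind of issue. This behavior does not happen with INNER JOIN. I will try to update the query to remove the left joins.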
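
The effect of the leeway in the JoinedRecord join condition can be reproduced outside ASA with a small in-memory sketch. This is only an illustration under assumptions: timestamps are reduced to seconds after 2022-02-02T00:00:00, joinWithLeeway is a hypothetical helper, and the FinalRecord2 rows are copied from the debug output above.

```javascript
// Record1 as in the sample input; every row sits at ts = 0.
const record1 = [
    { key1: "value1", key2: "value2", ts: 0 },
    { key1: "value3", key2: "value4", ts: 0 },
    { key1: "value5", key2: "value6", ts: 0 },
    { key1: "value5", key2: "value7", ts: 0 },
    { key1: "value8", key2: "value9", ts: 0 },
];
// FinalRecord2 as shown above: the two rows that timed out in the LEFT JOIN sit at ts = 1.
const finalRecord2 = [
    { key1: "value5", key2: "value7", values: [5, 4], ts: 0 },
    { key1: "value5", key2: "value6", values: [3, 4], ts: 0 },
    { key1: "value1", key2: "value2", values: [1], ts: 1 },
    { key1: "value3", key2: "0", values: [2], ts: 1 },
];

// Mimics: ON DATEDIFF(second, r1, r2) BETWEEN minDiff AND maxDiff AND r1.key1 = r2.key1
//         WHERE r2.key2 = '0' OR r1.key2 = r2.key2
function joinWithLeeway(r1Rows, r2Rows, minDiff, maxDiff) {
    const out = [];
    for (const r1 of r1Rows) {
        for (const r2 of r2Rows) {
            const diff = r2.ts - r1.ts; // DATEDIFF(second, r1, r2)
            const inWindow = diff >= minDiff && diff <= maxDiff;
            const keep = r2.key2 === "0" || r1.key2 === r2.key2;
            if (inWindow && r1.key1 === r2.key1 && keep) {
                out.push({
                    key1: r1.key1,
                    key2: r1.key2,
                    values: r2.values,
                    Result: r1.key2 === r2.key2 ? "Valid" : "Invalid",
                });
            }
        }
    }
    return out;
}

const strict = joinWithLeeway(record1, finalRecord2, 0, 0);  // DATEDIFF = 0
const relaxed = joinWithLeeway(record1, finalRecord2, 0, 1); // DATEDIFF BETWEEN 0 AND 1
console.log(strict.length, relaxed.length);
```

With a difference of exactly 0, only the two value5 rows survive, which is what the original query returned. Allowing a difference of 0 to 1 also picks up the value1 and value3 rows.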


Before the update

Here is my take on the problem:

WITH

-- First we regroup record2 to have only one row per key1. We also replace null key2 by a hardcoded value
Record2Collected AS (
    SELECT
        [key1],
        COLLECT(CASE WHEN [key2] IS NULL THEN '0' ELSE [key2] END) AS [key2],
        COLLECT([values]) [values]
    FROM Record2
    GROUP BY System.Timestamp(), key1
)

-- Then we re-expand all key combinations, but this time each one carries all the values. Then we filter: either there's a single key2 (the array of key2 has a single value) or we want the key2 that's not null
,Record2Processed AS (
    SELECT
        R.key1,
        C.ArrayValue AS key2,
        R.[values]
    FROM Record2Collected R
    CROSS APPLY GetArrayElements(R.[key2]) AS C
    WHERE
        GetArrayLength(R.key2) = 1 -- Single key2 for a key1
        OR
        C.ArrayValue != '0' -- Multiple key2 for a key1, we remove null values
)

--Then we match Record1 with Record2Processed. We do it in a single join to avoid duplications.
,JoinedRecord AS (
    SELECT
        r1.key1,
        r1.key2,
        r2.[values],
        CASE WHEN r1.key2 = r2.key2 THEN 'Valid' ELSE 'Invalid' END AS [Result]
    FROM Record2Processed r2
    LEFT JOIN Record1 r1 -- We're expecting r1 to have unique key1s here
    ON DATEDIFF(second, r1, r2) = 0 -- All records are supposed to be on the same event time, see comment below
    AND r1.key1 = r2.key1
)

SELECT * INTO myOutput FROM JoinedRecord

Which gives you:

{"key1":"value3","key2":"value4","values":[2],"Result":"Invalid"}
{"key1":"value1","key2":"value2","values":[1],"Result":"Valid"}
{"key1":"value5","key2":"value6","values":[3,4],"Result":"Valid"}

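Here are a few things I'm cautious about:

  • We are using snapshot windows and not using a TIMESTAMP BY, which means that the logic will apply to groups of records landing in the streaming input at the same time. This feels a bit risky. Is it the expected behavior? I would either define a time window or use an application time
  • In Record2Processed, we will duplicate values for a key1 that has multiple non-null key2. If that can happen, we need to update the query logic
  • In JoinedRecord, we have to join from r2 to r1 on key1 only, so that we don't lose the null key2. If a key1 can have multiple key2 in r1, we need to update the logic. Also, if some key combinations can be found in r1 but not in r2, they will currently be lost. We would need to update the logic for that as well

Please let me know if some of the cases above can happen, and I'll update the query logic.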
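
The second caveat can be seen with a small in-memory sketch of Record2Collected and Record2Processed. This is plain JavaScript rather than ASA: collectByKey1 and expandAndFilter are hypothetical helpers, all rows are assumed to share one timestamp, and the element order inside the arrays simply follows input order here.

```javascript
const record2 = [
    { key1: "value1", key2: "value2", values: 1 },
    { key1: "value3", key2: null, values: 2 },
    { key1: "value5", key2: "value6", values: 3 },
    { key1: "value5", key2: null, values: 4 },
    { key1: "value5", key2: "value7", values: 5 },
];

// Mimics GROUP BY key1 with COLLECT(): one row per key1, null key2 replaced by '0'.
function collectByKey1(rows) {
    const groups = new Map();
    for (const row of rows) {
        if (!groups.has(row.key1)) {
            groups.set(row.key1, { key1: row.key1, key2: [], values: [] });
        }
        const group = groups.get(row.key1);
        group.key2.push(row.key2 === null ? "0" : row.key2);
        group.values.push(row.values);
    }
    return [...groups.values()];
}

// Mimics CROSS APPLY GetArrayElements(key2), then the WHERE filter:
// keep the row if the group has a single key2, or if this key2 is not the '0' placeholder.
function expandAndFilter(collected) {
    const out = [];
    for (const group of collected) {
        for (const k of group.key2) {
            if (group.key2.length === 1 || k !== "0") {
                out.push({ key1: group.key1, key2: k, values: group.values });
            }
        }
    }
    return out;
}

const processed = expandAndFilter(collectByKey1(record2));
console.log(JSON.stringify(processed));
```

With this input, both value5 rows (key2 value6 and value7) carry all three values 3, 4 and 5, instead of each keeping only its own value plus the one from the null row. That is the duplication the second bullet warns about.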