Usage of JOIN in ASA caused mismatch
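I ran into a problem in my ASA (Azure Stream Analytics) query. The idea is to match two record sets, Record1 and Record2. Both have key1 and key2. In Record2, if key2 is not null, Record2 is grouped by key1 and key2 to form a new table, Record2WithKey2. If key2 is null, Record2 is grouped by key1 to form a new table, Record2WithoutKey2. If a record in Record2WithKey2 and a record in Record2WithoutKey2 share the same key1, they need to be merged, keeping the key2 from Record2WithKey2; otherwise the records stay unchanged. After these steps we have a new table, FinalRecord2. Then Record1 is matched against FinalRecord2. If FinalRecord2 contains a match on both key1 and key2, the record from Record1 is marked as valid. If key1 matches and key2 from FinalRecord2 is null, the record from Record1 is marked as invalid. The result is then output as a table. Here is my code: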
With Record2WithKey2 AS (
SELECT [key1], [key2], COLLECT([values]) [values]
FROM Record2
WHERE key2 IS NOT NULL
GROUP BY System.Timestamp(), key1, key2
),
Record2WithoutKey2 AS (
SELECT [key1], '0' [key2], COLLECT([values]) [values]
FROM Record2
WHERE key2 IS NULL
GROUP BY System.Timestamp(), key1
),
FinalRecord2 AS (
SELECT e1.key1 [key1], e1.key2 [key2], UDF.ConcatArrays(e1.values, e2.values) [values]
FROM Record2WithKey2 e1
LEFT OUTER JOIN Record2WithoutKey2 e2
ON DATEDIFF(second, e1, e2) = 0
AND e1.key1 = e2.key1
UNION ALL
SELECT e2.key1 [key1], e2.key2 [key2], e2.values [values]
FROM Record2WithoutKey2 e2
LEFT OUTER JOIN Record2WithKey2 e1
ON DATEDIFF(second, e1, e2) = 0
AND e1.key1 = e2.key1
WHERE e1.key1 IS NULL
),
JoinedRecord AS (
SELECT r1.*, r2.values [values], 'Valid' [Result]
FROM FinalRecord2 r2
INNER JOIN Record1 r1
ON DATEDIFF(second, r1, r2) = 0
AND r1.key1 = r2.key1
AND r1.key2 = r2.key2
UNION ALL
SELECT r1.*, r2.values [values], 'Invalid' [Result]
FROM FinalRecord2 r2
INNER JOIN Record1 r1
ON DATEDIFF(second, r1, r2) = 0
AND r1.key1 = r2.key1
AND r2.key2 = '0'
)
SELECT *
INTO Output
From JoinedRecord
Record1:
[
{
"key1": "value1",
"key2": "value2"
},
{
"key1": "value3",
"key2": "value4"
},
{
"key1": "value5",
"key2": "value6"
},
{
"key1": "value5",
"key2": "value7"
},
{
"key1": "value8",
"key2": "value9"
}
]
Record2:
[
{
"key1": "value1",
"key2": "value2",
"values": 1
},
{
"key1": "value3",
"key2": null,
"values": 2
},
{
"key1": "value5",
"key2": "value6",
"values": 3
},
{
"key1": "value5",
"key2": null,
"values": 4
},
{
"key1": "value5",
"key2": "value7",
"values": 5
}
]
Expected output:
[
{
"key1": "value1",
"key2": "value2",
"values": [1],
"Result": "Valid"
},
{
"key1": "value3",
"key2": "value4",
"values": [2],
"Result": "Invalid"
},
{
"key1": "value5",
"key2": "value6",
"values": [3,4],
"Result": "Valid"
},
{
"key1": "value5",
"key2": "value7",
"values": [4,5],
"Result": "Valid"
}
]
But what I actually got is:
{
"key1": "value5",
"key2": "value6",
"values": [3,4],
"Result": "Valid"
},
{
"key1": "value5",
"key2": "value7",
"values": [4,5],
"Result": "Valid"
}
Does anyone know what is wrong with my ASA code? Any suggestions would be much appreciated. Thanks!
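Updated to accommodate:
- key1 can have multiple key2 in record1
- key1 can have multiple key2 in record2, including null
Adding the constraints above forced me to use a UNION again. That let me understand what was happening in your query.
ASA is a stream processor. This means every step of a query happens on a timeline. Every blocking operation, such as GROUP BY or JOIN, needs a time component. See the documentation for details.
On paper, though, when operating at a given timestamp we should be able to forget about the timeline. If we aggregate with a snapshot window and set the DATEDIFF to 0 for joins, the whole query should execute within the same time span and behave like an ordinary SQL query in a standard SQL database.
In reality, some operations can affect the timestamps of the records they process. Here that is caused by the LEFT JOIN. In ASA, all joins are temporal. Say we are joining two streams A and B with A LEFT JOIN B ON DATEDIFF(second,A,B) = 0. When we get a record from A, we start waiting for all records from B for the expected duration, here 0 seconds. Suppose none arrives. When the clock moves to the next tick (here the next second), we declare the period over and can release that lonely record from A. But we need to update its timestamp: it is emitted at the new clock time, one tick forward (1 second).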
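To make the timestamp shift concrete, here is a minimal JavaScript sketch of the behavior just described. It is a toy model, not the ASA engine: `temporalLeftJoin`, the whole-second `ts` field and the sample rows are all made up for illustration, and the "+1 second" rule is simply the description above turned into code.

```javascript
// Toy model of A LEFT JOIN B ON DATEDIFF(second, A, B) = 0.
// Timestamps are whole seconds. Illustration only, not how ASA is implemented.
function temporalLeftJoin(streamA, streamB, keyOf) {
  const out = [];
  for (const a of streamA) {
    // Candidates from B inside the allowed window (here: the same second).
    const matches = streamB.filter(b => b.ts === a.ts && keyOf(b) === keyOf(a));
    if (matches.length > 0) {
      // A match is known right away, so the output keeps the original timestamp.
      for (const b of matches) out.push({ left: a, right: b, ts: a.ts });
    } else {
      // No match: the row is released only once the window has closed,
      // i.e. at the next tick, so its output timestamp moves forward by 1 second.
      out.push({ left: a, right: null, ts: a.ts + 1 });
    }
  }
  return out;
}

// Hypothetical sample rows: value5 has a partner in B, value1 does not.
const A = [{ key1: 'value5', ts: 0 }, { key1: 'value1', ts: 0 }];
const B = [{ key1: 'value5', ts: 0 }];
const joined = temporalLeftJoin(A, B, r => r.key1);
console.log(joined.map(r => `${r.left.key1} matched=${r.right !== null} ts=${r.ts}`));
```

The matched row keeps ts 0 while the unmatched row comes out at ts 1, which is the kind of shift that makes a downstream join with DATEDIFF = 0 miss it.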
Here is a first query that works, despite using LEFT JOIN:
WITH
-- Cleaning and timestamp
Record1preprocessed AS (
SELECT
CAST([key1] AS NVARCHAR(MAX)) AS key1,
CAST([key2] AS NVARCHAR(MAX)) AS key2
FROM record1 TIMESTAMP BY appTimeStamp
),
-- Cleaning and timestamp
Record2preprocessed AS (
SELECT
CAST([key1] AS NVARCHAR(MAX)) AS key1,
CAST([key2] AS NVARCHAR(MAX)) AS key2,
CAST([values] AS BIGINT) AS [values]
FROM record2 TIMESTAMP BY appTimeStamp
),
Record2WithKey2 AS (
SELECT
[key1],
[key2],
COLLECT([values]) AS [values],
System.Timestamp() AS ts
FROM Record2preprocessed
WHERE key2 IS NOT NULL
GROUP BY System.Timestamp(), key1, key2
),
Record2WithoutKey2 AS (
SELECT
[key1],
'0' AS [key2],
COLLECT([values]) [values],
System.Timestamp() AS ts
FROM Record2preprocessed
WHERE key2 IS NULL
GROUP BY System.Timestamp(), key1
),
FinalRecord2 AS (
-- Records with key2 are joined with records without to merge their arrays
SELECT
e1.key1,
e1.key2,
UDF.ConcatArrays(e1.[values], e2.[values]) AS [values],
System.Timestamp() AS ts
FROM Record2WithKey2 e1
LEFT JOIN Record2WithoutKey2 e2
ON DATEDIFF(second, e1, e2) = 0
AND e1.key1 = e2.key1
UNION ALL
-- Records without key2 are joined with records with key2 to filter them out (they are already in the first subquery above)
-- Here we only want the key1 that never have a key2
SELECT
e2.key1,
e2.key2,
e2.[values] [values],
System.Timestamp() AS ts
FROM Record2WithoutKey2 e2
LEFT JOIN Record2WithKey2 e1
ON DATEDIFF(second, e1, e2) = 0
AND e1.key1 = e2.key1
WHERE e1.key1 IS NULL
)
,JoinedRecord AS (
SELECT
r1.key1,
r1.key2,
r2.[values],
CASE WHEN r1.key2 = r2.key2 THEN 'Valid' ELSE 'Invalid' END AS [Result],
System.Timestamp() AS ts
FROM Record1preprocessed r1
LEFT JOIN FinalRecord2 r2
ON DATEDIFF(second, r1, r2) BETWEEN 0 AND 1
AND r1.key1 = r2.key1
WHERE
(r2.key2 = '0')
OR (r1.key2 = r2.key2)
)
SELECT * INTO OutputFinal FROM JoinedRecord
It uses a JavaScript UDF (called as UDF.ConcatArrays in the query):
function main(arg1, arg2) {
if (arg2 == null) {return arg1;}
else {return arg1.concat(arg2);}
}
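As a quick sanity check outside ASA, the UDF can be exercised with plain Node.js. The function body is repeated so the snippet runs on its own; the sample arrays are taken from the data in this thread. That an unmatched LEFT JOIN side reaches the UDF as null (or undefined) is an assumption here, which is what the `== null` test covers.

```javascript
// Same function as the UDF above, repeated so this snippet is self-contained.
function main(arg1, arg2) {
  if (arg2 == null) { return arg1; }      // loose equality: true for null and undefined
  else { return arg1.concat(arg2); }
}

// key1 = value5: a row with key2 merged with the row without key2.
const merged = main([3], [4]);
// key1 = value1: nothing on the right side of the LEFT JOIN (assumed to arrive as null).
const leftOnly = main([1], null);

console.log(JSON.stringify(merged));   // [3,4]
console.log(JSON.stringify(leftOnly)); // [1]
```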
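You will find it looks a lot like yours. For the most part it is.
I added some pre-processing steps to add an explicit timestamp. This should not be strictly necessary, but I wanted to be sure.
The new input records are:
- Record1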
{"key1": "value1","key2": "value2","appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value3","key2": "value4","appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": "value6","appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": "value7","appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value8","key2": "value9","appTimeStamp":"2022-02-02T00:00:00"}
- Record2
{"key1": "value1","key2": "value2","values": 1,"appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value3","key2": null,"values": 2,"appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": "value6","values": 3,"appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": null,"values": 4,"appTimeStamp":"2022-02-02T00:00:00"}
{"key1": "value5","key2": "value7","values": 5,"appTimeStamp":"2022-02-02T00:00:00"}
I added System.Timestamp() to every CTE so I could debug where the shift happens.
If you do that, you will notice that FinalRecord2 gives you:
{"key1":"value5","key2":"value7","values":[5,4],"ts":"2022-02-02T00:00:00.0000000Z"}
{"key1":"value5","key2":"value6","values":[3,4],"ts":"2022-02-02T00:00:00.0000000Z"}
{"key1":"value1","key2":"value2","values":[1],"ts":"2022-02-02T00:00:01.0000000Z"}
{"key1":"value3","key2":"0","values":[2],"ts":"2022-02-02T00:00:01.0000000Z"}
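Notice how the timestamps of the records that had to time out in the LEFT JOIN are moved forward by 1 second.
This can be addressed by allowing some leeway in the JOIN condition of JoinedRecord: DATEDIFF(second, r1, r2) BETWEEN 0 AND 1.
This in turn gives you the expected result: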
{"key1":"value5","key2":"value7","values":[5,4],"Result":"Valid","ts":"2022-02-02T00:00:00.0000000Z"}
{"key1":"value5","key2":"value6","values":[3,4],"Result":"Valid","ts":"2022-02-02T00:00:00.0000000Z"}
{"key1":"value1","key2":"value2","values":[1],"Result":"Valid","ts":"2022-02-02T00:00:01.0000000Z"}
{"key1":"value3","key2":"value4","values":[2],"Result":"Invalid","ts":"2022-02-02T00:00:01.0000000Z"}
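Adding System.Timestamp() like this is an absolute must when debugging this kind of issue.
This behavior does not happen with an INNER JOIN. I will try to update the query to remove the left joins.
Before the update
Here is my take on the problem: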
WITH
-- First we regroup record2 to have only one row per key1. We also replace null key2 by a hardcoded value
Record2Collected AS (
SELECT
[key1],
COLLECT(CASE WHEN [key2] IS NULL THEN '0' ELSE [key2] END) AS [key2],
COLLECT([values]) [values]
FROM Record2
GROUP BY System.Timestamp(), key1
)
-- Then we re-expand all key combinations, but this time they will have all the values each time. Then we filter: either there's a single key2 (the array of key2 has a single value) or we want the key2 that's not null
,Record2Processed AS (
SELECT
R.key1,
C.ArrayValue AS key2,
R.[values]
FROM Record2Collected R
CROSS APPLY GetArrayElements(R.[key2]) AS C
WHERE
GetArrayLength(R.key2) = 1 -- Single key2 for a key1
OR
C.ArrayValue != '0' -- Multiple key2 for a key1, we remove null values
)
--Then we match Record1 with Record2Processed. We do it in a single join to avoid duplications.
,JoinedRecord AS (
SELECT
r1.key1,
r1.key2,
r2.[values],
CASE WHEN r1.key2 = r2.key2 THEN 'Valid' ELSE 'Invalid' END AS [Result]
FROM Record2Processed r2
LEFT JOIN Record1 r1 -- We're expecting r1 to have unique key1s here
ON DATEDIFF(second, r1, r2) = 0 -- All records are supposed to be on the same event time, see comment below
AND r1.key1 = r2.key1
)
SELECT * INTO myOutput FROM JoinedRecord
Which will give you:
{"key1":"value3","key2":"value4","values":[2],"Result":"Invalid"}
{"key1":"value1","key2":"value2","values":[1],"Result":"Valid"}
{"key1":"value5","key2":"value6","values":[3,4],"Result":"Valid"}
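Here are some aspects I am cautious about:
- We are using snapshot windows and not using a TIMESTAMP BY, which means the logic will apply to groups of records landing in the streaming input at the same time. This feels a bit risky. Is it the expected behavior? I would either define a time window or use an application time.
- In Record2Processed, values will be duplicated for any key1 that has multiple non-null key2. If that can happen, we need to update the query logic.
- In JoinedRecord, we have to join from r2 to r1 on key1 only, so that we don't lose the null key2. If a key1 can have multiple key2 in r1, we need to update the logic. Also, if some key combinations can be found in r1 but not in r2, they are currently lost. We would need to update the logic for that as well.
Please let me know whether any of these cases can occur and I will update the query logic.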
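For readers who want to trace the collect-then-expand idea of this earlier query without an ASA job, here is a rough JavaScript equivalent of Record2Collected and Record2Processed. The helper names `collectByKey1` and `expandKeys` are invented for this sketch, and the input is assumed to have at most one non-null key2 per key1, which is the case this version of the query was written for.

```javascript
// Assumed Record2 input with at most one non-null key2 per key1.
const record2 = [
  { key1: 'value1', key2: 'value2', values: 1 },
  { key1: 'value3', key2: null, values: 2 },
  { key1: 'value5', key2: 'value6', values: 3 },
  { key1: 'value5', key2: null, values: 4 },
];

// Step 1 (Record2Collected): one row per key1, collecting key2 (null -> '0') and values.
function collectByKey1(rows) {
  const groups = new Map();
  for (const r of rows) {
    if (!groups.has(r.key1)) groups.set(r.key1, { key1: r.key1, key2: [], values: [] });
    const g = groups.get(r.key1);
    g.key2.push(r.key2 === null ? '0' : r.key2);
    g.values.push(r.values);
  }
  return [...groups.values()];
}

// Step 2 (Record2Processed): expand key2 again, keeping either a lone key2
// or any key2 that is not the '0' placeholder. Every kept row carries all values.
function expandKeys(collected) {
  const out = [];
  for (const g of collected) {
    for (const k of g.key2) {
      if (g.key2.length === 1 || k !== '0') {
        out.push({ key1: g.key1, key2: k, values: g.values });
      }
    }
  }
  return out;
}

const processed = expandKeys(collectByKey1(record2));
console.log(JSON.stringify(processed));
```

Adding a second non-null key2 for value5 to `record2` shows the duplication mentioned in the second bullet: each expanded row then carries the full values array for that key1.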