并行识别最新记录
Identifying newest records in parallel
我们正在使用 U-SQL 从一组 .csv 文件中提取传感器数据。每条记录包含传感器 ID、测量时间和值,以及接收记录的时间:
+----------+---------------------+------------------+---------------------+
| SensorID | MeasurementTime | MeasurementValue | ReceivedTime |
+----------+---------------------+------------------+---------------------+
| xxx | 2017-09-10 11:00:00 | 12.342 | 2017-09-19 14:25:17 |
| xxx | 2017-09-10 12:00:00 | 14.654 | 2017-09-19 14:25:17 |
| yyy | 2017-09-10 11:00:00 | 1.054 | 2017-09-19 14:25:17 |
| yyy | 2017-09-10 12:00:00 | 1.354 | 2017-09-19 14:25:17 |
...
| xxx | 2017-09-10 11:00:00 | 10.261 | 2017-09-19 15:25:17 |
+----------+---------------------+------------------+---------------------+
文件存储在 ADLS 中的路径基于测量时间的日期部分,因此上面看到的数据将在 /Data/2017/09/10/measurements.csv
中找到,其中前四行写在 14:25:17 在 9 月 19 日,最后一行在一小时后添加到 15:25:17。
如上例所示,可以稍后接收相同 SensorID 和 MeasurementTime 的新值。每个分区包含几百万行,每天都有几千行附加到少量分区。我们想要 运行 一个批处理作业,比如每 24 小时一次,对于任何给定的 SensorID 和 MeasurementTime,它将只输出最新的值。为此,我们使用一个 U-SQL 脚本,它看起来类似于:
@newestMeasurements_addRN =
SELECT *,
ROW_NUMBER() OVER (PARTITION BY PDate,
SensorId,
MeasurementTime
ORDER BY ReceivedTime DESC) AS MeasurementRN;
@newestMeasurements =
SELECT SensorId,
MeasurementTime,
MeasurementValue
FROM @newestMeasurements_addRN
WHERE MeasurementRN == 1;
这里,PDate
是从CSV文件路径中的yyyy/MM/dd推断出来的虚拟列(等于MeasurementTime的日期部分)。
现在,由于我们在 window 函数的 PARTITION BY
部分使用了 PDate
,我希望这个操作可以并行化,因为我们不必考虑不同的尝试查找任何给定 SensorID 和 MeasurementTime 的最新记录时的天数(分区)。不幸的是,情况似乎并非如此,查看工作图表:
在这里,我们正在提取 4 个不同日期的数据。每个 Extract 顶点输出完整的记录数,将仅识别最新记录的任务留给底部的 Combine 顶点,表明ROW_NUMBER
和后续过滤不会并行发生。
- 这是
ROW_NUMBER
实现中的错误吗?
- 我们可以使用不同的 U-SQL 技术来确保并行性吗?
我设法找到了一个可用的解决方案,其中我将检测最新测量值的 U-SQL 封装在一个 U-SQL 存储过程中,它的值对应于 pdate
作为输入参数。
然后,我简单地执行这个存储过程几次,并列出我想并行处理的日期:
DetectLatestMeasurements(20170910);
DetectLatestMeasurements(20170911);
DetectLatestMeasurements(20170912);
DetectLatestMeasurements(20170913);
存储过程处理一天数据的提取、转换和输出,所以它完成了工作,并且按照我期望的方式并行化。
我们正在使用 U-SQL 从一组 .csv 文件中提取传感器数据。每条记录包含传感器 ID、测量时间和值,以及接收记录的时间:
+----------+---------------------+------------------+---------------------+
| SensorID | MeasurementTime | MeasurementValue | ReceivedTime |
+----------+---------------------+------------------+---------------------+
| xxx | 2017-09-10 11:00:00 | 12.342 | 2017-09-19 14:25:17 |
| xxx | 2017-09-10 12:00:00 | 14.654 | 2017-09-19 14:25:17 |
| yyy | 2017-09-10 11:00:00 | 1.054 | 2017-09-19 14:25:17 |
| yyy | 2017-09-10 12:00:00 | 1.354 | 2017-09-19 14:25:17 |
...
| xxx | 2017-09-10 11:00:00 | 10.261 | 2017-09-19 15:25:17 |
+----------+---------------------+------------------+---------------------+
文件存储在 ADLS 中的路径基于测量时间的日期部分,因此上面看到的数据将在 /Data/2017/09/10/measurements.csv
中找到,其中前四行写在 14:25:17 在 9 月 19 日,最后一行在一小时后添加到 15:25:17。
如上例所示,可以稍后接收相同 SensorID 和 MeasurementTime 的新值。每个分区包含几百万行,每天都有几千行附加到少量分区。我们想要 运行 一个批处理作业,比如每 24 小时一次,对于任何给定的 SensorID 和 MeasurementTime,它将只输出最新的值。为此,我们使用一个 U-SQL 脚本,它看起来类似于:
@newestMeasurements_addRN =
SELECT *,
ROW_NUMBER() OVER (PARTITION BY PDate,
SensorId,
MeasurementTime
ORDER BY ReceivedTime DESC) AS MeasurementRN;
@newestMeasurements =
SELECT SensorId,
MeasurementTime,
MeasurementValue
FROM @newestMeasurements_addRN
WHERE MeasurementRN == 1;
这里,PDate
是从CSV文件路径中的yyyy/MM/dd推断出来的虚拟列(等于MeasurementTime的日期部分)。
现在,由于我们在 window 函数的 PARTITION BY
部分使用了 PDate
,我希望这个操作可以并行化,因为我们不必考虑不同的尝试查找任何给定 SensorID 和 MeasurementTime 的最新记录时的天数(分区)。不幸的是,情况似乎并非如此,查看工作图表:
在这里,我们正在提取 4 个不同日期的数据。每个 Extract 顶点输出完整的记录数,将仅识别最新记录的任务留给底部的 Combine 顶点,表明ROW_NUMBER
和后续过滤不会并行发生。
- 这是
ROW_NUMBER
实现中的错误吗? - 我们可以使用不同的 U-SQL 技术来确保并行性吗?
我设法找到了一个可用的解决方案,其中我将检测最新测量值的 U-SQL 封装在一个 U-SQL 存储过程中,它的值对应于 pdate
作为输入参数。
然后,我简单地执行这个存储过程几次,并列出我想并行处理的日期:
DetectLatestMeasurements(20170910);
DetectLatestMeasurements(20170911);
DetectLatestMeasurements(20170912);
DetectLatestMeasurements(20170913);
存储过程处理一天数据的提取、转换和输出,所以它完成了工作,并且按照我期望的方式并行化。