并行识别最新记录

Identifying newest records in parallel

我们正在使用 U-SQL 从一组 .csv 文件中提取传感器数据。每条记录包含传感器 ID、测量时间和值,以及接收记录的时间:

+----------+---------------------+------------------+---------------------+
| SensorID |   MeasurementTime   | MeasurementValue |    ReceivedTime     |
+----------+---------------------+------------------+---------------------+
| xxx      | 2017-09-10 11:00:00 |           12.342 | 2017-09-19 14:25:17 |
| xxx      | 2017-09-10 12:00:00 |           14.654 | 2017-09-19 14:25:17 |
| yyy      | 2017-09-10 11:00:00 |            1.054 | 2017-09-19 14:25:17 |
| yyy      | 2017-09-10 12:00:00 |            1.354 | 2017-09-19 14:25:17 |
  ...
| xxx      | 2017-09-10 11:00:00 |           10.261 | 2017-09-19 15:25:17 |
+----------+---------------------+------------------+---------------------+

文件存储在 ADLS 中的路径基于测量时间的日期部分,因此上面看到的数据将在 /Data/2017/09/10/measurements.csv 中找到,其中前四行写在 14:25:17 在 9 月 19 日,最后一行在一小时后添加到 15:25:17。

如上例所示,可以稍后接收相同 SensorID 和 MeasurementTime 的新值。每个分区包含几百万行,每天都有几千行附加到少量分区。我们想要 运行 一个批处理作业,比如每 24 小时一次,对于任何给定的 SensorID 和 MeasurementTime,它将只输出最新的值。为此,我们使用一个 U-SQL 脚本,它看起来类似于:

@newestMeasurements_addRN =
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY PDate, 
                                           SensorId, 
                                           MeasurementTime
                              ORDER BY ReceivedTime DESC) AS MeasurementRN;

@newestMeasurements =
    SELECT SensorId,
           MeasurementTime,
           MeasurementValue
    FROM @newestMeasurements_addRN
    WHERE MeasurementRN == 1;

这里,PDate是从CSV文件路径中的yyyy/MM/dd推断出来的虚拟列(等于MeasurementTime的日期部分)。

现在,由于我们在 window 函数的 PARTITION BY 部分使用了 PDate,我希望这个操作可以并行化,因为我们不必考虑不同的尝试查找任何给定 SensorID 和 MeasurementTime 的最新记录时的天数(分区)。不幸的是,情况似乎并非如此,查看工作图表:

在这里,我们正在提取 4 个不同日期的数据。每个 Extract 顶点输出完整的记录数,将仅识别最新记录的任务留给底部的 Combine 顶点,表明ROW_NUMBER 和后续过滤不会并行发生。

我设法找到了一个可用的解决方案,其中我将检测最新测量值的 U-SQL 封装在一个 U-SQL 存储过程中,它的值对应于 pdate作为输入参数。

然后,我简单地执行这个存储过程几次,并列出我想并行处理的日期:

DetectLatestMeasurements(20170910);
DetectLatestMeasurements(20170911);
DetectLatestMeasurements(20170912);
DetectLatestMeasurements(20170913);

存储过程处理一天数据的提取、转换和输出,所以它完成了工作,并且按照我期望的方式并行化。