Spark 时间序列 - 按 10 分钟间隔自定义分组:提高性能

Spark Time Series - Custom Group By 10 Minute Intervals: Improve Performance

我们有时间序列数据(我们自 1970 年以来的时间戳和整数数据值):

# load data and cache it
df_cache = readInData() # read data from several files (paritioned by hour)
df_cache.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
df_cache.agg({"data": "max"}).collect()

# now data is cached
df_cache.show()
+--------------------+---------+
|                time|     data| 
+--------------------+---------+
|1.448409599861109E15|1551.7468|
|1.448409599871109E15|1551.7463|
|1.448409599881109E15|1551.7468|

现在我们想使用外部 python 库在 10 分钟时间 windows 上计算一些重要的事情。为此,我们需要将每个时间帧的数据加载到内存中,应用外部函数并存储结果。因此,用户定义的聚合函数 (UDAF) 是不可能的。

现在的问题是,当我们将GroupBy应用于RDD时,它非常慢。

df_cache.rdd.groupBy(lambda x: int(x.time / 600e6) ). \ # create 10 minute groups
             map(lambda x: 1). \ # do some calculations, e.g. external library
             collect() # get results

此操作在两个具有 6GB Ram 的节点上进行 120Mio 样本(100Hz 数据)大约需要 14 分钟。 groupBy 阶段的 Spark 详细信息:

Total Time Across All Tasks: 1.2 h
Locality Level Summary: Process local: 8
Input Size / Records: 1835.0 MB / 12097
Shuffle Write: 1677.6 MB / 379
Shuffle Spill (Memory): 79.4 GB
Shuffle Spill (Disk): 1930.6 MB

如果我使用一个简单的 python 脚本并让它遍历输入文件,完成所需的时间会少得多。

如何在spark中优化这个作业?

groupBy 是您的瓶颈:它需要在所有分区之间随机播放数据,这很耗时,并且会占用大量 space 内存,正如您从指标中看到的那样。

这里的方法是使用 reduceByKey 操作并将其链接如下: df_cache.rdd.map(lambda x: (int(x.time/600e6), (x.time, x.data) ).reduceByKey(lambda x,y: 1).collect()

这里的关键要点是 groupBy 需要跨所有分区随机播放所有数据,而 reduceByKey 将首先减少每个分区,然后再跨所有分区 - 大大减少全局洗牌的大小。请注意我如何将输入组织成一个键以利用 reduceByKey 操作。

正如我在评论中提到的,您可能还想通过使用 Spark SQL 的 DataFrame 抽象来尝试您的程序,由于其优化器,这可能会给您带来额外的提升。