如何在 Spark 中处理增量 S3 文件

How to process incremental S3 files in Spark

我制作了以下管道: 任务管理器 -> SQS -> scraper worker(我的应用程序) -> AWS Firehose -> S3 文件 -> Spark ->(?) Redshift。

有些事情我正在尝试 solve/improve,我很乐意提供指导:

  1. 爬虫可能会获取重复的数据,并将它们再次冲洗到 firehose,这将导致 spark 重复。我可以在开始计算之前使用 Distinct 函数在 spark 中解决这个问题吗?
  2. 我没有删除S3处理过的文件,所以数据越来越大。这是一个好习惯吗? (将 s3 作为输入数据库)还是我应该处理每个文件并在 spark 完成后将其删除?目前我正在做 sc.textFile("s3n://...../*/*/*") - 这将收集我所有的存储桶文件和 运行 计算。
  3. 要将结果放在 Redshift(或 s3)中 -> 我如何逐步执行此操作?也就是说,如果 s3 越来越大,redshift 将有重复的数据......我之前总是刷新它吗?怎么样?

我以前遇到过这些问题,但不是在单个管道中。这是我所做的。

  1. 删除重复项

    一个。我使用 BloomFilter 删除本地重复项。请注意,该文档相对不完整,但您可以 save/load/union/intersect 布隆过滤器对象很容易。您甚至可以对过滤器执行 reduce

    b。如果您将数据直接从 Spark 保存到 RedShift,您可能需要花费一些时间和精力来更新当前批次的 BloomFilter,广播它,然后进行过滤以确保全局没有重复。之前我在RDS中使用了一个UNIQUE约束并忽略了这个错误,但是不幸的是RedShift does not honour the constraint.

  2. 和3.数据越来越大

我使用 EMR 集群来 运行 s3-dist-cp command 移动和合并数据(因为通常有很多小日志文件,这会影响 Spark 的性能)。如果您碰巧使用 EMR 来托管您的 Spark 集群,只需在您的分析之前添加一个步骤,将数据从一个存储桶移动到另一个存储桶。该步骤将 command-runner.jar 作为自定义 jar,命令看起来像

s3-dist-cp --src=s3://INPUT_BUCKET/ --dest=s3://OUTPUT_BUCKET_AND_PATH/ --groupBy=".*\.2016-08-(..)T.*" --srcPattern=".*\.2016-08.*" --appendToLastFile --deleteOnSuccess

注意原来的distcp不支持合并文件。

一般来说,您应该尽量避免将已处理和未处理的数据放在同一个存储桶(或至少是路径)中。