如何在 Hadoop 作业中保持状态?

How to keep a state in Hadoop jobs?

我正在开发一个 hadoop 程序,计划每天 运行 一次。它需要一堆 json 个文档,每个文档都有一个时间戳,显示添加文档的时间。我的程序应该只处理自上次 运行 以来添加的那些文档。因此,我需要保留一个时间戳状态,显示我的 hadoop 作业上次 运行 的时间。我正在考虑将此状态存储在 SQL 服务器中,并在我的工作驱动程序中查询它。这是一个好的解决方案还是更好的解决方案?

p.s。我的 hadoop 工作是在 HDInsight 上 运行ning。话虽如此,仍然可以从我的驱动程序中查询 SQL 服务器?

您可以使用日期时间重命名结果文档,然后您的程序可以根据文档名称处理文档。

驱动程序检查最后的 运行 时间戳是个好方法,但要存储最后的 运行 时间戳,您可以使用 HDFS 中的临时文件。

我们已经在 AWS(亚马逊网络服务)中解决了我们的工作流程 运行 中存储在 S3 中的数据的这个问题。

我们的设置:

  • 数据存储:AWS S3
  • 数据摄取机制:Flume
  • 工作流管理:Oozie
  • 文件状态存储:MySQL

问题:

我们使用 Flume 将数据提取到 Amazon S3 中。所有摄取的数据都在同一个文件夹中(S3 是一个 key/value 存储,没有文件夹的概念。这里的文件夹意味着,所有数据都有相同的前缀。例如 /tmp/1.txt,[=163= .txt 等。这里 /tmp/ 是键前缀)。

我们有一个 ETL 工作流程,计划 运行 一小时一次。但是,由于所有数据都被提取到同一个文件夹中,我们必须区分 ProcessedUn-Processed 文件。

例如摄取的第一个小时数据是:

/tmp/1.txt
/tmp/2.txt

当工作流第一次启动时,它应该处理来自“1.txt”和“2.txt”的数据,并将它们标记为已处理

如果第二个小时,摄取的数据是:

/tmp/3.txt
/tmp/4.txt
/tmp/5.txt

那么,2小时后文件夹中的总数据将为:

/tmp/1.txt
/tmp/2.txt
/tmp/3.txt
/tmp/4.txt
/tmp/5.txt

因为“1.txt”和“2.txt”已经处理并标记为已处理,在第二个运行期间,作业应该只处理“3.txt”、“4.txt”和“5.txt”。

解法:

我们开发了一个库(我们称之为 FileManager),用于管理已处理文件的列表。我们将此库作为 Java 操作插入到 Oozie 工作流程中。这是工作流程的第一步。

该库还负责忽略 Flume 当前正在写入的文件。当 Flume 正在将数据写入文件时,这些文件具有“_current”后缀。所以,这些文件被忽略处理,直到它们被完全写入。

提取的文件是用时间戳作为后缀生成的。例如"hourly_feed.1234567"。因此,文件名按其创建顺序升序排列。

为了获取未处理文件的列表,我们使用了S3的使用标记查询的功能(例如,如果您在一个文件夹中有10,000个文件,如果您将标记指定为第5,000个文件的名称,那么S3将return 你的文件从 5001 到 10,000)。

每个文件都有以下 3 个状态:

  1. SUCCESS - 已成功处理的文件
  2. 错误 - 拾取文件进行处理,但在处理这些文件时出错。因此,需要重新提取这些文件进行处理
  3. IN_PROGRESS - 已拾取进行处理且当前正在由作业处理的文件

对于每个文件,我们在 MySQL 数据库中存储了以下详细信息:

  • 文件名
  • 上次修改时间 - 我们用它来处理一些特殊情况
  • 文件状态(IN_PROGRESS成功错误

FileManager 公开了以下接口:

  • GetLatestFiles: Return 最新的未处理 文件列表
  • UpdateFileStatus: 处理文件后,更新文件状态

以下是识别尚未处理的文件所遵循的步骤:

  1. 查询数据库 (MySql),检查状态为 SUCCESSlast 文件(查询:order by created desc).
  2. 如果第一步 return 是文件,则查询 S3,文件标记设置为最后成功处理的文件。这将 return 所有文件,在最后一个成功处理的文件之后摄取。
  3. 同时查询数据库,检查是否有任何文件处于错误状态。这些文件需要重新处理,因为以前的工作流程没有成功处理它们。
  4. Return 从第 2 步和第 3 步获得的文件列表(在 returning 它们之前,将它们的状态标记为 IN_PROGRESS) .
  5. 作业成功完成后,将所有已处理文件的状态更新为SUCCESS。如果在处理文件时出现错误,则将所有文件的状态更新为 ERROR(以便下次可以提取它们进行处理)

我们使用 Oozie 进行工作流管理。 Oozie 工作流有以下步骤:

  1. 第 1 步:获取下一组要处理的文件,将它们的每个状态标记为 IN_PROGRESS 并将它们传递到下一阶段
  2. 第 2 步:处理文件
  3. 第 3 步:更新处理状态(SUCCESSERROR

去重: 当您实现这样的库时,可能会出现重复记录(在某些极端情况下,同一文件可能会被拾取两次进行处理)。我们实施了重复数据删除逻辑来删除重复记录。