如何在 Hadoop 作业中保持状态?
How to keep a state in Hadoop jobs?
我正在开发一个 hadoop 程序,计划每天 运行 一次。它需要一堆 json 个文档,每个文档都有一个时间戳,显示添加文档的时间。我的程序应该只处理自上次 运行 以来添加的那些文档。因此,我需要保留一个时间戳状态,显示我的 hadoop 作业上次 运行 的时间。我正在考虑将此状态存储在 SQL 服务器中,并在我的工作驱动程序中查询它。这是一个好的解决方案还是更好的解决方案?
p.s。我的 hadoop 工作是在 HDInsight 上 运行ning。话虽如此,仍然可以从我的驱动程序中查询 SQL 服务器?
您可以使用日期时间重命名结果文档,然后您的程序可以根据文档名称处理文档。
驱动程序检查最后的 运行 时间戳是个好方法,但要存储最后的 运行 时间戳,您可以使用 HDFS 中的临时文件。
我们已经在 AWS(亚马逊网络服务)中解决了我们的工作流程 运行 中存储在 S3 中的数据的这个问题。
我们的设置:
- 数据存储:AWS S3
- 数据摄取机制:Flume
- 工作流管理:Oozie
- 文件状态存储:MySQL
问题:
我们使用 Flume 将数据提取到 Amazon S3 中。所有摄取的数据都在同一个文件夹中(S3 是一个 key/value 存储,没有文件夹的概念。这里的文件夹意味着,所有数据都有相同的前缀。例如 /tmp/1.txt,[=163= .txt 等。这里 /tmp/ 是键前缀)。
我们有一个 ETL 工作流程,计划 运行 一小时一次。但是,由于所有数据都被提取到同一个文件夹中,我们必须区分 Processed 和 Un-Processed 文件。
例如摄取的第一个小时数据是:
/tmp/1.txt
/tmp/2.txt
当工作流第一次启动时,它应该处理来自“1.txt”和“2.txt”的数据,并将它们标记为已处理。
如果第二个小时,摄取的数据是:
/tmp/3.txt
/tmp/4.txt
/tmp/5.txt
那么,2小时后文件夹中的总数据将为:
/tmp/1.txt
/tmp/2.txt
/tmp/3.txt
/tmp/4.txt
/tmp/5.txt
因为“1.txt”和“2.txt”已经处理并标记为已处理,在第二个运行期间,作业应该只处理“3.txt”、“4.txt”和“5.txt”。
解法:
我们开发了一个库(我们称之为 FileManager
),用于管理已处理文件的列表。我们将此库作为 Java 操作插入到 Oozie 工作流程中。这是工作流程的第一步。
该库还负责忽略 Flume 当前正在写入的文件。当 Flume 正在将数据写入文件时,这些文件具有“_current”后缀。所以,这些文件被忽略处理,直到它们被完全写入。
提取的文件是用时间戳作为后缀生成的。例如"hourly_feed.1234567"。因此,文件名按其创建顺序升序排列。
为了获取未处理文件的列表,我们使用了S3的使用标记查询的功能(例如,如果您在一个文件夹中有10,000个文件,如果您将标记指定为第5,000个文件的名称,那么S3将return 你的文件从 5001 到 10,000)。
每个文件都有以下 3 个状态:
- SUCCESS - 已成功处理的文件
- 错误 - 拾取文件进行处理,但在处理这些文件时出错。因此,需要重新提取这些文件进行处理
- IN_PROGRESS - 已拾取进行处理且当前正在由作业处理的文件
对于每个文件,我们在 MySQL 数据库中存储了以下详细信息:
- 文件名
- 上次修改时间 - 我们用它来处理一些特殊情况
- 文件状态(IN_PROGRESS、成功、错误)
FileManager
公开了以下接口:
GetLatestFiles
: Return 最新的未处理 文件列表
UpdateFileStatus
: 处理文件后,更新文件状态
以下是识别尚未处理的文件所遵循的步骤:
- 查询数据库 (MySql),检查状态为 SUCCESS 的 last 文件(查询:
order by created desc
).
- 如果第一步 return 是文件,则查询 S3,文件标记设置为最后成功处理的文件。这将 return 所有文件,在最后一个成功处理的文件之后摄取。
- 同时查询数据库,检查是否有任何文件处于错误状态。这些文件需要重新处理,因为以前的工作流程没有成功处理它们。
- Return 从第 2 步和第 3 步获得的文件列表(在 returning 它们之前,将它们的状态标记为 IN_PROGRESS) .
- 作业成功完成后,将所有已处理文件的状态更新为SUCCESS。如果在处理文件时出现错误,则将所有文件的状态更新为 ERROR(以便下次可以提取它们进行处理)
我们使用 Oozie 进行工作流管理。 Oozie 工作流有以下步骤:
- 第 1 步:获取下一组要处理的文件,将它们的每个状态标记为 IN_PROGRESS 并将它们传递到下一阶段
- 第 2 步:处理文件
- 第 3 步:更新处理状态(SUCCESS 或 ERROR)
去重:
当您实现这样的库时,可能会出现重复记录(在某些极端情况下,同一文件可能会被拾取两次进行处理)。我们实施了重复数据删除逻辑来删除重复记录。
我正在开发一个 hadoop 程序,计划每天 运行 一次。它需要一堆 json 个文档,每个文档都有一个时间戳,显示添加文档的时间。我的程序应该只处理自上次 运行 以来添加的那些文档。因此,我需要保留一个时间戳状态,显示我的 hadoop 作业上次 运行 的时间。我正在考虑将此状态存储在 SQL 服务器中,并在我的工作驱动程序中查询它。这是一个好的解决方案还是更好的解决方案?
p.s。我的 hadoop 工作是在 HDInsight 上 运行ning。话虽如此,仍然可以从我的驱动程序中查询 SQL 服务器?
您可以使用日期时间重命名结果文档,然后您的程序可以根据文档名称处理文档。
驱动程序检查最后的 运行 时间戳是个好方法,但要存储最后的 运行 时间戳,您可以使用 HDFS 中的临时文件。
我们已经在 AWS(亚马逊网络服务)中解决了我们的工作流程 运行 中存储在 S3 中的数据的这个问题。
我们的设置:
- 数据存储:AWS S3
- 数据摄取机制:Flume
- 工作流管理:Oozie
- 文件状态存储:MySQL
问题:
我们使用 Flume 将数据提取到 Amazon S3 中。所有摄取的数据都在同一个文件夹中(S3 是一个 key/value 存储,没有文件夹的概念。这里的文件夹意味着,所有数据都有相同的前缀。例如 /tmp/1.txt,[=163= .txt 等。这里 /tmp/ 是键前缀)。
我们有一个 ETL 工作流程,计划 运行 一小时一次。但是,由于所有数据都被提取到同一个文件夹中,我们必须区分 Processed 和 Un-Processed 文件。
例如摄取的第一个小时数据是:
/tmp/1.txt
/tmp/2.txt
当工作流第一次启动时,它应该处理来自“1.txt”和“2.txt”的数据,并将它们标记为已处理。
如果第二个小时,摄取的数据是:
/tmp/3.txt
/tmp/4.txt
/tmp/5.txt
那么,2小时后文件夹中的总数据将为:
/tmp/1.txt
/tmp/2.txt
/tmp/3.txt
/tmp/4.txt
/tmp/5.txt
因为“1.txt”和“2.txt”已经处理并标记为已处理,在第二个运行期间,作业应该只处理“3.txt”、“4.txt”和“5.txt”。
解法:
我们开发了一个库(我们称之为 FileManager
),用于管理已处理文件的列表。我们将此库作为 Java 操作插入到 Oozie 工作流程中。这是工作流程的第一步。
该库还负责忽略 Flume 当前正在写入的文件。当 Flume 正在将数据写入文件时,这些文件具有“_current”后缀。所以,这些文件被忽略处理,直到它们被完全写入。
提取的文件是用时间戳作为后缀生成的。例如"hourly_feed.1234567"。因此,文件名按其创建顺序升序排列。
为了获取未处理文件的列表,我们使用了S3的使用标记查询的功能(例如,如果您在一个文件夹中有10,000个文件,如果您将标记指定为第5,000个文件的名称,那么S3将return 你的文件从 5001 到 10,000)。
每个文件都有以下 3 个状态:
- SUCCESS - 已成功处理的文件
- 错误 - 拾取文件进行处理,但在处理这些文件时出错。因此,需要重新提取这些文件进行处理
- IN_PROGRESS - 已拾取进行处理且当前正在由作业处理的文件
对于每个文件,我们在 MySQL 数据库中存储了以下详细信息:
- 文件名
- 上次修改时间 - 我们用它来处理一些特殊情况
- 文件状态(IN_PROGRESS、成功、错误)
FileManager
公开了以下接口:
GetLatestFiles
: Return 最新的未处理 文件列表UpdateFileStatus
: 处理文件后,更新文件状态
以下是识别尚未处理的文件所遵循的步骤:
- 查询数据库 (MySql),检查状态为 SUCCESS 的 last 文件(查询:
order by created desc
). - 如果第一步 return 是文件,则查询 S3,文件标记设置为最后成功处理的文件。这将 return 所有文件,在最后一个成功处理的文件之后摄取。
- 同时查询数据库,检查是否有任何文件处于错误状态。这些文件需要重新处理,因为以前的工作流程没有成功处理它们。
- Return 从第 2 步和第 3 步获得的文件列表(在 returning 它们之前,将它们的状态标记为 IN_PROGRESS) .
- 作业成功完成后,将所有已处理文件的状态更新为SUCCESS。如果在处理文件时出现错误,则将所有文件的状态更新为 ERROR(以便下次可以提取它们进行处理)
我们使用 Oozie 进行工作流管理。 Oozie 工作流有以下步骤:
- 第 1 步:获取下一组要处理的文件,将它们的每个状态标记为 IN_PROGRESS 并将它们传递到下一阶段
- 第 2 步:处理文件
- 第 3 步:更新处理状态(SUCCESS 或 ERROR)
去重: 当您实现这样的库时,可能会出现重复记录(在某些极端情况下,同一文件可能会被拾取两次进行处理)。我们实施了重复数据删除逻辑来删除重复记录。