一个作业更新另一个作业输出的最佳方式

Best way for a job to update output from another job

这是我的场景。我有一份处理大量 csv 数据并使用 Avro 将其写出到按日期划分的文件中的工作。我得到了一个小文件,我想用它来更新其中一些文件和第二项工作的附加条目我可以 运行 只要需要发生这种情况,而不是再次重新处理整个数据集。

这个想法大概是这样的:

因此,我必须以这种方式编写 Java 作业。我的第一份工作似乎很顺利。 3 也是。我不确定如何处理工作 2。

这是我的想法:

这就是实现方式吗?如果不是更好的方法是什么?组合器在这里有意义吗?我觉得答案是否定的。

提前致谢。

您可以按照以下方法:

1) 运行 所有 csv 文件上的 job1

2) 运行 job2 在小文件上并创建新输出

3) 对于更新,您需要再运行 一个作业,在这个作业中,在setup() 方法中加载job2 的输出并将job1 的输出作为map() 输入。然后编写update的逻辑,生成最终的输出。

4) 然后 运行 你的 job3 进行处理。

根据我的说法,这会起作用。

对于Job2,您可以读取更新文件过滤Driver代码中的输入数据分区,并在Input paths中设置。您可以按照当前的方法将更新文件读取为分发缓存 file.In 如果您希望在无法读取更新文件的情况下使作业失败,则在设置方法本身中抛出异常。

如果您的更新逻辑不需要在 reduce 端进行聚合,则将 Job2 设置为仅映射 job.You 可能需要构建逻辑来识别 Job3 中更新的输入分区,因为它将接收 Job1 输出和 Job2 输出。

只是一个疯狂的想法:为什么你需要实际更新 job1 输出?

  • JOB1 完成了为日期生成一个文件的工作。为什么不添加像随机 UUID 这样的唯一后缀?
  • JOB2 处理 'update' 信息。也许好几次。输出文件命名的逻辑是相同的:基于日期的名称和唯一的后缀。
  • JOB3 收集 JOB1 和 JOB2 输出,将它们按日期前缀和所有后缀分组并作为输入。

如果基于日期的分组是目标,那么对于我来说你有很多优势,显而易见的优势:

  • 你不在乎 'if you have output from JOB1 for this date'。
  • 您甚至不关心是否需要用多个 JOB2 结果更新一个 JOB1 输出。
  • 您不会破坏具有 'no file update' 限制的 HDFS 方法,具有 'write once' 直接处理的全部功能。
  • 您的 JOB3 只需要一些特定的 InputFormat。看起来没那么复杂。
  • 如果您需要合并来自不同来源的数据,没问题。
  • JOB3 本身可以忽略它从多个来源接收数据的事实。 InputFormat要小心。
  • 多个 JOB1 输出可以相同的方式组合。

限制:

  • 对于大型数据集和多次传递,这可能会产生更多的小文件。
  • 您需要自定义 InputFormat.

对于我来说,如果我正确理解你的情况并且你可以/需要按日期对文件进行分组作为 JOB3 的输入,那么我是个不错的选择。

希望对您有所帮助。