Luigi 在 Spark 和 Redshift 中无法按预期工作

Luigi doesn't work as expected with Spark & Redshift

我运行正在使用一个 EMR Spark 集群(使用 YARN)并且我正在 运行正在直接从 EMR 主服务器处理 Luigi 任务。我有一系列工作依赖于 S3 中的数据,经过几次 SparkSubmitTasks 最终将在 Redshift 中结束。

import luigi
import luigi.format
from luigi.contrib.spark import SparkSubmitTask
from luigi.contrib.redshift import RedshiftTarget


class SomeSparkTask(SparkSubmitTask):

    # Stored in /etc/luigi/client.cfg
    host = luigi.Parameter(default='host')
    database = luigi.Parameter(default='database')
    user = luigi.Parameter(default='user')
    password = luigi.Parameter(default='password')
    table = luigi.Parameter(default='table')

    <add-more-params-here>

    app = '<app-jar>.jar'
    entry_class = '<path-to-class>'

    def app_options(self):
        return <list-of-options>

    def output(self):
        return RedshiftTarget(host=self.host, database=self.database, user=self.user, password=self.password,
                              table=self.table, update_id=<some-unique-identifier>)

    def requires(self):
        return AnotherSparkSubmitTask(<params>)

我运行遇到两个主要问题:

1) 有时 luigi 无法确定 SparkSubmitTask 何时完成 - 例如,我会看到 luigi 提交作业,然后检查 YARN,它会说应用程序是 运行 ning,但一旦完成,luigi 就会挂起,无法确定工作是否已完成。

2) 如果出于某种原因 SparkSubmitTasks 能够 运行 并且我在上面放置的任务完成了 Spark 作业,则输出任务永远不会 运行 并且标记 -table 从未创建或填充。但是,实际的 table 是在 运行 的 Spark 作业中创建的。我是否误解了我应该如何称呼 RedshiftTarget?

与此同时,我正在尝试熟悉源代码。

谢谢!

在我的 Spark 应用程序中不再使用 Luigi,因为我所有的数据现在都流式传输到 S3,我只需要一个大型单体应用程序来 运行 我所有的 Spark 聚合,这样我就可以利用中间 results/caching。