从 Python 气流 dag 代码调用 Spark Scala 函数
Calling Spark Scala Function from Python airflow dag code
我的 Spark 管道是用 Scala 编写的,Airflow DAG 是用 Python 编写的。
我正在开发 运行 任务的功能,当 dag 触发日期等于 table 中的 run_date 时,否则跳过它。
我的计划是为此使用 ShortCircuitOperator。
我在 Scala 中编写了一个函数来从配置单元 table 中获取该日期。我想从我的 Python DAG 文件中调用此函数,以便获取日期值并在 ShortCircuitOperator 中使用它。所以我正在寻找一种从 Python DAG 文件调用我的 Scala 函数的方法。另外请指教有没有更好的方法。
skip_if_not_run_date = ShortCircuitOperator(
task_id='skip_if_not_run_date',
python_callable=getLatestRunDate, #need to call scala function here
dag=dag,
provide_context=True,
)
我需要调用的 Spark Scala 函数
def getLatestRunDate(df: DataFrame): DataFrame = {
val df = spark.table("my_hive_schema.my_run_catalog_table")
df
.filter(col("date_of_job_run").leq(current_date())
and col("month_nbr").geq(month(current_date())-2)
and col("yr_nbr").equalTo(year()))
.select("date_of_job_run")
}
我建议将其分成两个任务。在第一个任务中,使用 XCOM 调用 Scala 函数和 return 值。第二个任务可以是短路运算符,可以从第一个任务接收 XCOM 值作为参数。
我的 Spark 管道是用 Scala 编写的,Airflow DAG 是用 Python 编写的。
我正在开发 运行 任务的功能,当 dag 触发日期等于 table 中的 run_date 时,否则跳过它。 我的计划是为此使用 ShortCircuitOperator。
我在 Scala 中编写了一个函数来从配置单元 table 中获取该日期。我想从我的 Python DAG 文件中调用此函数,以便获取日期值并在 ShortCircuitOperator 中使用它。所以我正在寻找一种从 Python DAG 文件调用我的 Scala 函数的方法。另外请指教有没有更好的方法。
skip_if_not_run_date = ShortCircuitOperator(
task_id='skip_if_not_run_date',
python_callable=getLatestRunDate, #need to call scala function here
dag=dag,
provide_context=True,
)
我需要调用的 Spark Scala 函数
def getLatestRunDate(df: DataFrame): DataFrame = {
val df = spark.table("my_hive_schema.my_run_catalog_table")
df
.filter(col("date_of_job_run").leq(current_date())
and col("month_nbr").geq(month(current_date())-2)
and col("yr_nbr").equalTo(year()))
.select("date_of_job_run")
}
我建议将其分成两个任务。在第一个任务中,使用 XCOM 调用 Scala 函数和 return 值。第二个任务可以是短路运算符,可以从第一个任务接收 XCOM 值作为参数。