如何使用 Scala Operator 运行 Airflow 中的 Scala 代码
How to run Scala code in Airflow using Scala Operator
我刚刚为 Aerospike 编写了一个恢复过程,它看起来非常适合 Airflow,我正在寻找一些适用于 Scala 的 Airflow Operator。
当前实施:
// Register UDF for LUT
aerospikeService.registerUDFs(
"""
|function getLUT(r)
| return record.last_update_time(r)
|end
|""".stripMargin
)
// Pause Connectors
k8sService.pauseConnectors()
// Get Connectors, Current Offsets and LUTs
val connectors = k8sService.getConnectors()
val originalState = kafkaService.getCurrentState()
val startTime = aerospikeService.calculateCurrentLUTs()
// Delete Connectors
k8sService.deleteConnectors()
kafkaService.resetOffsets(originalState)
// Recreate Connectors
k8sService.createConnectors(connectors)
// Wait until Offset Reached
kafkaService.waitTillOriginalOffsetsReached(originalState)
// Truncate
aerospikeService.truncate(startTime, durableDelete)
// Cleanup
aerospikeService.cleanup()
Airflow to 运行 Scala 代码中没有“ScalaOperator”。 Python 不是 JVM 语言,因此您需要构建一个 jar 文件,它可以从另一个进程执行。例如,在 Airflow 中使用 BashOperator:
scala_task = BashOperator(
task_id="scala_task",
dag=dag,
bash_command="java -jar myjar.jar",
)
另一种流行的解决方案是将代码构建到 Docker 容器中,然后使用 KubernetesPodOperator 在 Kubernetes 集群上启动它。
请注意,BashOperator (1) 要求 JVM 存在于 Airflow 工作节点上,并且 (2) 如果使用 BashOperator 触发,进程将 运行 在工作节点上,因此请确保有有足够的资源来处理它。如果没有,请将繁重的处理“外包”到其他地方,例如K8S 或 Spark 集群。
我刚刚为 Aerospike 编写了一个恢复过程,它看起来非常适合 Airflow,我正在寻找一些适用于 Scala 的 Airflow Operator。
当前实施:
// Register UDF for LUT
aerospikeService.registerUDFs(
"""
|function getLUT(r)
| return record.last_update_time(r)
|end
|""".stripMargin
)
// Pause Connectors
k8sService.pauseConnectors()
// Get Connectors, Current Offsets and LUTs
val connectors = k8sService.getConnectors()
val originalState = kafkaService.getCurrentState()
val startTime = aerospikeService.calculateCurrentLUTs()
// Delete Connectors
k8sService.deleteConnectors()
kafkaService.resetOffsets(originalState)
// Recreate Connectors
k8sService.createConnectors(connectors)
// Wait until Offset Reached
kafkaService.waitTillOriginalOffsetsReached(originalState)
// Truncate
aerospikeService.truncate(startTime, durableDelete)
// Cleanup
aerospikeService.cleanup()
Airflow to 运行 Scala 代码中没有“ScalaOperator”。 Python 不是 JVM 语言,因此您需要构建一个 jar 文件,它可以从另一个进程执行。例如,在 Airflow 中使用 BashOperator:
scala_task = BashOperator(
task_id="scala_task",
dag=dag,
bash_command="java -jar myjar.jar",
)
另一种流行的解决方案是将代码构建到 Docker 容器中,然后使用 KubernetesPodOperator 在 Kubernetes 集群上启动它。
请注意,BashOperator (1) 要求 JVM 存在于 Airflow 工作节点上,并且 (2) 如果使用 BashOperator 触发,进程将 运行 在工作节点上,因此请确保有有足够的资源来处理它。如果没有,请将繁重的处理“外包”到其他地方,例如K8S 或 Spark 集群。