如何使用 Scala Operator 运行 Airflow 中的 Scala 代码

How to run Scala code in Airflow using Scala Operator

我刚刚为 Aerospike 编写了一个恢复过程,它看起来非常适合 Airflow,我正在寻找一些适用于 Scala 的 Airflow Operator。

当前实施:

 // Register UDF for LUT
  aerospikeService.registerUDFs(
    """
      |function getLUT(r)
      |    return record.last_update_time(r)
      |end
      |""".stripMargin
  )

  // Pause Connectors
  k8sService.pauseConnectors()

  // Get Connectors, Current Offsets and LUTs
  val connectors = k8sService.getConnectors()
  val originalState = kafkaService.getCurrentState()
  val startTime = aerospikeService.calculateCurrentLUTs()

  // Delete Connectors
  k8sService.deleteConnectors()
  kafkaService.resetOffsets(originalState)

  // Recreate Connectors
  k8sService.createConnectors(connectors)

  // Wait until Offset Reached
  kafkaService.waitTillOriginalOffsetsReached(originalState)

  // Truncate
  aerospikeService.truncate(startTime, durableDelete)

  // Cleanup
  aerospikeService.cleanup()

Airflow to 运行 Scala 代码中没有“ScalaOperator”。 Python 不是 JVM 语言,因此您需要构建一个 jar 文件,它可以从另一个进程执行。例如,在 Airflow 中使用 BashOperator:

scala_task = BashOperator(
    task_id="scala_task",
    dag=dag,
    bash_command="java -jar myjar.jar",
)

另一种流行的解决方案是将代码构建到 Docker 容器中,然后使用 KubernetesPodOperator 在 Kubernetes 集群上启动它。

请注意,BashOperator (1) 要求 JVM 存在于 Airflow 工作节点上,并且 (2) 如果使用 BashOperator 触发,进程将 运行 在工作节点上,因此请确保有有足够的资源来处理它。如果没有,请将繁重的处理“外包”到其他地方,例如K8S 或 Spark 集群。