使用 MesosExecutor 在 Airflow 上自定义任务资源
Customize task resources on Airflow using MesosExecutor
是否可以在使用 MesosExecutor 时为 DAG 的每个算子指定资源(CPU、内存、GPU、磁盘space)?
我知道您可以为任务的资源指定全局值。
例如,我有几个 CPU 昂贵的运算符,而其他则不是。我想在第一个时间执行一个,但许多与非 CPU 昂贵的并行执行。
从 code(mesos_executor.py 第 67 行)看来,这似乎是不可能的,因为 cpu 和内存值在初始化期间传递给调度程序:
def __init__(self,
task_queue,
result_queue,
task_cpu=1,
task_mem=256):
self.task_queue = task_queue
self.result_queue = result_queue
self.task_cpu = task_cpu
self.task_mem = task_mem
和那些values are used没有修改:
cpus = task.resources.add()
cpus.name = "cpus"
cpus.type = mesos_pb2.Value.SCALAR
cpus.scalar.value = self.task_cpu
mem = task.resources.add()
mem.name = "mem"
mem.type = mesos_pb2.Value.SCALAR
mem.scalar.value = self.task_mem
需要自定义执行器实现才能实现
是否可以在使用 MesosExecutor 时为 DAG 的每个算子指定资源(CPU、内存、GPU、磁盘space)?
我知道您可以为任务的资源指定全局值。
例如,我有几个 CPU 昂贵的运算符,而其他则不是。我想在第一个时间执行一个,但许多与非 CPU 昂贵的并行执行。
从 code(mesos_executor.py 第 67 行)看来,这似乎是不可能的,因为 cpu 和内存值在初始化期间传递给调度程序:
def __init__(self,
task_queue,
result_queue,
task_cpu=1,
task_mem=256):
self.task_queue = task_queue
self.result_queue = result_queue
self.task_cpu = task_cpu
self.task_mem = task_mem
和那些values are used没有修改:
cpus = task.resources.add()
cpus.name = "cpus"
cpus.type = mesos_pb2.Value.SCALAR
cpus.scalar.value = self.task_cpu
mem = task.resources.add()
mem.name = "mem"
mem.type = mesos_pb2.Value.SCALAR
mem.scalar.value = self.task_mem
需要自定义执行器实现才能实现