无法打开 Airflow 的智能传感器功能(自定义传感器)
Cannot turn on Airflow's Smart Sensors feature (custom sensor)
我正在尝试在自定义传感器运算符(即 BaseSensorOperator
的子类)上使用 Airflow 的 Smart Sensors feature。目前有关此功能的文档非常少。
分片作业 (smart_sensor_group_shard_[x]
) 是 运行,但我认为它们没有接收到我的传感器。那些日志说 Loaded 0 sensor_works
.
我认为问题是 BaseSensorOperator.is_smart_sensor_compatible()
返回 False,即使我在我的配置 中打开了该功能。这是我的配置:
[smart_sensor]
sensors_enabled = MyCustomSensor
use_smart_sensor = True
但这里是来自 MySensorOperator
的日志:
INFO - self.sensor_service_enabled=False
INFO - self.sensors_support_sensor_service={'NamedHivePartitionSensor'}
INFO - Sensor is NOT Smart Sensor compatible
如您所见,操作员仍会看到 Airflow 对这些配置值的默认设置。我不知道为什么会出现这种不一致,因为我可以在 UI.
中看到正确设置的配置
我的其余代码
来自MyCustomSensor的相关代码:
class MyCustomSensor(BaseSensorOperator):
poke_context_fields = ['some_arg', 'use_smart_sensor']
def __init__(self, some_arg,
use_smart_sensor=False,
*args, **kwargs):
self.some_arg = some_arg
self.use_smart_sensor = use_smart_sensor
super(MyCustomSensor, self).__init__(*args, **kwargs)
def is_smart_sensor_compatible(self):
# If we have turned it off.
if not self.use_smart_sensor:
is_compatible = False
else:
self.soft_fail = False
# super() should be BaseSensorOperator
is_compatible = super().is_smart_sensor_compatible()
log.info(f'{self.sensor_service_enabled=}')
log.info(f'{self.sensors_support_sensor_service=}')
if is_compatible:
log.info('Sensor IS Smart Sensor compatible')
else:
log.info('Sensor is NOT Smart Sensor compatible')
return is_compatible
我如何创建传感器任务:
# NOTE: I think that poke_interval may
# be ignored when we are using Smart
# Sensors.
my_sensor = MyCustomSensor(
task_id='some_name',
prior_task='some_other_name',
timeout=518400,
mode='reschedule',
poke_interval=30,
use_smart_sensor=True,
dag=dag
)
我正在使用 Cloud Composer,特别是版本 composer-1.17.1-airflow-2.1.2
。我已验证这些不是 Cloud Composer 的阻止配置。
当我在现有的 Cloud Composer 实例上使用命令 gcloud composer environments update
更新 sensors_enabled
和 use_smart_sensor
的值时,我能够使用您的代码重现您的错误。 Cloud Composer 似乎没有在 运行 时间应用新配置。
但我找到了解决方法。请参阅以下步骤:
- 我创建了一个新的 composer 实例,并且在创建页面我已经定义了 Airflow 配置覆盖。
- 使用你的 DAG,我 运行 测试并正确应用了 Airflow 配置。
我创建了一个 public issue tracker 以将此报告给 Cloud Composer 工程团队。
更新:
另一种解决方法是设置虚拟环境变量以强制工作人员重新启动并应用对 Airflow 配置的更改。
gcloud composer environments update <composer_env_name> \
--location <location> \
--update-env-variables=DUMMY=dummy
更新 2:
从 Airflow 2.2 开始,Deferrable Operators 作为智能传感器的首选解决方案。您应该首先查看该功能。
我正在尝试在自定义传感器运算符(即 BaseSensorOperator
的子类)上使用 Airflow 的 Smart Sensors feature。目前有关此功能的文档非常少。
分片作业 (smart_sensor_group_shard_[x]
) 是 运行,但我认为它们没有接收到我的传感器。那些日志说 Loaded 0 sensor_works
.
我认为问题是 BaseSensorOperator.is_smart_sensor_compatible()
返回 False,即使我在我的配置 中打开了该功能。这是我的配置:
[smart_sensor]
sensors_enabled = MyCustomSensor
use_smart_sensor = True
但这里是来自 MySensorOperator
的日志:
INFO - self.sensor_service_enabled=False
INFO - self.sensors_support_sensor_service={'NamedHivePartitionSensor'}
INFO - Sensor is NOT Smart Sensor compatible
如您所见,操作员仍会看到 Airflow 对这些配置值的默认设置。我不知道为什么会出现这种不一致,因为我可以在 UI.
中看到正确设置的配置我的其余代码
来自MyCustomSensor的相关代码:
class MyCustomSensor(BaseSensorOperator):
poke_context_fields = ['some_arg', 'use_smart_sensor']
def __init__(self, some_arg,
use_smart_sensor=False,
*args, **kwargs):
self.some_arg = some_arg
self.use_smart_sensor = use_smart_sensor
super(MyCustomSensor, self).__init__(*args, **kwargs)
def is_smart_sensor_compatible(self):
# If we have turned it off.
if not self.use_smart_sensor:
is_compatible = False
else:
self.soft_fail = False
# super() should be BaseSensorOperator
is_compatible = super().is_smart_sensor_compatible()
log.info(f'{self.sensor_service_enabled=}')
log.info(f'{self.sensors_support_sensor_service=}')
if is_compatible:
log.info('Sensor IS Smart Sensor compatible')
else:
log.info('Sensor is NOT Smart Sensor compatible')
return is_compatible
我如何创建传感器任务:
# NOTE: I think that poke_interval may
# be ignored when we are using Smart
# Sensors.
my_sensor = MyCustomSensor(
task_id='some_name',
prior_task='some_other_name',
timeout=518400,
mode='reschedule',
poke_interval=30,
use_smart_sensor=True,
dag=dag
)
我正在使用 Cloud Composer,特别是版本 composer-1.17.1-airflow-2.1.2
。我已验证这些不是 Cloud Composer 的阻止配置。
当我在现有的 Cloud Composer 实例上使用命令 gcloud composer environments update
更新 sensors_enabled
和 use_smart_sensor
的值时,我能够使用您的代码重现您的错误。 Cloud Composer 似乎没有在 运行 时间应用新配置。
但我找到了解决方法。请参阅以下步骤:
- 我创建了一个新的 composer 实例,并且在创建页面我已经定义了 Airflow 配置覆盖。
- 使用你的 DAG,我 运行 测试并正确应用了 Airflow 配置。
我创建了一个 public issue tracker 以将此报告给 Cloud Composer 工程团队。
更新:
另一种解决方法是设置虚拟环境变量以强制工作人员重新启动并应用对 Airflow 配置的更改。
gcloud composer environments update <composer_env_name> \
--location <location> \
--update-env-variables=DUMMY=dummy
更新 2:
从 Airflow 2.2 开始,Deferrable Operators 作为智能传感器的首选解决方案。您应该首先查看该功能。