Airflow SQSSensor 消息过滤

Airflow SQSSensor message filtering

下面给出 json:

{ "Model" : "level1" }

message_filtering_match_valuesmessage_filtering_config 值的正确组合是什么?我在下面尝试但失败了:

model_operator = SQSSensor(
  task_id='model_operator',
  dag=dag,
  sqs_queue='https://sqs.somewhere/somequeue.fifo',
  aws_conn_id='aws_default',
  message_filtering='jsonpath',
  message_filtering_config='Model[*]',
  message_filtering_match_values=['level1'],
  mode='reschedule')

错误信息是:

Broken DAG: [/usr/local/airflow/dags/test_dag.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 94, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 414, in __init__
    "arguments were:\n**kwargs: {k}".format(c=self.__class__.__name__, k=kwargs, t=task_id),
airflow.exceptions.AirflowException: Invalid arguments were passed to SQSSensor (task_id: model_operator). Invalid arguments were:
**kwargs: {'message_filtering': 'jsonpath', 'message_filtering_config': 'Model[*]', 'message_filtering_match_values': ['level1']}

message_filtering / message_filtering_config / message_filtering_match_values 最近添加到 PR it was released in Amazon provider version 2.2.0 从回溯中我们可以看出,这些参数无法被运营商识别,这意味着您 运行 是亚马逊提供商的旧版本。

您应该将亚马逊提供商升级到最新版本。

pip install apache-airflow-providers-amazon --upgrade

还建议阅读有关 constraint 个文件的文档。

你没有提到你的 Airflow 版本 运行 也没有提到亚马逊供应商的版本所以请注意阅读更改日志以防你升级主要版本。