DAG 不接受参数化消息
DAG not taking the parameterzied message
我有一个 DAG 需要执行 Python 运算符并将生成的字符串作为消息传递给 PubsubPublish 运算符。
我下面的代码完美地打印了消息,但是当我将这个 DAG 上传到气流时,它没有加载。我认为这是我的 DAG 的结构,pubsubpublish 运算符无法读取参数 'messages'
我尝试将消息用作模板化字段,但这也无济于事。
def download_yaml():
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(source_blob_name)
content_blob=blob.download_as_string()
encoded_string = base64.b64encode(content_blob)
return encoded_string
encoded_string = download_yaml()
messages = [
{'data': b64encode(encoded_string)},
]
print messages
dag= DAG('pubsub-message-docker', default_args=default_args,schedule_interval=timedelta(days=1))
t2 = PubSubPublishOperator(project=project,topic=topic,task_id='publish-messages', messages=messages,dag=dag)
t1= PythonOperator(task_id='download_yaml_as_string',provide_context=True,python_callable=download_yaml,dag=dag)
t1.set_downstream(t2)
我可以打印 'encoded string',但是我需要将 encoded_string 作为消息传递到我的 pubsubpublish 运算符中,以便发布它。
如果您在 UI 中看到 DAG,但收到您在评论中提到的错误(主调度程序不知道它的存在),那么我建议快速查看调度程序第一的!确保调度程序可以访问您的 DAG,然后尝试重新启动调度程序。当您遇到该错误时...您将在网络中看到 DAG UI,但无法 运行 它或查看它的日志。
至于您关于将消息传递给 PubSubPublishOperator 的其他问题,我相信应该没问题!
这里有两点供您参考。
1. dag运营者之间的信息交换,Xcom应该是更官方的方式。
XComs let tasks exchange messages, allowing more nuanced forms
of control and shared state. The name is an abbreviation of
“cross-communication”. ...... Any object that
can be pickled can be used as an XCom value, so users should make sure
to use objects of appropriate size.
XComs can be “pushed” (sent) or “pulled” (received). .....
Tasks call xcom_pull() to retrieve XComs, optionally applying filters
based on criteria like key, source task_ids, and source dag_id. ......
https://airflow.apache.org/concepts.html#xcoms
- 您 python 文件可能 运行 并得到未知结果,因为消息与任务 t1 没有关系。它只是在开始时由函数 download_yml 初始化。虽然 t1 再次调用 download_yml,但没有更改消息。因此,T2 仅获取具有初始值的消息。要解决这个问题,你必须将t1中的消息推送到Xcom,并从Xcom中拉取t2中的消息。
祝你好运。
王勇
终于解决了:)
消息必须传递给 pubsubpublish 操作员,如下所示:
消息={'data':("{{task_instance.xcom_pull(键='encoded_string',task_ids='download_yaml')}}")}
然后传消息给参数messages。
t2 = PubSubPublishOperator(project=project,topic=topic,task_id='publish-messages',messages=messages,dag=dag)
最好的问候
萨克什
只是想分享一个解决方案,用于当存在需要填充的嵌套模板值时,例如当消息的有效负载(“数据”)为 json 时。 Airflow v1.10.x 中的内置 PubSubPublishOperator 不支持填充嵌套值,因此我创建了一个自定义运算符。附带好处:您可以传入正常的 dictionary/string 值,而不必担心编码问题:
import json
from airflow.models import BaseOperator
from google.cloud import pubsub_v1
class BetterPubSubPublishOperator(BaseOperator):
"""Publish Pub/Sub messages to a topic.
Allows for templated values to be filled in nested json messages,
which won't get filled using the official `PubSubPublishOperator`.
Args:
project: Name of the project where the `topic` resides.
topic: Name of the Pub/Sub topic to publish to.
messages: A list of dictionaries, where each dictionary reflects a message to be published.
Each message must define one or both of "data" and/or "attributes" keys.
Example:
```
with DAG(
"some_dag",
schedule_interval=None,
max_active_runs=1,
catchup=False,
) as dag:
t_publish_pubsub = BetterPubSubPublishOperator(
task_id="publish_pubsub",
project="your-project",
topic="your-topic-name",
messages=[
{
"data": {
"messageId": str(uuid4()),
"publishTime": "{{ ti.start_date }}",
"youKey1": "key1 value",
"youKey2": "key2 value",
},
"attributes": {"labelKey1": "label1 value", "labelKey2": "label2 value"},
}
],
)
```
"""
template_fields = ("project", "topic", "messages")
ui_color = "#0273d4"
@apply_defaults
def __init__(self, project: str, topic: str, messages: List[Dict], *args, **kwargs):
super().__init__(*args, **kwargs)
self.project = project
self.topic = topic
self.messages = messages
@staticmethod
def publish(project: str, topic: str, messages: List[Dict], **context) -> None:
client = pubsub_v1.PublisherClient()
topic_path = client.topic_path(project, topic)
for message in messages:
raw_data = message.get("data")
if isinstance(raw_data, (dict, list)):
data = json.dumps(raw_data).encode("utf-8")
elif isinstance(raw_data, str):
data = raw_data.encode("utf-8")
elif raw_data is None:
data = b""
else:
raise TypeError(
f"Message's 'data' value is not an acceptable type. "
f"Type is {type(raw_data)}, but must be one of dict, list, str, or None"
)
if "attributes" in message:
attrs = message.get("attributes", {})
future = client.publish(topic=topic_path, data=data, **attrs)
messade_id = future.result() # blocking operation
logging.info(
f"Published the following message to topic {topic_path}:\n"
f"{message}\n"
f"The messade_id is {messade_id}"
)
def execute(self, context):
# Export context to make it available for callables to use.
airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
self.log.debug(
"Exporting the following env vars:\n%s",
"\n".join(["{}={}".format(k, v) for k, v in airflow_context_vars.items()]),
)
os.environ.update(airflow_context_vars)
self.publish(
project=self.project, topic=self.topic, messages=self.messages, **context
)
注意:使用 pubsub v1 客户端,但可以轻松更新为新版本。
我有一个 DAG 需要执行 Python 运算符并将生成的字符串作为消息传递给 PubsubPublish 运算符。
我下面的代码完美地打印了消息,但是当我将这个 DAG 上传到气流时,它没有加载。我认为这是我的 DAG 的结构,pubsubpublish 运算符无法读取参数 'messages'
我尝试将消息用作模板化字段,但这也无济于事。
def download_yaml():
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(source_blob_name)
content_blob=blob.download_as_string()
encoded_string = base64.b64encode(content_blob)
return encoded_string
encoded_string = download_yaml()
messages = [
{'data': b64encode(encoded_string)},
]
print messages
dag= DAG('pubsub-message-docker', default_args=default_args,schedule_interval=timedelta(days=1))
t2 = PubSubPublishOperator(project=project,topic=topic,task_id='publish-messages', messages=messages,dag=dag)
t1= PythonOperator(task_id='download_yaml_as_string',provide_context=True,python_callable=download_yaml,dag=dag)
t1.set_downstream(t2)
我可以打印 'encoded string',但是我需要将 encoded_string 作为消息传递到我的 pubsubpublish 运算符中,以便发布它。
如果您在 UI 中看到 DAG,但收到您在评论中提到的错误(主调度程序不知道它的存在),那么我建议快速查看调度程序第一的!确保调度程序可以访问您的 DAG,然后尝试重新启动调度程序。当您遇到该错误时...您将在网络中看到 DAG UI,但无法 运行 它或查看它的日志。
至于您关于将消息传递给 PubSubPublishOperator 的其他问题,我相信应该没问题!
这里有两点供您参考。 1. dag运营者之间的信息交换,Xcom应该是更官方的方式。
XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”. ...... Any object that can be pickled can be used as an XCom value, so users should make sure to use objects of appropriate size.
XComs can be “pushed” (sent) or “pulled” (received). .....
Tasks call xcom_pull() to retrieve XComs, optionally applying filters based on criteria like key, source task_ids, and source dag_id. ......
https://airflow.apache.org/concepts.html#xcoms
- 您 python 文件可能 运行 并得到未知结果,因为消息与任务 t1 没有关系。它只是在开始时由函数 download_yml 初始化。虽然 t1 再次调用 download_yml,但没有更改消息。因此,T2 仅获取具有初始值的消息。要解决这个问题,你必须将t1中的消息推送到Xcom,并从Xcom中拉取t2中的消息。
祝你好运。
王勇
终于解决了:) 消息必须传递给 pubsubpublish 操作员,如下所示:
消息={'data':("{{task_instance.xcom_pull(键='encoded_string',task_ids='download_yaml')}}")}
然后传消息给参数messages。 t2 = PubSubPublishOperator(project=project,topic=topic,task_id='publish-messages',messages=messages,dag=dag)
最好的问候 萨克什
只是想分享一个解决方案,用于当存在需要填充的嵌套模板值时,例如当消息的有效负载(“数据”)为 json 时。 Airflow v1.10.x 中的内置 PubSubPublishOperator 不支持填充嵌套值,因此我创建了一个自定义运算符。附带好处:您可以传入正常的 dictionary/string 值,而不必担心编码问题:
import json
from airflow.models import BaseOperator
from google.cloud import pubsub_v1
class BetterPubSubPublishOperator(BaseOperator):
"""Publish Pub/Sub messages to a topic.
Allows for templated values to be filled in nested json messages,
which won't get filled using the official `PubSubPublishOperator`.
Args:
project: Name of the project where the `topic` resides.
topic: Name of the Pub/Sub topic to publish to.
messages: A list of dictionaries, where each dictionary reflects a message to be published.
Each message must define one or both of "data" and/or "attributes" keys.
Example:
```
with DAG(
"some_dag",
schedule_interval=None,
max_active_runs=1,
catchup=False,
) as dag:
t_publish_pubsub = BetterPubSubPublishOperator(
task_id="publish_pubsub",
project="your-project",
topic="your-topic-name",
messages=[
{
"data": {
"messageId": str(uuid4()),
"publishTime": "{{ ti.start_date }}",
"youKey1": "key1 value",
"youKey2": "key2 value",
},
"attributes": {"labelKey1": "label1 value", "labelKey2": "label2 value"},
}
],
)
```
"""
template_fields = ("project", "topic", "messages")
ui_color = "#0273d4"
@apply_defaults
def __init__(self, project: str, topic: str, messages: List[Dict], *args, **kwargs):
super().__init__(*args, **kwargs)
self.project = project
self.topic = topic
self.messages = messages
@staticmethod
def publish(project: str, topic: str, messages: List[Dict], **context) -> None:
client = pubsub_v1.PublisherClient()
topic_path = client.topic_path(project, topic)
for message in messages:
raw_data = message.get("data")
if isinstance(raw_data, (dict, list)):
data = json.dumps(raw_data).encode("utf-8")
elif isinstance(raw_data, str):
data = raw_data.encode("utf-8")
elif raw_data is None:
data = b""
else:
raise TypeError(
f"Message's 'data' value is not an acceptable type. "
f"Type is {type(raw_data)}, but must be one of dict, list, str, or None"
)
if "attributes" in message:
attrs = message.get("attributes", {})
future = client.publish(topic=topic_path, data=data, **attrs)
messade_id = future.result() # blocking operation
logging.info(
f"Published the following message to topic {topic_path}:\n"
f"{message}\n"
f"The messade_id is {messade_id}"
)
def execute(self, context):
# Export context to make it available for callables to use.
airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
self.log.debug(
"Exporting the following env vars:\n%s",
"\n".join(["{}={}".format(k, v) for k, v in airflow_context_vars.items()]),
)
os.environ.update(airflow_context_vars)
self.publish(
project=self.project, topic=self.topic, messages=self.messages, **context
)
注意:使用 pubsub v1 客户端,但可以轻松更新为新版本。