Problem publishing a message to Pub/Sub using Python from Airflow

I'm trying to publish a message to Pub/Sub, but I get this error:

[2021-08-30 23:10:55,317] {taskinstance.py:1049} ERROR - 'Future' object has no attribute '_condition'
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 1046, in _run_raw_task
    task.on_success_callback(context)
  File "/home/airflow/gcs/plugins/logger.py", line 70, in on_success_task_instance
    log_monitoring(DAG_STATE_SUCCESS, context=context)
  File "/home/airflow/gcs/plugins/logger.py", line 220, in log_monitoring
    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
  File "/opt/python3.6/lib/python3.6/concurrent/futures/_base.py", line 284, in wait
    with _AcquireFutures(fs):
  File "/opt/python3.6/lib/python3.6/concurrent/futures/_base.py", line 146, in __enter__
    future._condition.acquire()
AttributeError: 'Future' object has no attribute '_condition'

Here is the code:

# Publishes multiple messages to a Pub/Sub topic with an error handler.
import json
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Any, Callable

topic_path = 'projects/.../topics/test_log_monitoring'
publish_futures = []

publisher = pubsub_v1.PublisherClient()

def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")

    return callback

record = {
    'Key1': 'Value1',
    'Key2': 'Value2',
    'Key3': 'Value3'
}

data = json.dumps(record).encode("utf-8")
# When you publish a message, the client returns a future.
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Publish failures are handled in the callback function.
publish_future.add_done_callback(get_callback(publish_future, data))
publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")

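By the way, I'm following this official tutorial: https://cloud.google.com/pubsub/docs/samples/pubsub-publish-with-error-handler

Do you know what's wrong?

Finally, is there a different way to wait for the messages to be published?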

I was able to reproduce your issue with your code snippet. I used Composer version 1.16.15 and Airflow version 1.10.15, and I didn't install any additional Python libraries.
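The AttributeError is raised inside the standard library: concurrent.futures.wait() acquires the private _condition lock of every future it is given, and only instances of concurrent.futures.Future have one. The minimal sketch below reproduces the same message without Pub/Sub, assuming the publish future returned by the pre-installed client is a future-like object (it has result()) that does not subclass concurrent.futures.Future. FakeFuture and try_wait are hypothetical names used only for illustration.

```python
from concurrent import futures


class FakeFuture:
    """Hypothetical look-alike: has result(), but does NOT subclass
    concurrent.futures.Future, so it has no private _condition lock."""

    def __init__(self, value):
        self._value = value

    def result(self, timeout=None):
        return self._value


def try_wait(fs):
    """Run futures.wait() on fs; return the AttributeError text, or None."""
    try:
        futures.wait(fs, return_when=futures.ALL_COMPLETED)
    except AttributeError as exc:
        return str(exc)
    return None


# A genuine concurrent.futures.Future works with futures.wait().
real = futures.Future()
real.set_result("message-id-1")
print(try_wait([real]))  # None

# The look-alike fails the same way as the traceback in the question.
print(try_wait([FakeFuture("message-id-2")]))
# 'FakeFuture' object has no attribute '_condition'
```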

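To fix this, update the Pub/Sub library (google-cloud-pubsub) in your Cloud Composer instance to the latest version, 2.7.1. You can do this with the gcloud composer environments update command. For details, see Installing a Python dependency from PyPI.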

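To update the Pub/Sub library smoothly, explicitly pin the Google libraries in requirements.txt. This is needed because Google libraries depend on other Google libraries; see Pubsub library dependencies. You can get the list of pre-installed Google libraries from the Cloud Composer pre-installed packages reference. However, if you have already updated some Google libraries, you can include just the versions you actually use in requirements.txt.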

requirements.txt

google-ads==4.0.0
google-api-core==1.26.1
google-api-python-client==1.12.8
google-apitools==0.5.31
google-auth==1.28.0
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.4.3
google-cloud-automl==2.2.0
google-cloud-bigquery==2.13.0
google-cloud-bigquery-datatransfer==3.1.0
google-cloud-bigquery-storage==2.1.0
google-cloud-bigtable==1.7.0
google-cloud-build==2.0.0
google-cloud-container==1.0.1
google-cloud-core==1.6.0
google-cloud-datacatalog==3.1.0
google-cloud-dataproc==2.3.0
google-cloud-datastore==1.15.3
google-cloud-dlp==1.0.0
google-cloud-kms==2.2.0
google-cloud-language==1.3.0
google-cloud-logging==2.2.0
google-cloud-memcache==0.3.0
google-cloud-monitoring==2.0.0
google-cloud-os-login==2.1.0
google-cloud-pubsub==2.7.1
google-cloud-pubsublite==0.3.0
google-cloud-redis==2.1.0
google-cloud-secret-manager==1.0.0
google-cloud-spanner==1.19.1
google-cloud-speech==1.3.2
google-cloud-storage==1.36.2
google-cloud-tasks==2.2.0
google-cloud-texttospeech==1.0.1
google-cloud-translate==1.7.0
google-cloud-videointelligence==1.16.1
google-cloud-vision==1.0.0
google-cloud-workflows==0.2.0
google-crc32c==1.1.2
google-pasta==0.2.0
google-resumable-media==1.2.0
googleapis-common-protos==1.53.0
graphviz==0.16
greenlet==1.0.0
grpc-google-iam-v1==0.12.3
grpcio==1.38.1
grpcio-gcp==0.2.2

Update command:

gcloud composer environments update your-environment-name --update-pypi-packages-from-file requirements.txt --location your-composer-location

Once the installation finishes, the command reports that it is done:

Check the installed version in GCP Console -> Composer -> your-environment -> PYPI Packages:
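Besides the console, a rough check you could run inside a task is whether the installed publisher future class subclasses concurrent.futures.Future, which is what futures.wait() needs. This is a minimal sketch: supports_futures_wait is a hypothetical helper, and the idea that newer google-cloud-pubsub releases define their publisher future this way while the pre-installed one did not is an assumption inferred from the error, not something confirmed above.

```python
import concurrent.futures


def supports_futures_wait(future_cls):
    """True if instances of future_cls can be passed to futures.wait(),
    i.e. the class subclasses concurrent.futures.Future and therefore
    carries the _condition lock that wait() acquires."""
    return issubclass(future_cls, concurrent.futures.Future)


try:
    from google.cloud import pubsub_v1
except ImportError:  # library not installed where this runs
    pubsub_v1 = None

if pubsub_v1 is not None:
    # Assumption: expected to print True after the upgrade and False on
    # the older client that produced the traceback (not verified here).
    print(supports_futures_wait(pubsub_v1.publisher.futures.Future))
```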

Airflow test run:

Airflow logs:

DAG used:

import datetime

import airflow
from airflow.operators import bash_operator
from airflow.operators import python_operator
import json
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Any, Callable

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

def publish_error_handle():
    topic_path = 'projects/your-project-id/topics/test-topic'
    publish_futures = []

    publisher = pubsub_v1.PublisherClient()

    def get_callback(
        publish_future: pubsub_v1.publisher.futures.Future, data: str
    ) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
        def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
            try:
                # Wait 60 seconds for the publish call to succeed.
                print(publish_future.result(timeout=60))
            except futures.TimeoutError:
                print(f"Publishing {data} timed out.")

        return callback

    record = {
        'Key1': 'Value1',
        'Key2': 'Value2',
        'Key3': 'Value3'
    }

    data = json.dumps(record).encode("utf-8")
    # When you publish a message, the client returns a future.
    publish_future = publisher.publish(topic_path, data)
    # Non-blocking. Publish failures are handled in the callback function.
    publish_future.add_done_callback(get_callback(publish_future, data))
    publish_futures.append(publish_future)

    # Wait for all the publish futures to resolve before exiting.
    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

    print(f"Published messages with error handler to {topic_path}.")

with airflow.DAG(
        'composer_sample_dag',
        # Keyword argument; as a positional string it would become the DAG description.
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    publish_handle = python_operator.PythonOperator(
        task_id='publish_handle',
        python_callable=publish_error_handle
    )

    publish_handle
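On the question's last point, one alternative to futures.wait() is to call result() on each publish future in turn; result() blocks until that publish finishes and is the same method the tutorial's callback already uses. The sketch below is one way to do it, not the official sample: wait_for_publishes is a hypothetical helper, and standard-library futures stand in for real publish futures so the example runs without Pub/Sub credentials.

```python
from concurrent import futures


def wait_for_publishes(publish_futures, timeout=60):
    """Block until every publish future has resolved.

    Only the public result() method is used, so this also works with
    future-like objects that futures.wait() rejects.
    Returns (message_ids, errors). Note: timeout applies per future.
    """
    message_ids, errors = [], []
    for publish_future in publish_futures:
        try:
            # Blocks until this publish finishes, then returns its message ID
            # or re-raises whatever made the publish fail (or time out).
            message_ids.append(publish_future.result(timeout=timeout))
        except Exception as exc:
            errors.append(exc)
    return message_ids, errors


# Demo with standard-library futures standing in for publish futures.
ok = futures.Future()
ok.set_result("1234")
failed = futures.Future()
failed.set_exception(RuntimeError("publish failed"))

message_ids, errors = wait_for_publishes([ok, failed], timeout=1)
print(message_ids)                  # ['1234']
print([str(e) for e in errors])     # ['publish failed']
```

In the DAG above, the futures.wait(...) line could be replaced by message_ids, errors = wait_for_publishes(publish_futures). Because the timeout applies to each future separately, the worst-case total wait grows with the number of messages.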