使用来自 Airflow 的 Python 在 Pub/Sub 中发布消息时遇到问题
Having problem to publish a message in Pub/Sub using Python from Airflow
我正尝试在 Pub/Sub 中发布消息,但出现此错误:
[2021-08-30 23:10:55,317] {taskinstance.py:1049} ERROR - 'Future' object has no attribute '_condition'
Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 1046, in _run_raw_tas
task.on_success_callback(context
File "/home/airflow/gcs/plugins/logger.py", line 70, in on_success_task_instanc
log_monitoring(DAG_STATE_SUCCESS, context=context
File "/home/airflow/gcs/plugins/logger.py", line 220, in log_monitorin
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED
File "/opt/python3.6/lib/python3.6/concurrent/futures/_base.py", line 284, in wai
with _AcquireFutures(fs)
File "/opt/python3.6/lib/python3.6/concurrent/futures/_base.py", line 146, in __enter_
future._condition.acquire(
AttributeError: 'Future' object has no attribute '_condition
代码如下:
# Publishes multiple messages to a Pub/Sub topic with an error handler.
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Any, Callable
topic_path = 'projects/.../topics/test_log_monitoring'
publish_futures = []
publisher = pubsub_v1.PublisherClient()
def get_callback(
publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
try:
# Wait 60 seconds for the publish call to succeed.
print(publish_future.result(timeout=60))
except futures.TimeoutError:
print(f"Publishing {data} timed out.")
return callback
record = {
'Key1': 'Value1',
'Key2': 'Value2',
'Key3': 'Value3'
}
data = json.dumps(record).encode("utf-8")
# When you publish a message, the client returns a future.
publish_future = publisher.publish(topic_path, data)
# on-blocking. Publish failures are handled in the callback function.
publish_future.add_done_callback(get_callback(publish_future, data))
publish_futures.append(publish_future)
# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
print(f"Published messages with error handler to {topic_path}.")
顺便说一句,我正在学习这个官方教程:https://cloud.google.com/pubsub/docs/samples/pubsub-publish-with-error-handler
你知道哪里出了问题吗?
最后,有什么不同的等待消息发布的方法吗?
我能够使用代码片段重现您的问题。我使用了 Composer 版本 1.16.15 和 Airflow 版本 1.10.15。我没有安装任何额外的 python 库。
要解决此问题,请将您的 Pubsub 更新到 Cloud Composer 实例中的最新版本 2.7.1。您可以使用命令 gcloud composer environments update
更新它。有关详细信息,请参阅 Installing a Python dependency from PyPI。
为了能够顺利更新 Pubsub 库,请在 requirements.txt
中明确定义 Google 库。这是因为 Google 库依赖于其他 Google 库,请参阅 Pubsub library dependencies. You can get the pre installed Google libraries in Cloud Composer pre installed packages 参考。但是,如果您更新了 Google 库,则可以只包含您在 requirements.txt
.
中使用的版本
requirements.txt
google-ads==4.0.0
google-api-core==1.26.1
google-api-python-client==1.12.8
google-apitools==0.5.31
google-auth==1.28.0
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.4.3
google-cloud-automl==2.2.0
google-cloud-bigquery==2.13.0
google-cloud-bigquery-datatransfer==3.1.0
google-cloud-bigquery-storage==2.1.0
google-cloud-bigtable==1.7.0
google-cloud-build==2.0.0
google-cloud-container==1.0.1
google-cloud-core==1.6.0
google-cloud-datacatalog==3.1.0
google-cloud-dataproc==2.3.0
google-cloud-datastore==1.15.3
google-cloud-dlp==1.0.0
google-cloud-kms==2.2.0
google-cloud-language==1.3.0
google-cloud-logging==2.2.0
google-cloud-memcache==0.3.0
google-cloud-monitoring==2.0.0
google-cloud-os-login==2.1.0
google-cloud-pubsub==2.7.1
google-cloud-pubsublite==0.3.0
google-cloud-redis==2.1.0
google-cloud-secret-manager==1.0.0
google-cloud-spanner==1.19.1
google-cloud-speech==1.3.2
google-cloud-storage==1.36.2
google-cloud-tasks==2.2.0
google-cloud-texttospeech==1.0.1
google-cloud-translate==1.7.0
google-cloud-videointelligence==1.16.1
google-cloud-vision==1.0.0
google-cloud-workflows==0.2.0
google-crc32c==1.1.2
google-pasta==0.2.0
google-resumable-media==1.2.0
googleapis-common-protos==1.53.0
graphviz==0.16
greenlet==1.0.0
grpc-google-iam-v1==0.12.3
grpcio==1.38.1
grpcio-gcp==0.2.2
更新命令:
gcloud composer environments update your-environrment-name --update-pypi-packages-from-file requirements.txt --location your-composer-location
安装后它将 return 完成:
在 GCP Console-> Composer-> your-environment -> PYPI Packages 中检查版本:
气流测试运行:
气流日志:
使用的DAG:
import datetime
import airflow
from airflow.operators import bash_operator
from airflow.operators import python_operator
import json
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Any, Callable
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
}
def publish_error_handle():
topic_path = 'projects/your-project-id/topics/test-topic'
publish_futures = []
publisher = pubsub_v1.PublisherClient()
def get_callback(
publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
try:
# Wait 60 seconds for the publish call to succeed.
print(publish_future.result(timeout=60))
except futures.TimeoutError:
print(f"Publishing {data} timed out.")
return callback
record = {
'Key1': 'Value1',
'Key2': 'Value2',
'Key3': 'Value3'
}
data = json.dumps(record).encode("utf-8")
# When you publish a message, the client returns a future.
publish_future = publisher.publish(topic_path, data)
# on-blocking. Publish failures are handled in the callback function.
publish_future.add_done_callback(get_callback(publish_future, data))
publish_futures.append(publish_future)
# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
print(f"Published messages with error handler to {topic_path}.")
with airflow.DAG(
'composer_sample_dag',
'catchup=False',
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
publish_handle = python_operator.PythonOperator(
task_id='publish_handle',
python_callable=publish_error_handle
)
publish_handle
我正尝试在 Pub/Sub 中发布消息,但出现此错误:
[2021-08-30 23:10:55,317] {taskinstance.py:1049} ERROR - 'Future' object has no attribute '_condition'
Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 1046, in _run_raw_tas
task.on_success_callback(context
File "/home/airflow/gcs/plugins/logger.py", line 70, in on_success_task_instanc
log_monitoring(DAG_STATE_SUCCESS, context=context
File "/home/airflow/gcs/plugins/logger.py", line 220, in log_monitorin
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED
File "/opt/python3.6/lib/python3.6/concurrent/futures/_base.py", line 284, in wai
with _AcquireFutures(fs)
File "/opt/python3.6/lib/python3.6/concurrent/futures/_base.py", line 146, in __enter_
future._condition.acquire(
AttributeError: 'Future' object has no attribute '_condition
代码如下:
# Publishes multiple messages to a Pub/Sub topic with an error handler.
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Any, Callable
topic_path = 'projects/.../topics/test_log_monitoring'
publish_futures = []
publisher = pubsub_v1.PublisherClient()
def get_callback(
publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
try:
# Wait 60 seconds for the publish call to succeed.
print(publish_future.result(timeout=60))
except futures.TimeoutError:
print(f"Publishing {data} timed out.")
return callback
record = {
'Key1': 'Value1',
'Key2': 'Value2',
'Key3': 'Value3'
}
data = json.dumps(record).encode("utf-8")
# When you publish a message, the client returns a future.
publish_future = publisher.publish(topic_path, data)
# on-blocking. Publish failures are handled in the callback function.
publish_future.add_done_callback(get_callback(publish_future, data))
publish_futures.append(publish_future)
# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
print(f"Published messages with error handler to {topic_path}.")
顺便说一句,我正在学习这个官方教程:https://cloud.google.com/pubsub/docs/samples/pubsub-publish-with-error-handler
你知道哪里出了问题吗?
最后,有什么不同的等待消息发布的方法吗?
我能够使用代码片段重现您的问题。我使用了 Composer 版本 1.16.15 和 Airflow 版本 1.10.15。我没有安装任何额外的 python 库。
要解决此问题,请将您的 Pubsub 更新到 Cloud Composer 实例中的最新版本 2.7.1。您可以使用命令 gcloud composer environments update
更新它。有关详细信息,请参阅 Installing a Python dependency from PyPI。
为了能够顺利更新 Pubsub 库,请在 requirements.txt
中明确定义 Google 库。这是因为 Google 库依赖于其他 Google 库,请参阅 Pubsub library dependencies. You can get the pre installed Google libraries in Cloud Composer pre installed packages 参考。但是,如果您更新了 Google 库,则可以只包含您在 requirements.txt
.
requirements.txt
google-ads==4.0.0
google-api-core==1.26.1
google-api-python-client==1.12.8
google-apitools==0.5.31
google-auth==1.28.0
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.4.3
google-cloud-automl==2.2.0
google-cloud-bigquery==2.13.0
google-cloud-bigquery-datatransfer==3.1.0
google-cloud-bigquery-storage==2.1.0
google-cloud-bigtable==1.7.0
google-cloud-build==2.0.0
google-cloud-container==1.0.1
google-cloud-core==1.6.0
google-cloud-datacatalog==3.1.0
google-cloud-dataproc==2.3.0
google-cloud-datastore==1.15.3
google-cloud-dlp==1.0.0
google-cloud-kms==2.2.0
google-cloud-language==1.3.0
google-cloud-logging==2.2.0
google-cloud-memcache==0.3.0
google-cloud-monitoring==2.0.0
google-cloud-os-login==2.1.0
google-cloud-pubsub==2.7.1
google-cloud-pubsublite==0.3.0
google-cloud-redis==2.1.0
google-cloud-secret-manager==1.0.0
google-cloud-spanner==1.19.1
google-cloud-speech==1.3.2
google-cloud-storage==1.36.2
google-cloud-tasks==2.2.0
google-cloud-texttospeech==1.0.1
google-cloud-translate==1.7.0
google-cloud-videointelligence==1.16.1
google-cloud-vision==1.0.0
google-cloud-workflows==0.2.0
google-crc32c==1.1.2
google-pasta==0.2.0
google-resumable-media==1.2.0
googleapis-common-protos==1.53.0
graphviz==0.16
greenlet==1.0.0
grpc-google-iam-v1==0.12.3
grpcio==1.38.1
grpcio-gcp==0.2.2
更新命令:
gcloud composer environments update your-environrment-name --update-pypi-packages-from-file requirements.txt --location your-composer-location
安装后它将 return 完成:
在 GCP Console-> Composer-> your-environment -> PYPI Packages 中检查版本:
气流测试运行:
气流日志:
使用的DAG:
import datetime
import airflow
from airflow.operators import bash_operator
from airflow.operators import python_operator
import json
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Any, Callable
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': YESTERDAY,
}
def publish_error_handle():
topic_path = 'projects/your-project-id/topics/test-topic'
publish_futures = []
publisher = pubsub_v1.PublisherClient()
def get_callback(
publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
try:
# Wait 60 seconds for the publish call to succeed.
print(publish_future.result(timeout=60))
except futures.TimeoutError:
print(f"Publishing {data} timed out.")
return callback
record = {
'Key1': 'Value1',
'Key2': 'Value2',
'Key3': 'Value3'
}
data = json.dumps(record).encode("utf-8")
# When you publish a message, the client returns a future.
publish_future = publisher.publish(topic_path, data)
# on-blocking. Publish failures are handled in the callback function.
publish_future.add_done_callback(get_callback(publish_future, data))
publish_futures.append(publish_future)
# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
print(f"Published messages with error handler to {topic_path}.")
with airflow.DAG(
'composer_sample_dag',
'catchup=False',
default_args=default_args,
schedule_interval=datetime.timedelta(days=1)) as dag:
publish_handle = python_operator.PythonOperator(
task_id='publish_handle',
python_callable=publish_error_handle
)
publish_handle