我可以直接在 Airflow 任务中使用 GCP Client API 吗?
Can I use GCP Client API in Airflow tasks directly?
我现在正在使用 Airflow(GCP Composer)。
我知道它有 GCS 挂钩,我可以下载一些 GCS 文件。
但我想部分读取文件。
我可以在 DAG 中将此 python 逻辑与 PythonOperator 一起使用吗?
from google.cloud import storage
def my_func():
client = storage.Client()
bucket = client.get_bucket("mybucket")
blob = bucket.get_blob("myfile")
data = blob.download_as_bytes(end=100)
return data
在 Airflow 任务中,是否禁止不使用 hooks 的直接客户端 API 调用?
你可以,但一个更 Airflowy 来处理钩子中缺少的功能是扩展钩子:
from airflow.providers.google.cloud.hooks.gcs import GCSHook
class MyGCSHook(GCSHook):
def download_bytes(
self,
bucket_name: str,
object_name: str,
end:str,
) -> bytes:
client = self.get_conn()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name=object_name)
return blob.download_as_bytes(end=end)
然后就可以在PythonOperator或者自定义算子中使用钩子函数了
请注意,GCSHook
具有您提到的 download 功能。
您可能错过的是,如果您不提供文件名,它将以字节形式下载(参见 source code)。它不允许按您的预期配置 end
参数,但如果您希望为 Airflow 开源做出贡献,这应该是对 Airflow PR 的简单修复。
我现在正在使用 Airflow(GCP Composer)。
我知道它有 GCS 挂钩,我可以下载一些 GCS 文件。
但我想部分读取文件。
我可以在 DAG 中将此 python 逻辑与 PythonOperator 一起使用吗?
from google.cloud import storage
def my_func():
client = storage.Client()
bucket = client.get_bucket("mybucket")
blob = bucket.get_blob("myfile")
data = blob.download_as_bytes(end=100)
return data
在 Airflow 任务中,是否禁止不使用 hooks 的直接客户端 API 调用?
你可以,但一个更 Airflowy 来处理钩子中缺少的功能是扩展钩子:
from airflow.providers.google.cloud.hooks.gcs import GCSHook
class MyGCSHook(GCSHook):
def download_bytes(
self,
bucket_name: str,
object_name: str,
end:str,
) -> bytes:
client = self.get_conn()
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name=object_name)
return blob.download_as_bytes(end=end)
然后就可以在PythonOperator或者自定义算子中使用钩子函数了
请注意,GCSHook
具有您提到的 download 功能。
您可能错过的是,如果您不提供文件名,它将以字节形式下载(参见 source code)。它不允许按您的预期配置 end
参数,但如果您希望为 Airflow 开源做出贡献,这应该是对 Airflow PR 的简单修复。