我可以直接在 Airflow 任务中使用 GCP Client API 吗?

Can I use GCP Client API in Airflow tasks directly?

我现在正在使用 Airflow(GCP Composer)。

我知道它有 GCS 挂钩,我可以下载一些 GCS 文件。

但我想部分读取文件。

我可以在 DAG 中将此 python 逻辑与 PythonOperator 一起使用吗?

from google.cloud import storage

def my_func():
    client = storage.Client()
    bucket = client.get_bucket("mybucket")
    blob = bucket.get_blob("myfile")
    data = blob.download_as_bytes(end=100)
    return data

在 Airflow 任务中,是否禁止不使用 hooks 的直接客户端 API 调用?

你可以,但一个更 Airflowy 来处理钩子中缺少的功能是扩展钩子:

from airflow.providers.google.cloud.hooks.gcs import GCSHook
class MyGCSHook(GCSHook):

    def download_bytes(
        self,
        bucket_name: str,
        object_name: str,
        end:str,
    ) -> bytes:
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name=object_name)
        return blob.download_as_bytes(end=end)

然后就可以在PythonOperator或者自定义算子中使用钩子函数了

请注意,GCSHook 具有您提到的 download 功能。 您可能错过的是,如果您不提供文件名,它将以字节形式下载(参见 source code)。它不允许按您的预期配置 end 参数,但如果您希望为 Airflow 开源做出贡献,这应该是对 Airflow PR 的简单修复。