Cloud Composer 将文件写入其他存储桶问题
Cloud Composer Write file to other bucket Issues
Airflow 新手。我正在尝试将结果保存到另一个存储桶(不是气流存储桶)中的文件中。
我可以保存到“/home/airflow/gcs/data/test.json”中的文件,然后使用 gcs_hook.GoogleCloudStorageHook 复制到另一个存储桶。这是代码:
def write_file_func(**context):
file = f'/home/airflow/gcs/data/test.json'
with open(file, 'w') as f:
f.write(json.dumps('{"name":"aaa", "age":"10"}'))
def upload_file_func(**context):
conn = gcs_hook.GoogleCloudStorageHook()
source_bucket = 'source_bucket'
source_object = 'data/test.json'
target_bucket = 'target_bucket'
target_object = 'test.json'
conn.copy(source_bucket, source_object, target_bucket, target_object)
conn.delete(source_bucket, source_object)
我的问题是:
我们可以直接写入目标存储桶中的文件吗?我没有在 gcs_hook.
中找到任何方法
我试过用google.cloud.storagebucket.blob('test.json').upload_from_string(),但是气流一直说"The DAG isn't available in the server's DAGBag" ,很烦人,难道我们不允许在DAG中使用那个API吗?
如果可以直接使用google.cloud.storage/bigquery API,那和Airflow有什么区别API,比如gcs_hook/bigquery_hook?
谢谢
不,您不能“直接写入目标存储桶中的文件”。要修改存储在 GCS 中的文件,您需要将其下载到本地,进行文件更改,然后将修改后的文件上传回 GCS。有关详细信息,请参阅 [Google 云存储][1] 和 [方法][2]。
我在 Apache Airflow 中成功编译了以下代码。放心使用吧。
import pip
import logging
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
import json
from datetime import datetime
def write_file_func():
file = f'/home/airflow/gcs/data/test.json'
with open(file, 'w') as f:
f.write(json.dumps('{"name":"aaa", "age":"10"}'))
def upload_file_func():
conn = GoogleCloudStorageHook()
source_bucket = 'source_bucket'
source_object = 'data/test.json'
target_bucket = 'target_bucket'
target_object = 'test.json'
conn.copy(source_bucket, source_object, target_bucket, target_object)
#conn.delete(source_bucket, source_object)
with DAG('load_gcs_file', description='DAG', schedule_interval=None, start_date=datetime(2018, 11, 1)) as dag:
create_file = PythonOperator(task_id='create_file', python_callable=write_file_func)
copy_file = PythonOperator(task_id='copy_file', python_callable=upload_file_func)
create_file >> copy_file
注意:-) 请更改 source_bucket 名称值以反映您的源存储桶名称。
-) 请更改 target_bucket 名称值以反映您的目标存储桶名称。
- Airflow 挂钩是外部库(例如 google.cloud.storage)的可重用接口,因此许多不同的操作员可以以一致的方式与这些外部 API 对话。
一个通用的例子是更新外部库时:不需要在每个使用外部库的地方更新代码,只需要更改钩子代码。
Airflow 新手。我正在尝试将结果保存到另一个存储桶(不是气流存储桶)中的文件中。 我可以保存到“/home/airflow/gcs/data/test.json”中的文件,然后使用 gcs_hook.GoogleCloudStorageHook 复制到另一个存储桶。这是代码:
def write_file_func(**context):
file = f'/home/airflow/gcs/data/test.json'
with open(file, 'w') as f:
f.write(json.dumps('{"name":"aaa", "age":"10"}'))
def upload_file_func(**context):
conn = gcs_hook.GoogleCloudStorageHook()
source_bucket = 'source_bucket'
source_object = 'data/test.json'
target_bucket = 'target_bucket'
target_object = 'test.json'
conn.copy(source_bucket, source_object, target_bucket, target_object)
conn.delete(source_bucket, source_object)
我的问题是:
我们可以直接写入目标存储桶中的文件吗?我没有在 gcs_hook.
中找到任何方法
我试过用google.cloud.storagebucket.blob('test.json').upload_from_string(),但是气流一直说"The DAG isn't available in the server's DAGBag" ,很烦人,难道我们不允许在DAG中使用那个API吗?
如果可以直接使用google.cloud.storage/bigquery API,那和Airflow有什么区别API,比如gcs_hook/bigquery_hook?
谢谢
不,您不能“直接写入目标存储桶中的文件”。要修改存储在 GCS 中的文件,您需要将其下载到本地,进行文件更改,然后将修改后的文件上传回 GCS。有关详细信息,请参阅 [Google 云存储][1] 和 [方法][2]。
我在 Apache Airflow 中成功编译了以下代码。放心使用吧。
import pip
import logging
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
import json
from datetime import datetime
def write_file_func():
file = f'/home/airflow/gcs/data/test.json'
with open(file, 'w') as f:
f.write(json.dumps('{"name":"aaa", "age":"10"}'))
def upload_file_func():
conn = GoogleCloudStorageHook()
source_bucket = 'source_bucket'
source_object = 'data/test.json'
target_bucket = 'target_bucket'
target_object = 'test.json'
conn.copy(source_bucket, source_object, target_bucket, target_object)
#conn.delete(source_bucket, source_object)
with DAG('load_gcs_file', description='DAG', schedule_interval=None, start_date=datetime(2018, 11, 1)) as dag:
create_file = PythonOperator(task_id='create_file', python_callable=write_file_func)
copy_file = PythonOperator(task_id='copy_file', python_callable=upload_file_func)
create_file >> copy_file
注意:-) 请更改 source_bucket 名称值以反映您的源存储桶名称。 -) 请更改 target_bucket 名称值以反映您的目标存储桶名称。
- Airflow 挂钩是外部库(例如 google.cloud.storage)的可重用接口,因此许多不同的操作员可以以一致的方式与这些外部 API 对话。
一个通用的例子是更新外部库时:不需要在每个使用外部库的地方更新代码,只需要更改钩子代码。