使用 Google Cloud Composer 将 excel 工作簿写入 Google Cloud Storage 存储桶
Writing excel work books to Google Cloud Storage bucket using Google Cloud Composer
我有一个要求,我必须用 2 个不同的工作簿创建 excel 工作簿 (.xlsx)。但是将数据存储到 GCS 存储桶时,出现找不到文件的错误。我能够成功保存 .csv 文件。请找到下面的例子
import pandas as pd
a = [1, 2, 3]
b = [4, 5, 6]
af1 = pd.DataFrame(a)
bf1 = pd.DataFrame(b)
af1.columns = ['A']
bf1.columns = ['B']
with pd.ExcelWriter('gs://<bucket-name>/output.xlsx') as writer:
af1.to_excel(writer, sheet_name="A", index=False)
bf1.to_excel(writer, sheet_name="B", index=False)
但是找不到文件。而如果我尝试写入 csv 文件(使用 .to_csv("samepath")),我能够看到该文件。请帮助
您正在尝试不使用 Google Cloud Storage API Client Libraries 直接访问存储桶。这不是推荐的方法。因此,请尝试使用 Google 云存储 API 客户端库并按照以下步骤满足您的要求:
第 1 步:在触发 DAG 之前在 Cloud Composer 中添加 xlsxwriter 包:
环境详细信息 -> PYPI 包 -> 编辑 -> 包名称 -> 输入 xlsxwriter
-> 点击保存
第 2 步:尝试以下代码:
import airflow
from airflow import DAG
from airflow.utils import timezone
from airflow.operators.python import PythonOperator
from google.cloud import storage
import pandas as pd
from xlsxwriter import Workbook
def invoke_cloud_storage():
a = [1, 2, 3]
b = [4, 5, 6]
af1 = pd.DataFrame(a)
bf1 = pd.DataFrame(b)
af1.columns = ['A']
bf1.columns = ['B']
writer=pd.ExcelWriter('file-name.xlsx')
af1.to_excel(writer, sheet_name="A", index=False)
bf1.to_excel(writer, sheet_name="B", index=False)
writer.save()
storage_client = storage.Client()
bucket = storage_client.bucket('bucket-name')
blob = bucket.blob('file-name.xlsx')
blob.upload_from_filename('file-name.xlsx')
with DAG(
'pandas_storage',
description='Upload file in Cloud Storage',
schedule_interval=None,
start_date=airflow.utils.dates.days_ago(2),
max_active_runs=1,
catchup=False
) as dag:
# Invoke cloud run
process_file = PythonOperator(
task_id='invoke_cloud_storage',
python_callable=invoke_cloud_storage,
dag=dag
)
process_file
如果您仍然需要在不使用 Google 云存储 API 客户端库的情况下访问存储桶,请在 Cloud Composer 中添加 gcsfs and fsspec 库作为依赖项。但是这两个库不是由 Google 管理的,这不是推荐的方法,使用它需要您自担风险。按照以下步骤满足您的要求:
第 1 步:在触发 DAG 之前在 Cloud Composer 中添加 xlsxwriter
、gcsfs
和 fsspec
包:
环境详细信息 -> PYPI 包 -> 编辑 -> 添加包 -> 点击保存。
第 2 步:尝试以下代码:
import airflow
from airflow import DAG
from airflow.utils import timezone
from airflow.operators.python import PythonOperator
import pandas as pd
from xlsxwriter import Workbook
def invoke_cloud_storage():
a = [1, 2, 3]
b = [4, 5, 6]
af1 = pd.DataFrame(a)
bf1 = pd.DataFrame(b)
af1.columns = ['A']
bf1.columns = ['B']
with pd.ExcelWriter('gs://bucket-name/file-name.xlsx') as writer:
af1.to_excel(writer, sheet_name="A", index=False)
bf1.to_excel(writer, sheet_name="B", index=False)
with DAG(
'pandas_storage_nr',
description='Upload file in Cloud Storage',
schedule_interval=None,
start_date=airflow.utils.dates.days_ago(2),
max_active_runs=1,
catchup=False
) as dag:
# Invoke cloud run
process_file = PythonOperator(
task_id='invoke_cloud_storage',
python_callable=invoke_cloud_storage,
dag=dag
)
process_file
我有一个要求,我必须用 2 个不同的工作簿创建 excel 工作簿 (.xlsx)。但是将数据存储到 GCS 存储桶时,出现找不到文件的错误。我能够成功保存 .csv 文件。请找到下面的例子
import pandas as pd
a = [1, 2, 3]
b = [4, 5, 6]
af1 = pd.DataFrame(a)
bf1 = pd.DataFrame(b)
af1.columns = ['A']
bf1.columns = ['B']
with pd.ExcelWriter('gs://<bucket-name>/output.xlsx') as writer:
af1.to_excel(writer, sheet_name="A", index=False)
bf1.to_excel(writer, sheet_name="B", index=False)
但是找不到文件。而如果我尝试写入 csv 文件(使用 .to_csv("samepath")),我能够看到该文件。请帮助
您正在尝试不使用 Google Cloud Storage API Client Libraries 直接访问存储桶。这不是推荐的方法。因此,请尝试使用 Google 云存储 API 客户端库并按照以下步骤满足您的要求:
第 1 步:在触发 DAG 之前在 Cloud Composer 中添加 xlsxwriter 包:
环境详细信息 -> PYPI 包 -> 编辑 -> 包名称 -> 输入 xlsxwriter
-> 点击保存
第 2 步:尝试以下代码:
import airflow
from airflow import DAG
from airflow.utils import timezone
from airflow.operators.python import PythonOperator
from google.cloud import storage
import pandas as pd
from xlsxwriter import Workbook
def invoke_cloud_storage():
a = [1, 2, 3]
b = [4, 5, 6]
af1 = pd.DataFrame(a)
bf1 = pd.DataFrame(b)
af1.columns = ['A']
bf1.columns = ['B']
writer=pd.ExcelWriter('file-name.xlsx')
af1.to_excel(writer, sheet_name="A", index=False)
bf1.to_excel(writer, sheet_name="B", index=False)
writer.save()
storage_client = storage.Client()
bucket = storage_client.bucket('bucket-name')
blob = bucket.blob('file-name.xlsx')
blob.upload_from_filename('file-name.xlsx')
with DAG(
'pandas_storage',
description='Upload file in Cloud Storage',
schedule_interval=None,
start_date=airflow.utils.dates.days_ago(2),
max_active_runs=1,
catchup=False
) as dag:
# Invoke cloud run
process_file = PythonOperator(
task_id='invoke_cloud_storage',
python_callable=invoke_cloud_storage,
dag=dag
)
process_file
如果您仍然需要在不使用 Google 云存储 API 客户端库的情况下访问存储桶,请在 Cloud Composer 中添加 gcsfs and fsspec 库作为依赖项。但是这两个库不是由 Google 管理的,这不是推荐的方法,使用它需要您自担风险。按照以下步骤满足您的要求:
第 1 步:在触发 DAG 之前在 Cloud Composer 中添加 xlsxwriter
、gcsfs
和 fsspec
包:
环境详细信息 -> PYPI 包 -> 编辑 -> 添加包 -> 点击保存。
第 2 步:尝试以下代码:
import airflow
from airflow import DAG
from airflow.utils import timezone
from airflow.operators.python import PythonOperator
import pandas as pd
from xlsxwriter import Workbook
def invoke_cloud_storage():
a = [1, 2, 3]
b = [4, 5, 6]
af1 = pd.DataFrame(a)
bf1 = pd.DataFrame(b)
af1.columns = ['A']
bf1.columns = ['B']
with pd.ExcelWriter('gs://bucket-name/file-name.xlsx') as writer:
af1.to_excel(writer, sheet_name="A", index=False)
bf1.to_excel(writer, sheet_name="B", index=False)
with DAG(
'pandas_storage_nr',
description='Upload file in Cloud Storage',
schedule_interval=None,
start_date=airflow.utils.dates.days_ago(2),
max_active_runs=1,
catchup=False
) as dag:
# Invoke cloud run
process_file = PythonOperator(
task_id='invoke_cloud_storage',
python_callable=invoke_cloud_storage,
dag=dag
)
process_file