使用 Python 在 Azure 存储 blob 中创建 PDF 文件的最佳方法是什么?

What is the best way to create PDF files in Azure storage blob using Python?

我是 Python 的新手,我提出了使用 Python 脚本在 SQL 服务器中使用可用数据创建 PDF 文件的要求。正如我所研究的,有许多库可用于此目的,但它们的大部分方法是生成 HTML 字符串,然后将其转换为本地目录中的 PDF 文件。但是 none 他们建议如何在 Azure 存储上执行此操作。
我有一个连接到数据库并读取数据的 Azure 函数应用程序,现在我需要使用此数据创建 PDF。由于 Azure 函数应用程序是无服务器源,根本无法在物理目录上进行中继。
所以我应该能够创建一个包含数据的字符串并将它们转换为 PDF 并直接上传到 Azre 存储。

import logging

import azure.functions as func
from fpdf import FPDF

def generate_PDF():
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", 12)
    pdf.cell(w=0,h=0,txt="This is sample pdf",align="L")
    pdf.output('demo.pdf')

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    name = req.params.get('name')
    if not name:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            name = req_body.get('name')
            c_pdf = req_body.get('cpdf')
            if c_pdf == 'Y':
                generate_PDF()

    if name:
        return func.HttpResponse(f"Hello {name}!, create PDF (cpdf) is set to {c_pdf}")
    else:
        return func.HttpResponse("Please pass a name on the query string or in the request body", status_code=400)

我正在使用 FPDF 库,它会在当前工作目录中创建 pdf 文件。
为我的方法建议最好的方法。
在此先感谢。

问题请参考以下代码

SDK

pip install azure-storage-blob fpdf aiohttp

代码

from azure.storage.blob import ContentSettings
from azure.storage.blob.aio import BlobServiceClient
from fpdf import FPDF


async def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    connection_string = 'DefaultEndpointsProtocol=https;AccountName=jimtestdiag924;AccountKey=uxz4AtF0A4tWBcPHwgbFAfdinvLEZpJtAu1MYVWD/xYCYDcLLRb8Zhp5lxR2/2rQ2P1OrxZwWarEoWyDSZ7Q+A==;EndpointSuffix=core.windows.net'
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font('Arial', 'B', 16)
    pdf.cell(40, 10, 'Hello World!')
    s = pdf.output(dest='S').encode('latin-1')
    logging.info(s)
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    async with blob_service_client:
            container_client = blob_service_client.get_container_client('testupload')
            try:
                # Create new Container in the Service
                await container_client.create_container()
            except Exception as ex:
                pass
            # Get a new BlobClient
            blob_client = container_client.get_blob_client('demo.pdf')
            await blob_client.upload_blob(s, blob_type="BlockBlob",content_settings=ContentSettings( content_type='application/pdf'))
            



    name = req.params.get('name')
    if not name:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            name = req_body.get('name')

    if name:
        return func.HttpResponse(f"Hello {name}!")
    else:
        return func.HttpResponse(
             "Please pass a name on the query string or in the request body",
             status_code=400
        )