How to write an API response to Azure Blob Storage

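I am developing an Azure Function with a blob trigger. It reads one column from a CSV file in blob storage and passes each column value to an API request, which returns a JSON response per value. I want to write each response to a new .json file in Azure Blob Storage.

Here is my code: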

    import logging
    import azure.functions as func
    from azure.storage.blob import BlobServiceClient,BlobClient
    import pandas as pd
    import requests
    import json
    import os, io
    
    
    def main(inputBlob: func.InputStream, outputBlob: func.Out[str]):
        logging.info(f"Blob trigger executed!")
        logging.info(f"Blob Name: {inputBlob.name} ({inputBlob.length}) bytes")
        logging.info(f"Full Blob URI: {inputBlob.uri}")
    
    
        # connections
        connection_string  = "DefaultEndpointsProtocol=https;AccountName=nhtsa;AccountKey=*********==;EndpointSuffix=core.windows.net"
        containerName = "myblobcontainer"
        blobName = "input/NHTSA_Inbound.csv"
        out_blob = "output"
        blob = BlobClient.from_connection_string(conn_str=connection_string, container_name=containerName, blob_name=blobName)
      
        # reading csv column values and pass into   request to get each value response 

        blobStream = blob.download_blob().content_as_bytes()
        logging.info(blobStream)
        df = pd.read_csv(io.BytesIO(blobStream), sep=',', dtype=str)
        vin_df = pd.DataFrame(df['vin'])
        vin_tup = list(vin_df.to_records(index=False))
        for i,t in enumerate(vin_tup):
            nhtsa_url = 'https://vpic.nhtsa.dot.gov/api/vehicles/DecodeVinValuesExtended/%s?format=json&modelyear=2011'%t[0]
            nhtsa_req_content = requests.get(nhtsa_url)
            nhtsa_data = nhtsa_req_content.json()
            with open(os.path.join(outputBlob,'Veh_%s.json'%t[0]),"w+") as output_file:
                json.dump(list(list(nhtsa_data.values())[0]),output_file)

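Everything works except the file-writing part. I am debugging the function in VS Code, and it throws an error at my breakpoint on this statement:

    with open(os.path.join(outputBlob,'Veh_%s.json'%t[0]),"w+") as output_file:
        json.dump(list(list(nhtsa_data.values())[0]),output_file)

Error: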
Executed 'Functions.BlobTrigger1' (Failed, Id=8c1dd2bb-7fb9-4670-9f44-4fb290530325, Duration=34741ms)
[2021-05-15T15:51:01.978Z] System.Private.CoreLib: Exception while executing function: Functions.BlobTrigger1. System.Private.CoreLib: Result: Failure
Exception: TypeError: expected str, bytes or os.PathLike object, not Out
Stack:   File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python.7/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 372, in _handle__invocation_request
    self.__run_sync_func, invocation_id, fi.func, args)
  File "C:\Program Files\Python37\lib\concurrent\futures\thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python.7/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 548, in __run_sync_func
    return func(**params)
  File "C:\Users\yyy1u39\Desktop\Campaign Group Dashboard\Portfolio\Azure Data Factory\BlobTrigger1\__init__.py", line 35, in main
    with open(os.path.join(outputBlob,'Veh_%s.json'%t[0]),"w+") as output_file:
  File "C:\Program Files\Python37\lib\ntpath.py", line 76, in join
    path = os.fspath(path)

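You can use the Azure Storage Blob SDK: https://docs.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-python

To upload the results of multiple requests, I recommend the Azure Storage Blob SDK rather than the Azure Functions blob output binding.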
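For context on the TypeError in the question: the `outputBlob` parameter is a binding object that exposes `set()` and `get()`, not a folder path, so `os.path.join` cannot use it. The minimal sketch below reproduces that failure with a hypothetical stand-in class (`FakeOut` and `join_error` are illustrative names, not part of `azure.functions`), so it runs without the Functions runtime.

```python
import os


class FakeOut:
    """Hypothetical stand-in for azure.functions.Out[str] (assumption:
    the real binding exposes set() and get(), as mimicked here)."""

    def __init__(self):
        self._value = None

    def set(self, val):
        self._value = val

    def get(self):
        return self._value


def join_error(binding):
    """Return the TypeError message raised by os.path.join, or None."""
    try:
        os.path.join(binding, "Veh_1.json")
    except TypeError as exc:
        return str(exc)
    return None


# The binding is not path-like, so joining it with a file name fails.
print(join_error(FakeOut()))

# The binding is meant to receive content through set().
out = FakeOut()
out.set('{"vin": "example"}')
print(out.get())
```

With a basic `func.Out[str]` binding, a call to `set()` typically targets the single blob path configured for the binding, which is why writing one file per API response is easier with the SDK.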

Try the following code:

    import requests
    from azure.storage.blob import ContainerClient

    # Reading the CSV is skipped here.

    # Call a demo API and save each result as its own blob.
    requestURLs = ['https://reqres.in/api/users?page=1', 'https://reqres.in/api/users?page=2']
    storageConnStr = ''  # your storage account connection string
    containerName = ''   # your container name
    container_client = ContainerClient.from_connection_string(storageConnStr, containerName)

    for count, req in enumerate(requestURLs):
        result = requests.get(req).text
        # Writes output/req0.json, output/req1.json, ...
        container_client.get_blob_client('output/req' + str(count) + '.json').upload_blob(result)
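Adapted to the CSV-driven case in the question, the same idea might look like the sketch below. It makes several assumptions: the helper names (`read_vins`, `fetch_nhtsa`, `upload_vin_responses`) are hypothetical, the standard `csv` module is swapped in for pandas to keep dependencies light, and the whole API response is stored rather than the first value only.

```python
import csv
import io
import json


def read_vins(csv_bytes, column="vin"):
    """Return the non-empty values of one column from CSV bytes."""
    reader = csv.DictReader(io.StringIO(csv_bytes.decode("utf-8-sig")))
    return [row[column] for row in reader if row.get(column)]


def fetch_nhtsa(vin):
    """Call the NHTSA endpoint from the question and return parsed JSON."""
    import requests  # lazy import: only needed when calling the real API

    url = (
        "https://vpic.nhtsa.dot.gov/api/vehicles/"
        f"DecodeVinValuesExtended/{vin}?format=json&modelyear=2011"
    )
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.json()


def upload_vin_responses(container_client, vins, fetch_json=fetch_nhtsa,
                         prefix="output"):
    """Upload one JSON blob per VIN and return the blob names written."""
    written = []
    for vin in vins:
        blob_name = f"{prefix}/Veh_{vin}.json"
        payload = json.dumps(fetch_json(vin))
        # overwrite=True replaces a blob left over from an earlier run
        container_client.upload_blob(name=blob_name, data=payload,
                                     overwrite=True)
        written.append(blob_name)
    return written
```

Inside the function, `csv_bytes` could come from `inputBlob.read()` and `container_client` from `ContainerClient.from_connection_string(...)`, ideally with the connection string read from an app setting instead of being hardcoded.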

Let me know if you have any other questions.