如何将 API 响应写入 Azure Blob 存储
How to write an API response to Azure Blob Storage
我正在开发一个 Azure 函数(Blob 触发器),用于从 Blob 中的 CSV 文件读取一列,并将该列的每个值传给 API 请求,以获取每个值对应的 JSON 响应;我想把每个响应分别写入 Azure Blob 存储中的一个新 .json 文件。
这是我的代码
import logging
import azure.functions as func
from azure.storage.blob import BlobServiceClient,BlobClient
import pandas as pd
import requests
import json
import os, io
# Azure Function entry point (logs "Blob trigger executed!" on start).
# Flow: download a CSV blob, read its 'vin' column, call the NHTSA vPIC
# DecodeVinValuesExtended endpoint once per VIN, then try to write each JSON
# response to its own 'Veh_<vin>.json' file.
#   inputBlob  -- func.InputStream; only its name/length/uri are read, for logging.
#   outputBlob -- func.Out[str] binding object; see the NOTE at the open() call.
def main(inputBlob: func.InputStream, outputBlob: func.Out[str]):
logging.info(f"Blob trigger executed!")
logging.info(f"Blob Name: {inputBlob.name} ({inputBlob.length}) bytes")
logging.info(f"Full Blob URI: {inputBlob.uri}")
# Storage connection settings are hard-coded; the account key is masked in this listing.
connection_string = "DefaultEndpointsProtocol=https;AccountName=nhtsa;AccountKey=*********==;EndpointSuffix=core.windows.net"
containerName = "myblobcontainer"
blobName = "input/NHTSA_Inbound.csv"
# NOTE(review): out_blob is assigned but never used below.
out_blob = "output"
blob = BlobClient.from_connection_string(conn_str=connection_string, container_name=containerName, blob_name=blobName)
# Download the CSV named by blobName (not inputBlob) and parse it with every column as str.
blobStream = blob.download_blob().content_as_bytes()
# Logs the complete raw CSV bytes.
logging.info(blobStream)
df = pd.read_csv(io.BytesIO(blobStream), sep=',', dtype=str)
# Keep only the 'vin' column and turn it into a list of one-field records,
# so each VIN is read below as t[0].
vin_df = pd.DataFrame(df['vin'])
vin_tup = list(vin_df.to_records(index=False))
# One API request per VIN; the enumerate index i is not used in the loop body.
for i,t in enumerate(vin_tup):
# modelyear=2011 is fixed in the query string for every VIN.
nhtsa_url = 'https://vpic.nhtsa.dot.gov/api/vehicles/DecodeVinValuesExtended/%s?format=json&modelyear=2011'%t[0]
nhtsa_req_content = requests.get(nhtsa_url)
nhtsa_data = nhtsa_req_content.json()
# NOTE: the reported traceback points at this line. os.path.join() is given the
# func.Out binding object rather than a str/bytes/os.PathLike path, which raises
# "TypeError: expected str, bytes or os.PathLike object, not Out".
with open(os.path.join(outputBlob,'Veh_%s.json'%t[0]),"w+") as output_file:
# Serialises only the first value of the response dict, wrapped in list().
json.dump(list(list(nhtsa_data.values())[0]),output_file)
除文件写入部分外,一切正常。我正在 VS Code 中调试该函数,断点在以下位置抛出错误:
with open(os.path.join(outputBlob,'Veh_%s.json'%t[0]),"w+") as output_file:
json.dump(list(list(nhtsa_data.values())[0]),output_file)
error
Executed 'Functions.BlobTrigger1' (Failed, Id=8c1dd2bb-7fb9-4670-9f44-4fb290530325, Duration=34741ms)
[2021-05-15T15:51:01.978Z] System.Private.CoreLib: Exception while executing function: Functions.BlobTrigger1. System.Private.CoreLib: Result: Failure
Exception: TypeError: expected str, bytes or os.PathLike object, not Out
Stack: File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python.7/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 372, in _handle__invocation_request
self.__run_sync_func, invocation_id, fi.func, args)
File "C:\Program Files\Python37\lib\concurrent\futures\thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python.7/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 548, in __run_sync_func
return func(**params)
File "C:\Users\yyy1u39\Desktop\Campaign Group Dashboard\Portfolio\Azure Data Factory\BlobTrigger1\__init__.py", line 35, in main
with open(os.path.join(outputBlob,'Veh_%s.json'%t[0]),"w+") as output_file:
File "C:\Program Files\Python37\lib\ntpath.py", line 76, in join
path = os.fspath(path)
.
您可以使用 Azure 存储 blob SDK:https://docs.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-python
建议使用Azure存储blob SDK上传多个请求结果,而不是Azure函数blob输出绑定。
只需尝试以下代码:
import requests
from azure.storage.blob import ContainerClient
# Demo: call each API URL and save every response as its own blob.
# (Reading the request values from the CSV is skipped here.)
requestURLs = [
    'https://reqres.in/api/users?page=1',
    'https://reqres.in/api/users?page=2',
]
# Placeholders: fill in the storage connection string and container name before running.
storageConnStr = ''
containerName = ''
container_client = ContainerClient.from_connection_string(storageConnStr, containerName)

# enumerate() supplies the running index used in the blob name.
for count, req in enumerate(requestURLs):
    # A timeout keeps a stalled endpoint from hanging the whole run.
    response = requests.get(req, timeout=30)
    # Fail on 4xx/5xx instead of storing an error body as a .json blob.
    response.raise_for_status()
    result = response.text
    blob_name = f'output/req{count}.json'
    # overwrite=True allows re-runs; by default upload_blob refuses to
    # replace a blob that already exists.
    container_client.get_blob_client(blob_name).upload_blob(result, overwrite=True)
结果:
如果您还有其他问题,请告诉我。
我正在开发一个 Azure 函数(Blob 触发器),用于从 Blob 中的 CSV 文件读取一列,并将该列的每个值传给 API 请求,以获取每个值对应的 JSON 响应;我想把每个响应分别写入 Azure Blob 存储中的一个新 .json 文件。
这是我的代码
import logging
import azure.functions as func
from azure.storage.blob import BlobServiceClient,BlobClient
import pandas as pd
import requests
import json
import os, io
# Azure Function entry point (logs "Blob trigger executed!" on start).
# Flow: download a CSV blob, read its 'vin' column, call the NHTSA vPIC
# DecodeVinValuesExtended endpoint once per VIN, then try to write each JSON
# response to its own 'Veh_<vin>.json' file.
#   inputBlob  -- func.InputStream; only its name/length/uri are read, for logging.
#   outputBlob -- func.Out[str] binding object; see the NOTE at the open() call.
def main(inputBlob: func.InputStream, outputBlob: func.Out[str]):
logging.info(f"Blob trigger executed!")
logging.info(f"Blob Name: {inputBlob.name} ({inputBlob.length}) bytes")
logging.info(f"Full Blob URI: {inputBlob.uri}")
# Storage connection settings are hard-coded; the account key is masked in this listing.
connection_string = "DefaultEndpointsProtocol=https;AccountName=nhtsa;AccountKey=*********==;EndpointSuffix=core.windows.net"
containerName = "myblobcontainer"
blobName = "input/NHTSA_Inbound.csv"
# NOTE(review): out_blob is assigned but never used below.
out_blob = "output"
blob = BlobClient.from_connection_string(conn_str=connection_string, container_name=containerName, blob_name=blobName)
# Download the CSV named by blobName (not inputBlob) and parse it with every column as str.
blobStream = blob.download_blob().content_as_bytes()
# Logs the complete raw CSV bytes.
logging.info(blobStream)
df = pd.read_csv(io.BytesIO(blobStream), sep=',', dtype=str)
# Keep only the 'vin' column and turn it into a list of one-field records,
# so each VIN is read below as t[0].
vin_df = pd.DataFrame(df['vin'])
vin_tup = list(vin_df.to_records(index=False))
# One API request per VIN; the enumerate index i is not used in the loop body.
for i,t in enumerate(vin_tup):
# modelyear=2011 is fixed in the query string for every VIN.
nhtsa_url = 'https://vpic.nhtsa.dot.gov/api/vehicles/DecodeVinValuesExtended/%s?format=json&modelyear=2011'%t[0]
nhtsa_req_content = requests.get(nhtsa_url)
nhtsa_data = nhtsa_req_content.json()
# NOTE: the reported traceback points at this line. os.path.join() is given the
# func.Out binding object rather than a str/bytes/os.PathLike path, which raises
# "TypeError: expected str, bytes or os.PathLike object, not Out".
with open(os.path.join(outputBlob,'Veh_%s.json'%t[0]),"w+") as output_file:
# Serialises only the first value of the response dict, wrapped in list().
json.dump(list(list(nhtsa_data.values())[0]),output_file)
除文件写入部分外,一切正常。我正在 VS Code 中调试该函数,断点在以下位置抛出错误:
with open(os.path.join(outputBlob,'Veh_%s.json'%t[0]),"w+") as output_file: json.dump(list(list(nhtsa_data.values())[0]),output_file)
error
Executed 'Functions.BlobTrigger1' (Failed, Id=8c1dd2bb-7fb9-4670-9f44-4fb290530325, Duration=34741ms)
[2021-05-15T15:51:01.978Z] System.Private.CoreLib: Exception while executing function: Functions.BlobTrigger1. System.Private.CoreLib: Result: Failure
Exception: TypeError: expected str, bytes or os.PathLike object, not Out
Stack: File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python.7/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 372, in _handle__invocation_request
self.__run_sync_func, invocation_id, fi.func, args)
File "C:\Program Files\Python37\lib\concurrent\futures\thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python.7/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 548, in __run_sync_func
return func(**params)
File "C:\Users\yyy1u39\Desktop\Campaign Group Dashboard\Portfolio\Azure Data Factory\BlobTrigger1\__init__.py", line 35, in main
with open(os.path.join(outputBlob,'Veh_%s.json'%t[0]),"w+") as output_file:
File "C:\Program Files\Python37\lib\ntpath.py", line 76, in join
path = os.fspath(path)
.
您可以使用 Azure 存储 blob SDK:https://docs.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-python
建议使用Azure存储blob SDK上传多个请求结果,而不是Azure函数blob输出绑定。
只需尝试以下代码:
import requests
from azure.storage.blob import ContainerClient
# Demo: call each API URL and save every response as its own blob.
# (Reading the request values from the CSV is skipped here.)
requestURLs = [
    'https://reqres.in/api/users?page=1',
    'https://reqres.in/api/users?page=2',
]
# Placeholders: fill in the storage connection string and container name before running.
storageConnStr = ''
containerName = ''
container_client = ContainerClient.from_connection_string(storageConnStr, containerName)

# enumerate() supplies the running index used in the blob name.
for count, req in enumerate(requestURLs):
    # A timeout keeps a stalled endpoint from hanging the whole run.
    response = requests.get(req, timeout=30)
    # Fail on 4xx/5xx instead of storing an error body as a .json blob.
    response.raise_for_status()
    result = response.text
    blob_name = f'output/req{count}.json'
    # overwrite=True allows re-runs; by default upload_blob refuses to
    # replace a blob that already exists.
    container_client.get_blob_client(blob_name).upload_blob(result, overwrite=True)
结果:
如果您还有其他问题,请告诉我。