如何 post json 到 Durable Azure Functions (Python)?
How to post json to Durable Azure Functions (Python)?
我已经根据以下教程使用了 Durable Function。我还没有修改代码。
https://docs.microsoft.com/en-us/azure/azure-functions/durable/quickstart-python-vscode
如何将 json 文件发送到 Activity 函数。
出于调试目的,如何在 Activity 函数中为 json 执行 logging.info。
此函数是 Durable Functions 的 HTTP 启动函数。
import logging
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new(req.route_params["functionName"], None, None)
logging.info(f"Started orchestration (Ken) with ID = '{instance_id}'.")
return client.create_check_status_response(req, instance_id)
此函数是编排函数
import logging
import json
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
logging.info(f"CalcOrc")
result1 = yield context.call_activity('CalculateActivity', "Tokyo")
result2 = yield context.call_activity('CalculateActivity', "Seattle")
result3 = yield context.call_activity('CalculateActivity', "London")
return [result1, result2, result3]
main = df.Orchestrator.create(orchestrator_function)
这个函数是一个Activity函数
import logging
def main(name: str) -> str:
logging.info(f"CalcAct") # Could log contents of json sent by HTTP Post
return f"Hello {name}!"
此修改版本具有HTTP启动器功能。它适用于 Get 但不适用于 Post.
import logging
import json
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) ->
func.HttpResponse:
#Added for testing
jsoninput = req.params.get('jsoninput')
client = df.DurableOrchestrationClient(starter)
#instance_id = await
client.start_new(req.route_params["functionName"], None, None)
instance_id = await
client.start_new(req.route_params["functionName"], jsoninput, None)
logging.info(f"Started orchestration with ID = '{instance_id}'.")
logging.info(f"jsonInput = '{jsoninput}'.")
return client.create_check_status_response(req, instance_id)
以下代码将帮助您将 json 对象数据传递给 activity 函数:
Activity 函数:
def main(req: func.HttpRequest) -> func.HttpResponse:
req_body = req.get_json()
return func.HttpResponse(f"description is {req_body.get('description')}")
我已经根据以下教程使用了 Durable Function。我还没有修改代码。 https://docs.microsoft.com/en-us/azure/azure-functions/durable/quickstart-python-vscode
如何将 json 文件发送到 Activity 函数。 出于调试目的,如何在 Activity 函数中为 json 执行 logging.info。
此函数是 Durable Functions 的 HTTP 启动函数。
import logging
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new(req.route_params["functionName"], None, None)
logging.info(f"Started orchestration (Ken) with ID = '{instance_id}'.")
return client.create_check_status_response(req, instance_id)
此函数是编排函数
import logging
import json
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
logging.info(f"CalcOrc")
result1 = yield context.call_activity('CalculateActivity', "Tokyo")
result2 = yield context.call_activity('CalculateActivity', "Seattle")
result3 = yield context.call_activity('CalculateActivity', "London")
return [result1, result2, result3]
main = df.Orchestrator.create(orchestrator_function)
这个函数是一个Activity函数
import logging
def main(name: str) -> str:
logging.info(f"CalcAct") # Could log contents of json sent by HTTP Post
return f"Hello {name}!"
此修改版本具有HTTP启动器功能。它适用于 Get 但不适用于 Post.
import logging
import json
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) ->
func.HttpResponse:
#Added for testing
jsoninput = req.params.get('jsoninput')
client = df.DurableOrchestrationClient(starter)
#instance_id = await
client.start_new(req.route_params["functionName"], None, None)
instance_id = await
client.start_new(req.route_params["functionName"], jsoninput, None)
logging.info(f"Started orchestration with ID = '{instance_id}'.")
logging.info(f"jsonInput = '{jsoninput}'.")
return client.create_check_status_response(req, instance_id)
以下代码将帮助您将 json 对象数据传递给 activity 函数:
Activity 函数:
def main(req: func.HttpRequest) -> func.HttpResponse:
req_body = req.get_json()
return func.HttpResponse(f"description is {req_body.get('description')}")