如何 Post JSON 到 Durable Azure Functions (Python)?

我想从 Azure 数据工厂调用 Durable Azure Functions。 我想 post json 到函数并在函数处理完成时获取状态。 我的最终目标是成功完成 运行 需要 10 分钟且没有超时的功能。

我已经使用 GET 方法从 ADF 成功执行了 Azure Function Activity。

现在我需要建议来修改 Orchestrator 的 Python 代码以接受 json 并使用 json 值来过滤处理哪个结果集。 {“国家”:“日本”}

当前代码库来自教程: https://docs.microsoft.com/en-us/azure/azure-functions/durable/quickstart-python-vscode

我正在按照此处的持久功能说明进行操作: http://datanrg.blogspot.com/2020/10/using-durable-functions-in-azure-data.html

# This function an HTTP starter function for Durable Functions.
# Before running this sample, please:
# - create a Durable orchestration function
# - create a Durable activity function (default name is "Hello")
# - add azure-functions-durable to requirements.txt
# - run pip install -r requirements.txt

import logging

import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new(req.route_params["functionName"], None, None)

logging.info(f"Started orchestration with ID = '{instance_id}'.")

return client.create_check_status_response(req, instance_id)

#  This function is not intended to be invoked directly. Instead it will be
# triggered by an HTTP starter function.
# Before running this sample, please:
# - create a Durable activity function (default name is "Hello")
# - create a Durable HTTP starter function
# - add azure-functions-durable to requirements.txt
# - run pip install -r requirements.txt

import logging
import json

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
result1 = yield context.call_activity('Hello', "Tokyo")
result2 = yield context.call_activity('Hello', "Seattle")
result3 = yield context.call_activity('Hello', "London")
return [result1, result2, result3]

main = df.Orchestrator.create(orchestrator_function)

# This function is not intended to be invoked directly. Instead it will be
# triggered by an orchestrator function.
# Before running this sample, please:
# - create a Durable orchestration function
# - create a Durable HTTP starter function
# - add azure-functions-durable to requirements.txt
# - run pip install -r requirements.txt

import logging

def main(name: str) -> str:
    return f"Hello {name}!"

import azure.durable_functions as df
import azure.functions as func

async def main(documents: func.DocumentList, starter: str):
    client = df.DurableOrchestrationClient(starter)
    instance_id = await client.start_new('MyDFOrchestrator', {"doc_list": [{doc1}, {doc2}, {doc3}]})
    logging.info(f"Started orchestration ID {instance_id}")

将JSON作为输入值传递给协调器应该没问题。有一个类似的例子 here。虽然示例是使用 http 触发器,但这里的相关区域与您在 starter/triggering 函数中使用的触发器无关。

或者,您可以创建一个包含 model/entity 结构的具体可序列化 class(比原始 json 更清晰)。要创建可序列化的 classes,我们只需要您的 class 导出两个静态方法:to_json() 和 from_json()。 Durable Functions 框架将内部调用这些 classes 来序列化和反序列化您的自定义 class.