如何 Post JSON 到 Durable Azure Functions (Python)?

How to Post JSON to Durable Azure Function (Python)?

我想从 Azure 数据工厂调用 Durable Azure Functions。 我想 post json 到函数并在函数处理完成时获取状态。 我的最终目标是成功完成 运行 需要 10 分钟且没有超时的功能。

我已经使用 GET 方法从 ADF 成功执行了 Azure Function Activity。

现在我需要建议来修改 Orchestrator 的 Python 代码以接受 json 并使用 json 值来过滤处理哪个结果集。 {“国家”:“日本”}

当前代码库来自教程: https://docs.microsoft.com/en-us/azure/azure-functions/durable/quickstart-python-vscode

我正在按照此处的持久功能说明进行操作: http://datanrg.blogspot.com/2020/10/using-durable-functions-in-azure-data.html

# This function an HTTP starter function for Durable Functions.
# Before running this sample, please:
# - create a Durable orchestration function
# - create a Durable activity function (default name is "Hello")
# - add azure-functions-durable to requirements.txt
# - run pip install -r requirements.txt

import logging

import azure.functions as func
import azure.durable_functions as df


async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new(req.route_params["functionName"], None, None)

logging.info(f"Started orchestration with ID = '{instance_id}'.")

return client.create_check_status_response(req, instance_id)

#  This function is not intended to be invoked directly. Instead it will be
# triggered by an HTTP starter function.
# Before running this sample, please:
# - create a Durable activity function (default name is "Hello")
# - create a Durable HTTP starter function
# - add azure-functions-durable to requirements.txt
# - run pip install -r requirements.txt

import logging
import json

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
result1 = yield context.call_activity('Hello', "Tokyo")
result2 = yield context.call_activity('Hello', "Seattle")
result3 = yield context.call_activity('Hello', "London")
return [result1, result2, result3]

main = df.Orchestrator.create(orchestrator_function)


# This function is not intended to be invoked directly. Instead it will be
# triggered by an orchestrator function.
# Before running this sample, please:
# - create a Durable orchestration function
# - create a Durable HTTP starter function
# - add azure-functions-durable to requirements.txt
# - run pip install -r requirements.txt

import logging


def main(name: str) -> str:
    return f"Hello {name}!"

Now I need advice to modify Python code of Orchestrator to acceppt json and use json values to filtering which Result set is processed. {"Country": "Japan"}

import azure.durable_functions as df
import azure.functions as func


async def main(documents: func.DocumentList, starter: str):
    client = df.DurableOrchestrationClient(starter)
    instance_id = await client.start_new('MyDFOrchestrator', {"doc_list": [{doc1}, {doc2}, {doc3}]})
    logging.info(f"Started orchestration ID {instance_id}")

将JSON作为输入值传递给协调器应该没问题。有一个类似的例子 here。虽然示例是使用 http 触发器,但这里的相关区域与您在 starter/triggering 函数中使用的触发器无关。

或者,您可以创建一个包含 model/entity 结构的具体可序列化 class(比原始 json 更清晰)。要创建可序列化的 classes,我们只需要您的 class 导出两个静态方法:to_json() 和 from_json()。 Durable Functions 框架将内部调用这些 classes 来序列化和反序列化您的自定义 class.