Python 中的 Azure Durable Functions:为什么未触发函数 2?
Azure Durable Functions in Python: why is function 2 not triggered?
我的用例如下:
- F1:生成一些数据并将它们写入 CosmosDB(使用时间触发器)
- F2:读取刚刚写入的数据并添加用户名
- Orchestrator:控制工作流,F1完成后调用F2
我的问题:只有F1能用,F2根本不触发。为什么? F1 是否必须 return 触发器或其他东西?
这就是我知道只有 F1 被执行的方式:
F1
import logging
import hashlib
import time
import datetime
from azure.cosmos import CosmosClient
import azure.functions as func
def generate_id(string=None, length=10):
'''This function generates a hash id to be attached to each new row'''
ts = time.time()
guid = hashlib.shake_128((str(string) + str(ts)).encode()).hexdigest(10)
return guid
def main(mytimer: func.TimerRequest, outputDocument: func.Out[func.Document]) -> None:
utc_timestamp = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()
if mytimer.past_due:
logging.info('The timer is past due')
logging.info('Python timer trigger function ran at %s', utc_timestamp)
result1 = {
"first_letter": "A",
"second_letter": "B",
"third_letter": "C",
"score": 0.001,
}
result1['id'] = generate_id()
outputDocument.set(func.Document.from_dict(result1))
return
F1function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "mytimer",
"type": "timerTrigger",
"direction": "in",
"schedule": "*/30 * * * * *"
},
{
"type": "cosmosDB",
"direction": "out",
"name": "outputDocument",
"databaseName": myCosmosDB,
"collectionName": myContainer,
"createIfNotExists": "true",
"connectionStringSetting": myConnString,
"partitionKey": "id"
}
]
}
F2
import logging
import azure.functions as func
from azure.cosmos import CosmosClient
def add_username(string=None):
'''Generate username'''
name = "MyName"
surname = "MySurname"
username = name+" "+surname
return username
def main(F1activitytrigger, inputDocument: func.DocumentList) -> str:
if inputDocument:
logging.info('Document id: %s', inputDocument[0]['id'])
result2 = inputDocument[0].data
result2['username'] = add_username()
return result2
F2function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "F1activitytrigger",
"type": "activityTrigger",
"direction": "in"
},
{
"type": "cosmosDB",
"direction": "in",
"name": "inputDocument",
"databaseName": myCosmosDB,
"collectionName": myContainer,
"createIfNotExists": "true",
"connectionStringSetting": myConnString,
"partitionKey": "id"
}
]
}
管弦乐队
import logging
import json
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
result1 = yield context.call_activity('Test-F1')
result2 = yield context.call_activity('Test-F2')
return [result1, result2]
main = df.Orchestrator.create(orchestrator_function)
管弦乐队 function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
根据你的描述,F1是一个定时触发的函数(类型是'timerTrigger')。
F1 不需要 Orchestrator 调用,但 F2 需要。
需要在Orchestrator中调用F2,因为F2的类型是'activityTrigger'。
因此,您的持久函数应如下所示:
F1
import logging
import azure.functions as func
import azure.durable_functions as df
async def main(mytimer: func.TimerRequest, starter: str) -> None:
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new("YourOrchestratorName", None, None)
logging.info(f"Started orchestration with ID = '{instance_id}'.")
function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "mytimer",
"type": "timerTrigger",
"direction": "in",
"schedule": "* * * * * *"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
]
}
YourOrchestratorName
import logging
import json
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
result1 = yield context.call_activity('F2', "Tokyo")
result2 = yield context.call_activity('F2', "Seattle")
result3 = yield context.call_activity('F2', "London")
return [result1, result2, result3]
main = df.Orchestrator.create(orchestrator_function)
function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
F2
import logging
def main(name: str) -> str:
logging.info(f"Hello {name}!")
return f"Hello {name}!"
function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "name",
"type": "activityTrigger",
"direction": "in"
}
]
}
我的用例如下:
- F1:生成一些数据并将它们写入 CosmosDB(使用时间触发器)
- F2:读取刚刚写入的数据并添加用户名
- Orchestrator:控制工作流,F1完成后调用F2
我的问题:只有F1能用,F2根本不触发。为什么? F1 是否必须 return 触发器或其他东西?
这就是我知道只有 F1 被执行的方式:
F1
import logging
import hashlib
import time
import datetime
from azure.cosmos import CosmosClient
import azure.functions as func
def generate_id(string=None, length=10):
'''This function generates a hash id to be attached to each new row'''
ts = time.time()
guid = hashlib.shake_128((str(string) + str(ts)).encode()).hexdigest(10)
return guid
def main(mytimer: func.TimerRequest, outputDocument: func.Out[func.Document]) -> None:
utc_timestamp = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()
if mytimer.past_due:
logging.info('The timer is past due')
logging.info('Python timer trigger function ran at %s', utc_timestamp)
result1 = {
"first_letter": "A",
"second_letter": "B",
"third_letter": "C",
"score": 0.001,
}
result1['id'] = generate_id()
outputDocument.set(func.Document.from_dict(result1))
return
F1function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "mytimer",
"type": "timerTrigger",
"direction": "in",
"schedule": "*/30 * * * * *"
},
{
"type": "cosmosDB",
"direction": "out",
"name": "outputDocument",
"databaseName": myCosmosDB,
"collectionName": myContainer,
"createIfNotExists": "true",
"connectionStringSetting": myConnString,
"partitionKey": "id"
}
]
}
F2
import logging
import azure.functions as func
from azure.cosmos import CosmosClient
def add_username(string=None):
'''Generate username'''
name = "MyName"
surname = "MySurname"
username = name+" "+surname
return username
def main(F1activitytrigger, inputDocument: func.DocumentList) -> str:
if inputDocument:
logging.info('Document id: %s', inputDocument[0]['id'])
result2 = inputDocument[0].data
result2['username'] = add_username()
return result2
F2function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "F1activitytrigger",
"type": "activityTrigger",
"direction": "in"
},
{
"type": "cosmosDB",
"direction": "in",
"name": "inputDocument",
"databaseName": myCosmosDB,
"collectionName": myContainer,
"createIfNotExists": "true",
"connectionStringSetting": myConnString,
"partitionKey": "id"
}
]
}
管弦乐队
import logging
import json
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
result1 = yield context.call_activity('Test-F1')
result2 = yield context.call_activity('Test-F2')
return [result1, result2]
main = df.Orchestrator.create(orchestrator_function)
管弦乐队 function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
根据你的描述,F1是一个定时触发的函数(类型是'timerTrigger')。
F1 不需要 Orchestrator 调用,但 F2 需要。
需要在Orchestrator中调用F2,因为F2的类型是'activityTrigger'。
因此,您的持久函数应如下所示:
F1
import logging
import azure.functions as func
import azure.durable_functions as df
async def main(mytimer: func.TimerRequest, starter: str) -> None:
client = df.DurableOrchestrationClient(starter)
instance_id = await client.start_new("YourOrchestratorName", None, None)
logging.info(f"Started orchestration with ID = '{instance_id}'.")
function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "mytimer",
"type": "timerTrigger",
"direction": "in",
"schedule": "* * * * * *"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
]
}
YourOrchestratorName
import logging
import json
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
result1 = yield context.call_activity('F2', "Tokyo")
result2 = yield context.call_activity('F2', "Seattle")
result3 = yield context.call_activity('F2', "London")
return [result1, result2, result3]
main = df.Orchestrator.create(orchestrator_function)
function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
F2
import logging
def main(name: str) -> str:
logging.info(f"Hello {name}!")
return f"Hello {name}!"
function.json
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "name",
"type": "activityTrigger",
"direction": "in"
}
]
}