Python 中的 Azure Durable Functions:为什么未触发函数 2?

Azure Durable Functions in Python: why is function 2 not triggered?

我的用例如下:

  1. F1:生成一些数据并将它们写入 CosmosDB(使用时间触发器)
  2. F2:读取刚刚写入的数据并添加用户名
  3. Orchestrator:控制工作流,F1完成后调用F2

我的问题:只有F1能用,F2根本不触发。为什么? F1 是否必须 return 触发器或其他东西?

这就是我知道只有 F1 被执行的方式:

F1

import logging
import hashlib
import time
import datetime
from azure.cosmos import CosmosClient
import azure.functions as func

def generate_id(string=None, length=10):
    '''This function generates a hash id to be attached to each new row'''

    ts = time.time()
    guid = hashlib.shake_128((str(string) + str(ts)).encode()).hexdigest(10)
    return guid

def main(mytimer: func.TimerRequest, outputDocument: func.Out[func.Document]) -> None:

    utc_timestamp = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()

    if mytimer.past_due:
        logging.info('The timer is past due')

    logging.info('Python timer trigger function ran at %s', utc_timestamp)

    result1 = {
    "first_letter": "A",
    "second_letter": "B",
    "third_letter": "C",
    "score": 0.001,
    }

    result1['id'] = generate_id()

    outputDocument.set(func.Document.from_dict(result1))

    return

F1function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "*/30 * * * * *"
    },
    {
      "type": "cosmosDB",
      "direction": "out",
      "name": "outputDocument",
      "databaseName": myCosmosDB,
      "collectionName": myContainer,
      "createIfNotExists": "true",
      "connectionStringSetting": myConnString,
      "partitionKey": "id"
    }
  ]
}

F2

import logging
import azure.functions as func
from azure.cosmos import CosmosClient

def add_username(string=None):
    '''Generate username'''

    name = "MyName"
    surname = "MySurname"
    username = name+" "+surname

    return username


def main(F1activitytrigger, inputDocument: func.DocumentList) -> str:

    if inputDocument:
        logging.info('Document id: %s', inputDocument[0]['id'])

    result2 = inputDocument[0].data

    result2['username'] = add_username() 

    return result2

F2function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "F1activitytrigger",
      "type": "activityTrigger",
      "direction": "in"
    },
    {
      "type": "cosmosDB",
      "direction": "in",
      "name": "inputDocument",
      "databaseName": myCosmosDB,
      "collectionName": myContainer,
      "createIfNotExists": "true",
      "connectionStringSetting": myConnString,
      "partitionKey": "id"     
    }
  ]
}

管弦乐队

import logging
import json

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    result1 = yield context.call_activity('Test-F1')
    result2 = yield context.call_activity('Test-F2')
    return [result1, result2]

main = df.Orchestrator.create(orchestrator_function)

管弦乐队 function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "context",
      "type": "orchestrationTrigger",
      "direction": "in"
    }
  ]
}

根据你的描述,F1是一个定时触发的函数(类型是'timerTrigger')。

F1 不需要 Orchestrator 调用,但 F2 需要。

需要在Orchestrator中调用F2,因为F2的类型是'activityTrigger'。

因此,您的持久函数应如下所示:

F1

import logging

import azure.functions as func
import azure.durable_functions as df


async def main(mytimer: func.TimerRequest, starter: str) -> None:
    client = df.DurableOrchestrationClient(starter)
    instance_id = await client.start_new("YourOrchestratorName", None, None)
    logging.info(f"Started orchestration with ID = '{instance_id}'.")

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "* * * * * *"
    },
    {
      "name": "starter",
      "type": "durableClient",
      "direction": "in"
    }
  ]
}

YourOrchestratorName

import logging
import json

import azure.functions as func
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    result1 = yield context.call_activity('F2', "Tokyo")
    result2 = yield context.call_activity('F2', "Seattle")
    result3 = yield context.call_activity('F2', "London")
    return [result1, result2, result3]

main = df.Orchestrator.create(orchestrator_function)

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "context",
      "type": "orchestrationTrigger",
      "direction": "in"
    }
  ]
}

F2

import logging


def main(name: str) -> str:
    logging.info(f"Hello {name}!")
    return f"Hello {name}!"

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "name",
      "type": "activityTrigger",
      "direction": "in"
    }
  ]
}