DurableOrchestrationClient.GetStatusAsync() 返回具有相同实例 ID 的实例仍在执行,即使该实例已经完成
DurableOrchestrationClient.GetStatusAsync() returns that an instance with the same Instance Id is still executing even after its completion
我在 consumption plan 上使用 durable function 的 singleton pattern。我只想为队列消息中的相同 ID 执行该函数的一个实例。
我正在 local 环境中测试我的代码。尽管该函数已成功完成,DurableOrchestrationClient.GetStatusAsync() 仍然返回具有相同 Instance Id 的实例正在执行。
使用 Storage Explorer,我发现 Table > DurableFunctionHubInstances 中我的函数对应行的 RuntimeStatus 也被标记为 Completed。
为什么 DurableOrchestrationClient.GetStatusAsync()
报告存在具有相同实例 ID 的实例?我该如何解决这个问题?
/// <summary>
/// Queue-triggered starter for the "OF" orchestration. Builds an instance ID
/// from the message's tenant and ID and starts the orchestration only when
/// no instance with that ID is known to the orchestration client.
/// </summary>
/// <param name="req">Queue message; <c>MyTenant</c> and <c>MyId</c> form the instance ID.</param>
/// <param name="starter">Orchestration client used to query and start instances.</param>
/// <param name="log">Logger for diagnostic output.</param>
/// <returns>A task that completes when the check (and optional start) has finished.</returns>
[FunctionName("OF_QueueStart")]
// Returns a Task instead of 'async void' so the Functions host can await the
// invocation and observe any exception thrown after the first await.
public static async System.Threading.Tasks.Task QueueStart(
    [QueueTrigger("queue-name", Connection = "connection")] MyObject req,
    [OrchestrationClient] DurableOrchestrationClient starter,
    ILogger log)
{
    log.LogInformation("some logging here");

    string instanceId = req.MyTenant + "-" + req.MyId;
    log.LogInformation($"Checking if instance with instance ID {instanceId} already exists.");

    // NOTE: GetStatusAsync also returns a non-null status for an instance that
    // has already finished, so this null check alone treats a completed
    // instance as "already exists". Inspect existingInstance.RuntimeStatus to
    // tell a running instance from a finished one.
    var existingInstance = await starter.GetStatusAsync(instanceId);
    if (existingInstance == null)
    {
        log.LogInformation($"Instance with instance ID {instanceId} does not exist.");
        await starter.StartNewAsync("OF", instanceId, req);
        log.LogInformation($"Started orchestration with ID = {instanceId}.");
    }
    else
    {
        log.LogInformation($"OF instance with Instance ID {instanceId} already exists.");
    }
}
/// <summary>
/// Orchestrator "OF": reads the <c>MyObject</c> input, calls the activities
/// "OF_F1" (with the tenant) and "OF_F2" (with the ID), then creates a SQL
/// connection intended for a not-yet-written "OF_F3" step.
/// </summary>
/// <param name="context">Orchestration context providing the input and activity calls.</param>
/// <param name="log">Logger for diagnostic output.</param>
/// <returns>A task that completes when the orchestration code has finished.</returns>
[FunctionName("OF")]
// Returns a Task instead of 'async void' so the host can await the
// orchestrator and observe its completion or failure.
public static async System.Threading.Tasks.Task RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"Executing orchestration with instance id {context.InstanceId}");

    var input = context.GetInput<MyObject>();
    var myObject2 = await context.CallActivityAsync<MyObject2>("OF_F1", input.MyTenant);
    var myObject3 = await context.CallActivityAsync<MyObject3>("OF_F2", input.MyId);

    // NOTE(review): 'tenantInfo' is not declared in this block; presumably it
    // is meant to be the result of "OF_F1" - confirm.
    // NOTE(review): orchestrator code is expected to avoid direct I/O, and a
    // live connection object cannot be serialized as activity input; consider
    // creating the connection inside the "OF_F3" activity - confirm.
    // 'using' disposes the connection even if the code below throws.
    using (SqlConnection conn = new SqlConnection(GetConnectionString(tenantInfo.DbConnection)))
    {
        /*Here will call OF_F3 and pass conn as a parameter. Currently this line is not written*/
    }
}
existingInstance 似乎是由 DurableFunctionHubInstances 表中具有相同 Partition Key 的行填充的,因此即使实例已经完成,它也不为 null。我们还需要检查它的 RuntimeStatus。
// Look up the stored status for this instance ID. The result is non-null for
// any instance that has a record, including ones that already finished.
var existingInstance = await starter.GetStatusAsync(instanceId);

// Start a new instance when none exists or the previous one reached a terminal
// state. Failed and Terminated are included alongside Completed; otherwise an
// instance that ended unsuccessfully would block this ID from ever restarting.
if (existingInstance == null
    || existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Completed
    || existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Failed
    || existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Terminated)
{
    log.LogInformation($"Instance with instance ID {instanceId} does not exist.");
    await starter.StartNewAsync("OF", instanceId, req);
    log.LogInformation($"Started orchestration with ID = {instanceId}.");
}
Python 代码:在使用新实例 ID
执行之前检查状态
async def main(event: func.EventGridEvent, SingletonOrchestration: str):
    """Start "GenericEventGridOrchestrator" once no instance with the same ID is active.

    Polls the status of ``instance_id`` and starts a new instance as soon as
    the previous one is absent or has reached a terminal state
    (Completed, Failed or Terminated).

    Args:
        event: The triggering Event Grid event (not read in this block).
        SingletonOrchestration: Binding string used to build the durable client.
    """
    import asyncio

    client = df.DurableOrchestrationClient(SingletonOrchestration)
    # client_name, engagement_name, trigger_pattern and input_instanc_dict are
    # not defined in this block; presumably derived from the event - confirm.
    instance_id = client_name + engagement_name + trigger_pattern
    logging.info(f"Instance id '{instance_id}'")
    logging.info(input_instanc_dict)

    poll_interval_seconds = 1
    flag_set = False
    while not flag_set:
        existing_instance = await client.get_status(instance_id, False, False, False)
        logging.info(f"Orchestration with ID '{instance_id}' status ")
        logging.info(existing_instance.__dict__)

        status = existing_instance._runtime_status
        if status is None or status in ("Completed", "Failed", "Terminated"):
            instance_id = await client.start_new(
                "GenericEventGridOrchestrator", instance_id, input_instanc_dict
            )
            logging.info(f"Started orchestration with ID = '{instance_id}'.")
            flag_set = True
        else:
            # The previous instance is still active: wait before polling again
            # instead of querying the status endpoint in a tight loop.
            await asyncio.sleep(poll_interval_seconds)
我在 consumption plan 上使用 durable function 的 singleton pattern。我只想为队列消息中的相同 ID 执行该函数的一个实例。
我正在 local 环境中测试我的代码。尽管该函数已成功完成,DurableOrchestrationClient.GetStatusAsync() 仍然返回具有相同 Instance Id 的实例正在执行。
使用 Storage Explorer,我发现 Table > DurableFunctionHubInstances 中我的函数对应行的 RuntimeStatus 也被标记为 Completed。
为什么 DurableOrchestrationClient.GetStatusAsync()
报告存在具有相同实例 ID 的实例?我该如何解决这个问题?
/// <summary>
/// Queue-triggered starter for the "OF" orchestration. Builds an instance ID
/// from the message's tenant and ID and starts the orchestration only when
/// no instance with that ID is known to the orchestration client.
/// </summary>
/// <param name="req">Queue message; <c>MyTenant</c> and <c>MyId</c> form the instance ID.</param>
/// <param name="starter">Orchestration client used to query and start instances.</param>
/// <param name="log">Logger for diagnostic output.</param>
/// <returns>A task that completes when the check (and optional start) has finished.</returns>
[FunctionName("OF_QueueStart")]
// Returns a Task instead of 'async void' so the Functions host can await the
// invocation and observe any exception thrown after the first await.
public static async System.Threading.Tasks.Task QueueStart(
    [QueueTrigger("queue-name", Connection = "connection")] MyObject req,
    [OrchestrationClient] DurableOrchestrationClient starter,
    ILogger log)
{
    log.LogInformation("some logging here");

    string instanceId = req.MyTenant + "-" + req.MyId;
    log.LogInformation($"Checking if instance with instance ID {instanceId} already exists.");

    // NOTE: GetStatusAsync also returns a non-null status for an instance that
    // has already finished, so this null check alone treats a completed
    // instance as "already exists". Inspect existingInstance.RuntimeStatus to
    // tell a running instance from a finished one.
    var existingInstance = await starter.GetStatusAsync(instanceId);
    if (existingInstance == null)
    {
        log.LogInformation($"Instance with instance ID {instanceId} does not exist.");
        await starter.StartNewAsync("OF", instanceId, req);
        log.LogInformation($"Started orchestration with ID = {instanceId}.");
    }
    else
    {
        log.LogInformation($"OF instance with Instance ID {instanceId} already exists.");
    }
}
/// <summary>
/// Orchestrator "OF": reads the <c>MyObject</c> input, calls the activities
/// "OF_F1" (with the tenant) and "OF_F2" (with the ID), then creates a SQL
/// connection intended for a not-yet-written "OF_F3" step.
/// </summary>
/// <param name="context">Orchestration context providing the input and activity calls.</param>
/// <param name="log">Logger for diagnostic output.</param>
/// <returns>A task that completes when the orchestration code has finished.</returns>
[FunctionName("OF")]
// Returns a Task instead of 'async void' so the host can await the
// orchestrator and observe its completion or failure.
public static async System.Threading.Tasks.Task RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context, ILogger log)
{
    log.LogInformation($"Executing orchestration with instance id {context.InstanceId}");

    var input = context.GetInput<MyObject>();
    var myObject2 = await context.CallActivityAsync<MyObject2>("OF_F1", input.MyTenant);
    var myObject3 = await context.CallActivityAsync<MyObject3>("OF_F2", input.MyId);

    // NOTE(review): 'tenantInfo' is not declared in this block; presumably it
    // is meant to be the result of "OF_F1" - confirm.
    // NOTE(review): orchestrator code is expected to avoid direct I/O, and a
    // live connection object cannot be serialized as activity input; consider
    // creating the connection inside the "OF_F3" activity - confirm.
    // 'using' disposes the connection even if the code below throws.
    using (SqlConnection conn = new SqlConnection(GetConnectionString(tenantInfo.DbConnection)))
    {
        /*Here will call OF_F3 and pass conn as a parameter. Currently this line is not written*/
    }
}
existingInstance 似乎是由 DurableFunctionHubInstances 表中具有相同 Partition Key 的行填充的,因此即使实例已经完成,它也不为 null。我们还需要检查它的 RuntimeStatus。
// Look up the stored status for this instance ID. The result is non-null for
// any instance that has a record, including ones that already finished.
var existingInstance = await starter.GetStatusAsync(instanceId);

// Start a new instance when none exists or the previous one reached a terminal
// state. Failed and Terminated are included alongside Completed; otherwise an
// instance that ended unsuccessfully would block this ID from ever restarting.
if (existingInstance == null
    || existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Completed
    || existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Failed
    || existingInstance.RuntimeStatus == OrchestrationRuntimeStatus.Terminated)
{
    log.LogInformation($"Instance with instance ID {instanceId} does not exist.");
    await starter.StartNewAsync("OF", instanceId, req);
    log.LogInformation($"Started orchestration with ID = {instanceId}.");
}
Python 代码:在使用新实例 ID
# Check the instance status before executing with the instance ID.
async def main(event: func.EventGridEvent, SingletonOrchestration: str):
    """Start "GenericEventGridOrchestrator" once no instance with the same ID is active.

    Polls the status of ``instance_id`` and starts a new instance as soon as
    the previous one is absent or has reached a terminal state
    (Completed, Failed or Terminated).

    Args:
        event: The triggering Event Grid event (not read in this block).
        SingletonOrchestration: Binding string used to build the durable client.
    """
    import asyncio

    client = df.DurableOrchestrationClient(SingletonOrchestration)
    # client_name, engagement_name, trigger_pattern and input_instanc_dict are
    # not defined in this block; presumably derived from the event - confirm.
    instance_id = client_name + engagement_name + trigger_pattern
    logging.info(f"Instance id '{instance_id}'")
    logging.info(input_instanc_dict)

    poll_interval_seconds = 1
    flag_set = False
    while not flag_set:
        existing_instance = await client.get_status(instance_id, False, False, False)
        logging.info(f"Orchestration with ID '{instance_id}' status ")
        logging.info(existing_instance.__dict__)

        status = existing_instance._runtime_status
        if status is None or status in ("Completed", "Failed", "Terminated"):
            instance_id = await client.start_new(
                "GenericEventGridOrchestrator", instance_id, input_instanc_dict
            )
            logging.info(f"Started orchestration with ID = '{instance_id}'.")
            flag_set = True
        else:
            # The previous instance is still active: wait before polling again
            # instead of querying the status endpoint in a tight loop.
            await asyncio.sleep(poll_interval_seconds)