在 .Net Core 3.1 应用程序的 ExecuteAsync 中等待后后台服务块
Background service blocks after await in ExecuteAsync in .Net Core 3.1 application
我需要一个服务,它可以连续轮询来自 Kafka 主题的消息,将消息提供给处理任务,从该任务中获取结果并将其推送到另一个 Kafka 主题。我使用 .Net Core 3.1 的 BackgroundService 作为基本基础设施,同时我编写了一个 ConsumerProducer class 封装了具有以下 public 方法的 Kafka 内容:
public async Task ReceiveAndReply(ReplyCallback replyCallback)
{
try
{
var msg = consumer.Pull();
var reply = await replyCallback(msg);
producer.PushAsync((message)reply);
}
catch (Exception e)
{
throw new Exception(e.InnerException.Message);
};
}
class派生的BackgroundService如下:
public class Worker : BackgroundService{
// initialization stuff....
protected ReplyCallback processDelegate = new ReplyCallback(ProcessQuery);
protected static Task<message> ProcessQuery(message source)
{
return new Task<message>>(() =>
{
try
{
var messagePayload= ProcessPulledMessage(source);
var replyMessage = new Message(messagePayload);
return replyMessage;
}
catch(Exception ex)
{
Console.WriteLine(ex.Message);
return null;
}
});
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await connector.ReceiveAndReply(processDelegate);
}
catch (Exception ex)
{
throw new Exception(ex.InnerException.Message);
}
}
}
}
代码进行了简化,但描述了情况。虽然 运行 我能够使用来自主题的消息,但是当涉及到等待 ProcessQuery 中的任务时,执行退出,应用程序继续 运行,但等待的任务永远不会执行。
也许我做的事情完全错了,我不是复杂异步工作的专家,所以任何提示都将非常受欢迎。
谢谢
您正在返回一个未开始的任务。
Task
constructor 的文档指出:
Rather than calling this constructor, the most common way to instantiate a Task object and launch a task is by calling the static Task.Run(Action) or TaskFactory.StartNew(Action) method.
我需要一个服务,它可以连续轮询来自 Kafka 主题的消息,将消息提供给处理任务,从该任务中获取结果并将其推送到另一个 Kafka 主题。我使用 .Net Core 3.1 的 BackgroundService 作为基本基础设施,同时我编写了一个 ConsumerProducer class 封装了具有以下 public 方法的 Kafka 内容:
public async Task ReceiveAndReply(ReplyCallback replyCallback)
{
try
{
var msg = consumer.Pull();
var reply = await replyCallback(msg);
producer.PushAsync((message)reply);
}
catch (Exception e)
{
throw new Exception(e.InnerException.Message);
};
}
class派生的BackgroundService如下:
public class Worker : BackgroundService{
// initialization stuff....
protected ReplyCallback processDelegate = new ReplyCallback(ProcessQuery);
protected static Task<message> ProcessQuery(message source)
{
return new Task<message>>(() =>
{
try
{
var messagePayload= ProcessPulledMessage(source);
var replyMessage = new Message(messagePayload);
return replyMessage;
}
catch(Exception ex)
{
Console.WriteLine(ex.Message);
return null;
}
});
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await connector.ReceiveAndReply(processDelegate);
}
catch (Exception ex)
{
throw new Exception(ex.InnerException.Message);
}
}
}
}
代码进行了简化,但描述了情况。虽然 运行 我能够使用来自主题的消息,但是当涉及到等待 ProcessQuery 中的任务时,执行退出,应用程序继续 运行,但等待的任务永远不会执行。 也许我做的事情完全错了,我不是复杂异步工作的专家,所以任何提示都将非常受欢迎。 谢谢
您正在返回一个未开始的任务。
Task
constructor 的文档指出:
Rather than calling this constructor, the most common way to instantiate a Task object and launch a task is by calling the static Task.Run(Action) or TaskFactory.StartNew(Action) method.