当数据从服务器按顺序到达时异步处理数据?

Process data asynchronously as it arrives from a server in order?

我正在使用 NetCoreServer 连接到 XMPP 聊天服务器。一切都按预期工作。每当服务器发送一条消息时,我都会使用正常方法处理它 processData(string data)。问题是,如果该方法花费的时间超过特定时间,服务器将关闭连接。

我正在考虑异步执行该方法,但问题是来自服务器的消息可能会被拆分成多个部分。方法处理数据检测到这一点,如果收到的消息只是整个消息的一部分,它将存储它。下次调用它时将新消息附加到旧消息,检查新消息是否完成它或者是否需要等待下一条消息等等,直到它有一个完整的消息。然后它会继续处理它,所以如果它被异步调用,调用必须等待之前的调用才能执行,而不会阻塞 NetCoreServer 的 OnReceive。

我正在考虑在来自服务器的新数据到达时将 var task=new Task(() => { ProcessData(result); }); 添加到队列,但我不知道如何链接它们的执行或如何继续。或者我可以在数据到达时将其存储在队列中,并在新消息添加到队列时以某种方式触发事件调用 ProcessData。但是我遇到了同样的问题,除了不知道如何触发事件应该等待之前的事件完成。

ProcessData 看起来像这样:

public Class DataProcessor
{
private string Buffer;
public void processData(string data)
{
    if(PartialData(data)) {
        Buffer+=data;
        return;
    }
    else //continue processing
}

有很多工具可以用来解决这个问题。在这里,我将展示一个 TPL Dataflow solution. You will need two ActionBlock<T>s,一个用于连接消息的拆分部分,一个用于处理完整的消​​息。我在下面以相反的顺序写它们,因为第一个块在构建过程中需要知道第二个块。此示例假定每条完整消息的最后部分以点字符结尾:

var block2 = new ActionBlock<string[]>(parts =>
{
    string completeMessage = String.Join(" ", parts);
    Console.WriteLine($"Processing message: {completeMessage}");
});

var parts = new List<string>();
var block1 = new ActionBlock<string>(rawMessage =>
{
    if (rawMessage is null) { block2.Complete(); return; }
    parts.Add(rawMessage);
    if (rawMessage.EndsWith("."))
    {
        block2.Post(parts.ToArray());
        parts.Clear();
    }
});

block1.Post("Hello");
block1.Post("world.");
block1.Post("The quick");
block1.Post("brown fox.");

block1.Post(null); // Signal that there are no more messages
block1.Complete();
await block2.Completion;
Console.WriteLine("Processing terminated");

输出:

Processing message: Hello world.
Processing message: The quick brown fox.
Processing terminated

(Live demo)

如您所见,有一个 List<string> 保存了当前收到的不完整消息的各个部分。每次消息完成时,部分都会传播到 block2,并清除列表。

一条值为 null 的特殊消息表示将不会再收到任何消息,因此您可以 Complete the block2 and await for its Completion 干净、优雅地终止进程。

这两个 ActionBlock<T> 正在并行工作。它们中的每一个都包含自己的内部输入队列,其中包含要处理的消息(该队列是无界的)。 Post方法只是将消息发送到目标块的输入队列,而不等待消息的处理。如果消息被目标块接受,则此方法 return true,否则 false。不接受消息的常见原因是调用了 Complete 方法,或者由于发生未处理的异常而导致块失败。

TPL 数据流库是 .NET 6 中标准库的一部分。您无需安装任何东西即可使用它。除非您的目标是旧的 .NET Framework,您需要在其中安装 this NuGet 包。