当数据从服务器按顺序到达时异步处理数据?
Process data asynchronously as it arrives from a server in order?
我正在使用 NetCoreServer 连接到 XMPP 聊天服务器。一切都按预期工作。每当服务器发送一条消息时,我都会使用正常方法处理它 processData(string data)
。问题是,如果该方法花费的时间超过特定时间,服务器将关闭连接。
我正在考虑异步执行该方法,但问题是来自服务器的消息可能会被拆分成多个部分。方法处理数据检测到这一点,如果收到的消息只是整个消息的一部分,它将存储它。下次调用它时将新消息附加到旧消息,检查新消息是否完成它或者是否需要等待下一条消息等等,直到它有一个完整的消息。然后它会继续处理它,所以如果它被异步调用,调用必须等待之前的调用才能执行,而不会阻塞 NetCoreServer 的 OnReceive。
我正在考虑在来自服务器的新数据到达时将 var task=new Task(() => { ProcessData(result); });
添加到队列,但我不知道如何链接它们的执行或如何继续。或者我可以在数据到达时将其存储在队列中,并在新消息添加到队列时以某种方式触发事件调用 ProcessData
。但是我遇到了同样的问题,除了不知道如何触发事件应该等待之前的事件完成。
ProcessData 看起来像这样:
public Class DataProcessor
{
private string Buffer;
public void processData(string data)
{
if(PartialData(data)) {
Buffer+=data;
return;
}
else //continue processing
}
有很多工具可以用来解决这个问题。在这里,我将展示一个 TPL Dataflow solution. You will need two ActionBlock<T>
s,一个用于连接消息的拆分部分,一个用于处理完整的消息。我在下面以相反的顺序写它们,因为第一个块在构建过程中需要知道第二个块。此示例假定每条完整消息的最后部分以点字符结尾:
var block2 = new ActionBlock<string[]>(parts =>
{
string completeMessage = String.Join(" ", parts);
Console.WriteLine($"Processing message: {completeMessage}");
});
var parts = new List<string>();
var block1 = new ActionBlock<string>(rawMessage =>
{
if (rawMessage is null) { block2.Complete(); return; }
parts.Add(rawMessage);
if (rawMessage.EndsWith("."))
{
block2.Post(parts.ToArray());
parts.Clear();
}
});
block1.Post("Hello");
block1.Post("world.");
block1.Post("The quick");
block1.Post("brown fox.");
block1.Post(null); // Signal that there are no more messages
block1.Complete();
await block2.Completion;
Console.WriteLine("Processing terminated");
输出:
Processing message: Hello world.
Processing message: The quick brown fox.
Processing terminated
如您所见,有一个 List<string>
保存了当前收到的不完整消息的各个部分。每次消息完成时,部分都会传播到 block2
,并清除列表。
一条值为 null
的特殊消息表示将不会再收到任何消息,因此您可以 Complete
the block2
and await
for its Completion
干净、优雅地终止进程。
这两个 ActionBlock<T>
正在并行工作。它们中的每一个都包含自己的内部输入队列,其中包含要处理的消息(该队列是无界的)。 Post
方法只是将消息发送到目标块的输入队列,而不等待消息的处理。如果消息被目标块接受,则此方法 return true
,否则 false
。不接受消息的常见原因是调用了 Complete
方法,或者由于发生未处理的异常而导致块失败。
TPL 数据流库是 .NET 6 中标准库的一部分。您无需安装任何东西即可使用它。除非您的目标是旧的 .NET Framework,您需要在其中安装 this NuGet 包。
我正在使用 NetCoreServer 连接到 XMPP 聊天服务器。一切都按预期工作。每当服务器发送一条消息时,我都会使用正常方法处理它 processData(string data)
。问题是,如果该方法花费的时间超过特定时间,服务器将关闭连接。
我正在考虑异步执行该方法,但问题是来自服务器的消息可能会被拆分成多个部分。方法处理数据检测到这一点,如果收到的消息只是整个消息的一部分,它将存储它。下次调用它时将新消息附加到旧消息,检查新消息是否完成它或者是否需要等待下一条消息等等,直到它有一个完整的消息。然后它会继续处理它,所以如果它被异步调用,调用必须等待之前的调用才能执行,而不会阻塞 NetCoreServer 的 OnReceive。
我正在考虑在来自服务器的新数据到达时将 var task=new Task(() => { ProcessData(result); });
添加到队列,但我不知道如何链接它们的执行或如何继续。或者我可以在数据到达时将其存储在队列中,并在新消息添加到队列时以某种方式触发事件调用 ProcessData
。但是我遇到了同样的问题,除了不知道如何触发事件应该等待之前的事件完成。
ProcessData 看起来像这样:
public Class DataProcessor
{
private string Buffer;
public void processData(string data)
{
if(PartialData(data)) {
Buffer+=data;
return;
}
else //continue processing
}
有很多工具可以用来解决这个问题。在这里,我将展示一个 TPL Dataflow solution. You will need two ActionBlock<T>
s,一个用于连接消息的拆分部分,一个用于处理完整的消息。我在下面以相反的顺序写它们,因为第一个块在构建过程中需要知道第二个块。此示例假定每条完整消息的最后部分以点字符结尾:
var block2 = new ActionBlock<string[]>(parts =>
{
string completeMessage = String.Join(" ", parts);
Console.WriteLine($"Processing message: {completeMessage}");
});
var parts = new List<string>();
var block1 = new ActionBlock<string>(rawMessage =>
{
if (rawMessage is null) { block2.Complete(); return; }
parts.Add(rawMessage);
if (rawMessage.EndsWith("."))
{
block2.Post(parts.ToArray());
parts.Clear();
}
});
block1.Post("Hello");
block1.Post("world.");
block1.Post("The quick");
block1.Post("brown fox.");
block1.Post(null); // Signal that there are no more messages
block1.Complete();
await block2.Completion;
Console.WriteLine("Processing terminated");
输出:
Processing message: Hello world.
Processing message: The quick brown fox.
Processing terminated
如您所见,有一个 List<string>
保存了当前收到的不完整消息的各个部分。每次消息完成时,部分都会传播到 block2
,并清除列表。
一条值为 null
的特殊消息表示将不会再收到任何消息,因此您可以 Complete
the block2
and await
for its Completion
干净、优雅地终止进程。
这两个 ActionBlock<T>
正在并行工作。它们中的每一个都包含自己的内部输入队列,其中包含要处理的消息(该队列是无界的)。 Post
方法只是将消息发送到目标块的输入队列,而不等待消息的处理。如果消息被目标块接受,则此方法 return true
,否则 false
。不接受消息的常见原因是调用了 Complete
方法,或者由于发生未处理的异常而导致块失败。
TPL 数据流库是 .NET 6 中标准库的一部分。您无需安装任何东西即可使用它。除非您的目标是旧的 .NET Framework,您需要在其中安装 this NuGet 包。