Thread.Sleep 的延迟导致在 C# 中生成时间戳时出现问题

Delay with Thread.Sleep is causing issues when producing Timestamps in C#

由于一个项目,我正在生成一些合成数据来构建用于分析目的的基础结构。

基础设施是通过数据模拟构建的,用于生成合成数据。此数据通过 UDP 发送到 node-red 服务器,然后通过 mqtt 移交给 kafka。

我只是生成一个布尔值和时间戳。现在我想分析这些时间戳之间的时间。数据应该在两者之间的某个时间生成。所以这是一个例子:

Data A:
{
isActivated: false,
Timestamp: "xxxxx"
}

Data B:
{
isActivated: true,
Timestamp: "xxxxx+deltaTime"
}

所以B.Timestamp-A.Timestamp = deltaTime。到目前为止一切顺利,但是当我添加 Thread.Sleep(delay) 时,延迟将被添加到从 Kafka-Consumer 计算的 deltaTime 中......(我在计算消息中得到了正确的时间戳,而不是那些产生的时间戳来自 kafka 本身。我通过在 DataGenerator 中生成的时间戳中添加 2 天来测试它)

这里是代码示例:

public class CustomData
{
    public DateTime Timestamp { get; set; }
    public bool isActivated { get; set; }
}


public class DataGenerator 
{
    private bool DataAActivated { get; set; }
    private IPAddress ipAddress = IPAddress.Parse("XXX.XXX.XXX.XXX");
    private UdpClient udpClient = new UdpClient();
    private IPEndPoint iPEndPoint = new IPEndPoint(ipAddress, XXXXX);
    
    public DataGenerator (bool dataAActivated)
    {
         DataAActivated = dataAActivated;
    }

    public void GenerateData(double delay, int deltaTime) 
    {
        DateTime ts0 = DateTime.Now;
        DateTime ts2 = ts0.AddMilliseconds(deltaTime);
    
        if (DataAActivated) 
        {
            CustomData dataA = new CustomData();
            dataA.isActivated = false;
            dataA.Timestamp = ts0;

            CustomData dataB = new CustomData();
            dataB.isActivated = true;
            dataB.Timestamp = ts2;
        } 
        else
        {
            CustomData dataB = new CustomData();
            dataB.isActivated = false;
            dataB.Timestamp = ts0;

            CustomData dataA = new CustomData();
            dataA.isActivated = true;
            dataA.Timestamp = ts2;
        }
    
        // this is causing issues
        Thread.Sleep((int)delay);

        SendData(dataA);
        SendData(dataB);
    }
    
    private void SendData(CustomData data)
        {
            udpClient.Connect(iPEndPoint);

            byte[] jsonUtf8Bytes;
            var options = new JsonSerializerOptions
            {
                WriteIndented = true
            };
            jsonUtf8Bytes = System.Text.Json.
                              JsonSerializer.SerializeToUtf8Bytes(data, options);
            
            udpClient.Send(jsonUtf8Bytes,jsonUtf8Bytes.Length);
            udpClient.Close();
        }
}
 
---------------- within Kafka-Consumer ----------------
public double CalcDuration( CustomData dataA, CustomData dataB)
{
    double duration = dataB.Timestamp.Subtract(dataA.Timestamp).TotalMilliseconds;
    Console.WriteLine($"duration: {duration}");
    return duration;
}

如果 deltaTime 为 100 毫秒,延迟为 500 毫秒,输出将如下所示 不符合 Thread.Sleep(delay):

duration: 100ms
duration: 100ms
...

与 Thread.Sleep(延迟):

duration: 600ms
duration: 600ms
...

有人能告诉我如何解决这个问题吗?

我希望我能说清楚,但不要犹豫让我知道编辑这个 post。

非常感谢,

问候

我发现了问题。正如彼得和玛丽安所提到的,我在问题中发布的代码不会导致失败。

这让我开始了解其他服务的代码。通过清理整个kafka-cluster相关主题并重新开始生产,我看到在Producer/Consumer链的第二个服务中我设置了错误的初始状态。

所以第一条消息被跳过,因此时间戳之间的计算是延迟之前的一个和延迟之后的一个。那是因为我为计算持续时间设置的约束来自相同的偏移量。

所以要说清楚: 发送到卡夫卡的数据到达主题“摄取” 消息会是这样的:

ingestion:
(1)dataA
(2)dataB
(3)dataA
(4)dataB
...

从那以后,我将它们分配到两个主题“已激活”、“未激活”,因为来自 notActivated 数据的时间戳每次都大于来自激活的时间戳。为了确保没有丢包或发生其他事情,我通过在调度程序中保存最后一个状态(与 CustomData 的 isActive 属性相关)来证明这一点。那个状态最初设置错误,所以发生的事情是:

notActivated:
(1)dataA //--> this entry was not there because it was skipped refering to false state matching
(4)dataB

activated:
(2)dataB
(3)dataA

所以持续时间的计算是

duration = (4)dataB - (2)dataB 

因此持续时间为:

duration = deltaTime + delay

再次感谢彼得和玛丽安

莫迪斯