Thread.Sleep 的延迟导致在 C# 中生成时间戳时出现问题
Delay with Thread.Sleep is causing issues when producing Timestamps in C#
由于一个项目,我正在生成一些合成数据来构建用于分析目的的基础结构。
基础设施是通过数据模拟构建的,用于生成合成数据。此数据通过 UDP 发送到 node-red 服务器,然后通过 mqtt 移交给 kafka。
我只是生成一个布尔值和时间戳。现在我想分析这些时间戳之间的时间。数据应该在两者之间的某个时间生成。所以这是一个例子:
Data A:
{
isActivated: false,
Timestamp: "xxxxx"
}
Data B:
{
isActivated: true,
Timestamp: "xxxxx+deltaTime"
}
所以B.Timestamp-A.Timestamp = deltaTime
。到目前为止一切顺利,但是当我添加 Thread.Sleep(delay)
时,延迟将被添加到从 Kafka-Consumer 计算的 deltaTime 中......(我在计算消息中得到了正确的时间戳,而不是那些产生的时间戳来自 kafka 本身。我通过在 DataGenerator 中生成的时间戳中添加 2 天来测试它)
这里是代码示例:
public class CustomData
{
public DateTime Timestamp { get; set; }
public bool isActivated { get; set; }
}
public class DataGenerator
{
private bool DataAActivated { get; set; }
private IPAddress ipAddress = IPAddress.Parse("XXX.XXX.XXX.XXX");
private UdpClient udpClient = new UdpClient();
private IPEndPoint iPEndPoint = new IPEndPoint(ipAddress, XXXXX);
public DataGenerator (bool dataAActivated)
{
DataAActivated = dataAActivated;
}
public void GenerateData(double delay, int deltaTime)
{
DateTime ts0 = DateTime.Now;
DateTime ts2 = ts0.AddMilliseconds(deltaTime);
if (DataAActivated)
{
CustomData dataA = new CustomData();
dataA.isActivated = false;
dataA.Timestamp = ts0;
CustomData dataB = new CustomData();
dataB.isActivated = true;
dataB.Timestamp = ts2;
}
else
{
CustomData dataB = new CustomData();
dataB.isActivated = false;
dataB.Timestamp = ts0;
CustomData dataA = new CustomData();
dataA.isActivated = true;
dataA.Timestamp = ts2;
}
// this is causing issues
Thread.Sleep((int)delay);
SendData(dataA);
SendData(dataB);
}
private void SendData(CustomData data)
{
udpClient.Connect(iPEndPoint);
byte[] jsonUtf8Bytes;
var options = new JsonSerializerOptions
{
WriteIndented = true
};
jsonUtf8Bytes = System.Text.Json.
JsonSerializer.SerializeToUtf8Bytes(data, options);
udpClient.Send(jsonUtf8Bytes,jsonUtf8Bytes.Length);
udpClient.Close();
}
}
---------------- within Kafka-Consumer ----------------
public double CalcDuration( CustomData dataA, CustomData dataB)
{
double duration = dataB.Timestamp.Subtract(dataA.Timestamp).TotalMilliseconds;
Console.WriteLine($"duration: {duration}");
return duration;
}
如果 deltaTime 为 100 毫秒,延迟为 500 毫秒,输出将如下所示
不符合 Thread.Sleep(delay):
duration: 100ms
duration: 100ms
...
与 Thread.Sleep(延迟):
duration: 600ms
duration: 600ms
...
有人能告诉我如何解决这个问题吗?
我希望我能说清楚,但不要犹豫让我知道编辑这个 post。
非常感谢,
问候
我发现了问题。正如彼得和玛丽安所提到的,我在问题中发布的代码不会导致失败。
这让我开始了解其他服务的代码。通过清理整个kafka-cluster相关主题并重新开始生产,我看到在Producer/Consumer链的第二个服务中我设置了错误的初始状态。
所以第一条消息被跳过,因此时间戳之间的计算是延迟之前的一个和延迟之后的一个。那是因为我为计算持续时间设置的约束来自相同的偏移量。
所以要说清楚:
发送到卡夫卡的数据到达主题“摄取”
消息会是这样的:
ingestion:
(1)dataA
(2)dataB
(3)dataA
(4)dataB
...
从那以后,我将它们分配到两个主题“已激活”、“未激活”,因为来自 notActivated 数据的时间戳每次都大于来自激活的时间戳。为了确保没有丢包或发生其他事情,我通过在调度程序中保存最后一个状态(与 CustomData 的 isActive 属性相关)来证明这一点。那个状态最初设置错误,所以发生的事情是:
notActivated:
(1)dataA //--> this entry was not there because it was skipped refering to false state matching
(4)dataB
activated:
(2)dataB
(3)dataA
所以持续时间的计算是
duration = (4)dataB - (2)dataB
因此持续时间为:
duration = deltaTime + delay
再次感谢彼得和玛丽安
莫迪斯
由于一个项目,我正在生成一些合成数据来构建用于分析目的的基础结构。
基础设施是通过数据模拟构建的,用于生成合成数据。此数据通过 UDP 发送到 node-red 服务器,然后通过 mqtt 移交给 kafka。
我只是生成一个布尔值和时间戳。现在我想分析这些时间戳之间的时间。数据应该在两者之间的某个时间生成。所以这是一个例子:
Data A:
{
isActivated: false,
Timestamp: "xxxxx"
}
Data B:
{
isActivated: true,
Timestamp: "xxxxx+deltaTime"
}
所以B.Timestamp-A.Timestamp = deltaTime
。到目前为止一切顺利,但是当我添加 Thread.Sleep(delay)
时,延迟将被添加到从 Kafka-Consumer 计算的 deltaTime 中......(我在计算消息中得到了正确的时间戳,而不是那些产生的时间戳来自 kafka 本身。我通过在 DataGenerator 中生成的时间戳中添加 2 天来测试它)
这里是代码示例:
public class CustomData
{
public DateTime Timestamp { get; set; }
public bool isActivated { get; set; }
}
public class DataGenerator
{
private bool DataAActivated { get; set; }
private IPAddress ipAddress = IPAddress.Parse("XXX.XXX.XXX.XXX");
private UdpClient udpClient = new UdpClient();
private IPEndPoint iPEndPoint = new IPEndPoint(ipAddress, XXXXX);
public DataGenerator (bool dataAActivated)
{
DataAActivated = dataAActivated;
}
public void GenerateData(double delay, int deltaTime)
{
DateTime ts0 = DateTime.Now;
DateTime ts2 = ts0.AddMilliseconds(deltaTime);
if (DataAActivated)
{
CustomData dataA = new CustomData();
dataA.isActivated = false;
dataA.Timestamp = ts0;
CustomData dataB = new CustomData();
dataB.isActivated = true;
dataB.Timestamp = ts2;
}
else
{
CustomData dataB = new CustomData();
dataB.isActivated = false;
dataB.Timestamp = ts0;
CustomData dataA = new CustomData();
dataA.isActivated = true;
dataA.Timestamp = ts2;
}
// this is causing issues
Thread.Sleep((int)delay);
SendData(dataA);
SendData(dataB);
}
private void SendData(CustomData data)
{
udpClient.Connect(iPEndPoint);
byte[] jsonUtf8Bytes;
var options = new JsonSerializerOptions
{
WriteIndented = true
};
jsonUtf8Bytes = System.Text.Json.
JsonSerializer.SerializeToUtf8Bytes(data, options);
udpClient.Send(jsonUtf8Bytes,jsonUtf8Bytes.Length);
udpClient.Close();
}
}
---------------- within Kafka-Consumer ----------------
public double CalcDuration( CustomData dataA, CustomData dataB)
{
double duration = dataB.Timestamp.Subtract(dataA.Timestamp).TotalMilliseconds;
Console.WriteLine($"duration: {duration}");
return duration;
}
如果 deltaTime 为 100 毫秒,延迟为 500 毫秒,输出将如下所示 不符合 Thread.Sleep(delay):
duration: 100ms
duration: 100ms
...
与 Thread.Sleep(延迟):
duration: 600ms
duration: 600ms
...
有人能告诉我如何解决这个问题吗?
我希望我能说清楚,但不要犹豫让我知道编辑这个 post。
非常感谢,
问候
我发现了问题。正如彼得和玛丽安所提到的,我在问题中发布的代码不会导致失败。
这让我开始了解其他服务的代码。通过清理整个kafka-cluster相关主题并重新开始生产,我看到在Producer/Consumer链的第二个服务中我设置了错误的初始状态。
所以第一条消息被跳过,因此时间戳之间的计算是延迟之前的一个和延迟之后的一个。那是因为我为计算持续时间设置的约束来自相同的偏移量。
所以要说清楚: 发送到卡夫卡的数据到达主题“摄取” 消息会是这样的:
ingestion:
(1)dataA
(2)dataB
(3)dataA
(4)dataB
...
从那以后,我将它们分配到两个主题“已激活”、“未激活”,因为来自 notActivated 数据的时间戳每次都大于来自激活的时间戳。为了确保没有丢包或发生其他事情,我通过在调度程序中保存最后一个状态(与 CustomData 的 isActive 属性相关)来证明这一点。那个状态最初设置错误,所以发生的事情是:
notActivated:
(1)dataA //--> this entry was not there because it was skipped refering to false state matching
(4)dataB
activated:
(2)dataB
(3)dataA
所以持续时间的计算是
duration = (4)dataB - (2)dataB
因此持续时间为:
duration = deltaTime + delay
再次感谢彼得和玛丽安
莫迪斯