将入站消息的等待延迟限制为每秒多条消息
throttling an await delay for inbound messages to a number of messages per second
我正在尝试将循环(正在发送消息)限制为每秒特定数量的消息。 _throttle
是每秒的消息数。
我的初始算法如下所示,但延迟并不平滑。
我可以做哪些改进来消除相当颠簸的延迟和突发消息。
我试过 tick 和间隔最大值,但入站计数太大,很难补偿。在我的实现中关闭节流阀可以达到的最大速率约为 15000/秒。我正在以每秒 300 到 1000 之间的速率进行测试,因此我试图将其放慢一点。
private class ThrottleCalculator
{
private readonly int _throttle;
private DateTime _lastCalculation = DateTime.Now;
private int _count = 0;
private int _interval = 0;
public ThrottleCalculator(int throttle)
{
this._throttle = throttle;
}
public async Task CalculateThrottle()
{
this._count += 1;
var elapsed = DateTime.Now.Subtract(this._lastCalculation).TotalMilliseconds;
var tick = 50;
if (elapsed > tick)
{
this._lastCalculation = DateTime.Now;
int projection = this._count * (1000 / tick);
var errorTerm = this._throttle - projection;
this._interval = this._interval - errorTerm;
if (this._interval < 0)
this._interval = 0;
// this is often several thousand, so I have to limit.
if (this._interval > 100)
this._interval = 100;
await Task.Delay(this._interval);
this._count = 0;
}
}
}
使用它的代码每次迭代都会调用它。
var throttle = new ThrottleCalculator(600); // 600/s
while (message = getMessage())
{
... // do stuff with message.
if (throttle != null)
await throttle.CalculateThrottle();
PID 控制器算法
对于其他尝试此操作的人,正确的方法是 PID controller algorithm。
Proportional / Integral / Derivative Controller
我使用维基底部的算法作为基础。
我的 kp / ki / kd
似乎与此处的值配合得很好,使它们成比例似乎会产生稳定的消息流和非常紧凑的延迟值。
private class ThrottleCalculator
{
private readonly int _throttle;
private DateTime _lastCalculationTime;
private double _measured = 0;
private double _totalError = 0;
private double _integral = 0;
private double _lastError = 0;
public ThrottleCalculator(int throttle)
{
this._throttle = throttle;
this._lastCalculationTime = DateTime.MinValue;
}
public async Task CalculateThrottle()
{
var kp = -.1d; // proportional gain
var ki = -.1d; // integral gain
var kd = -.1d; // derivative gain
var dt = 30d; // rate of change of time. calculcations every ms;
this._measured += 1;
if (this._lastCalculationTime == DateTime.MinValue)
this._lastCalculationTime = DateTime.Now;
var elapsed = (double)DateTime.Now.Subtract(this._lastCalculationTime)
.TotalMilliseconds;
if (elapsed > dt)
{
this._lastCalculationTime = DateTime.Now;
var error = ((double)this._throttle / (1000d / dt)) - this._measured;
this._totalError += error;
var integral = this._totalError;
var derivative = (error - this._lastError) / elapsed;
var actual = (kp * error) + (ki * integral) + (kd * derivative);
var output = actual;
if (output < 1)
output = 0;
// i don't like this, but it seems necessary
// so that wild wait values are never used.
if (output > dt * 4)
output = dt * 4;
if (output > 0)
await Task.Delay((int)output);
this._measured = 0;
this._lastError = error;
}
}
}
我的价值观是这样的:
Actual: 19.2000 Output: 19.2000 Integral: -209 Derivative: .0000 Error: 17
Actual: 17.5000 Output: 17.5000 Integral: -192 Derivative: .0000 Error: 17
Actual: 15.8000 Output: 15.8000 Integral: -175 Derivative: .0000 Error: 17
Actual: 33.8104 Output: 33.8104 Integral: -255 Derivative: -3.1040 Error: -80
Actual: 21.8931 Output: 21.8931 Integral: -238 Derivative: 2.0686 Error: 17
Actual: 20.4000 Output: 20.4000 Integral: -221 Derivative: .0000 Error: 17
Actual: 18.7000 Output: 18.7000 Integral: -204 Derivative: .0000 Error: 17
Actual: 17.0000 Output: 17.0000 Integral: -187 Derivative: .0000 Error: 17
Actual: 15.3000 Output: 15.3000 Integral: -170 Derivative: .0000 Error: 17
Actual: 31.0752 Output: 31.0752 Integral: -239 Derivative: -2.7520 Error: -69
我正在尝试将循环(正在发送消息)限制为每秒特定数量的消息。 _throttle
是每秒的消息数。
我的初始算法如下所示,但延迟并不平滑。
我可以做哪些改进来消除相当颠簸的延迟和突发消息。
我试过 tick 和间隔最大值,但入站计数太大,很难补偿。在我的实现中关闭节流阀可以达到的最大速率约为 15000/秒。我正在以每秒 300 到 1000 之间的速率进行测试,因此我试图将其放慢一点。
private class ThrottleCalculator
{
private readonly int _throttle;
private DateTime _lastCalculation = DateTime.Now;
private int _count = 0;
private int _interval = 0;
public ThrottleCalculator(int throttle)
{
this._throttle = throttle;
}
public async Task CalculateThrottle()
{
this._count += 1;
var elapsed = DateTime.Now.Subtract(this._lastCalculation).TotalMilliseconds;
var tick = 50;
if (elapsed > tick)
{
this._lastCalculation = DateTime.Now;
int projection = this._count * (1000 / tick);
var errorTerm = this._throttle - projection;
this._interval = this._interval - errorTerm;
if (this._interval < 0)
this._interval = 0;
// this is often several thousand, so I have to limit.
if (this._interval > 100)
this._interval = 100;
await Task.Delay(this._interval);
this._count = 0;
}
}
}
使用它的代码每次迭代都会调用它。
var throttle = new ThrottleCalculator(600); // 600/s
while (message = getMessage())
{
... // do stuff with message.
if (throttle != null)
await throttle.CalculateThrottle();
PID 控制器算法
对于其他尝试此操作的人,正确的方法是 PID controller algorithm。
Proportional / Integral / Derivative Controller
我使用维基底部的算法作为基础。
我的 kp / ki / kd
似乎与此处的值配合得很好,使它们成比例似乎会产生稳定的消息流和非常紧凑的延迟值。
private class ThrottleCalculator
{
private readonly int _throttle;
private DateTime _lastCalculationTime;
private double _measured = 0;
private double _totalError = 0;
private double _integral = 0;
private double _lastError = 0;
public ThrottleCalculator(int throttle)
{
this._throttle = throttle;
this._lastCalculationTime = DateTime.MinValue;
}
public async Task CalculateThrottle()
{
var kp = -.1d; // proportional gain
var ki = -.1d; // integral gain
var kd = -.1d; // derivative gain
var dt = 30d; // rate of change of time. calculcations every ms;
this._measured += 1;
if (this._lastCalculationTime == DateTime.MinValue)
this._lastCalculationTime = DateTime.Now;
var elapsed = (double)DateTime.Now.Subtract(this._lastCalculationTime)
.TotalMilliseconds;
if (elapsed > dt)
{
this._lastCalculationTime = DateTime.Now;
var error = ((double)this._throttle / (1000d / dt)) - this._measured;
this._totalError += error;
var integral = this._totalError;
var derivative = (error - this._lastError) / elapsed;
var actual = (kp * error) + (ki * integral) + (kd * derivative);
var output = actual;
if (output < 1)
output = 0;
// i don't like this, but it seems necessary
// so that wild wait values are never used.
if (output > dt * 4)
output = dt * 4;
if (output > 0)
await Task.Delay((int)output);
this._measured = 0;
this._lastError = error;
}
}
}
我的价值观是这样的:
Actual: 19.2000 Output: 19.2000 Integral: -209 Derivative: .0000 Error: 17
Actual: 17.5000 Output: 17.5000 Integral: -192 Derivative: .0000 Error: 17
Actual: 15.8000 Output: 15.8000 Integral: -175 Derivative: .0000 Error: 17
Actual: 33.8104 Output: 33.8104 Integral: -255 Derivative: -3.1040 Error: -80
Actual: 21.8931 Output: 21.8931 Integral: -238 Derivative: 2.0686 Error: 17
Actual: 20.4000 Output: 20.4000 Integral: -221 Derivative: .0000 Error: 17
Actual: 18.7000 Output: 18.7000 Integral: -204 Derivative: .0000 Error: 17
Actual: 17.0000 Output: 17.0000 Integral: -187 Derivative: .0000 Error: 17
Actual: 15.3000 Output: 15.3000 Integral: -170 Derivative: .0000 Error: 17
Actual: 31.0752 Output: 31.0752 Integral: -239 Derivative: -2.7520 Error: -69