有没有办法并行化 time.Sleep 但在 Go 中保持有效执行时间?
Is there a way to parallelise time.Sleep but keeping the effective execution time in Go?
我正在开发一个与 golang 中的慢查询日志重放器关联的慢查询日志解析器包。对于重放器,有以下代码(为了便于阅读,我在其中添加了注释):
for {
// method from my package that returns a Query object, containing headers values
// and the query itself
q := p.GetNext()
if q == (query.Query{}) {
break
}
db.logger.Tracef("query: %s", q.Query)
// we send the SQL query to a chan that is read by workers.
// workers just execute the query on the database, that's all.
// results from the db query are handled in another piece of the code, it doesn't really
// matter here
queries <- q.Query
// We need a reference time
if firstPass {
firstPass = false
previousDate = q.Time
continue
}
// Time field contains the Time: field value in the query header
now := q.Time
sleeping := now.Sub(previousDate)
db.logger.Tracef("next sleeping time: %s", sleeping)
time.Sleep(sleeping) // Here is my issue. For MariaDB the value is < 0 so no sleep is done
// For MariaDB, when there is multiple queries in a short amount of
// time, the Time field is not repeated, so we do not have to update
// the previous date.
if now != (time.Time{}) {
previousDate = now
}
}
我运行遇到一个有趣的问题:
在 MariaDB 慢查询日志中,如果 2 个(或更多)查询彼此接近,header 中没有 Time: 字段,这减少了前面代码片段中 time.Sleep(sleeping)
的数量。
但是,对于 MySQL-style 慢查询日志,查询中总是有一个 Time: 字段 header,这意味着每个查询都会进行睡眠(即使是 µs 睡眠持续时间)。
我注意到 MariaDB 和 MySQL 日志之间存在巨大的重播时间差异; MariaDB 的重放持续时间与实时(日志文件的第一个查询和最后一个查询之间的时间差)非常相似,但另一方面 MySQL 重放时间比 IRL 高得多。在玩了 pprof
之后,我注意到问题出在 time.Sleep
,尤其是 runtime.Futex
,这 CPU 很耗时。
我做了一些基准测试,持续时间结果与完成的 time.Sleep
数量相关(MySQL 比 MariaDB 更高)。
因此,我没有在单个线程中执行所有 time.Sleep
,而是在寻找一种不同的方式来并行执行它们而不改变有效时间,但我想不出办法这个。
我提出的解决方案如下:
type job struct {
query string
idle time.Time
}
...
var reference time.Time
start := time.Now()
for {
q := p.GetNext()
if q == (query.Query{}) {
s.Stop()
break
}
db.logger.Tracef("query: %s", q.Query)
r.queries++
s.Suffix = " queries replayed: " + strconv.Itoa(r.queries)
// We need a reference time
if firstPass {
firstPass = false
reference = q.Time
}
var j job
delta := q.Time.Sub(reference)
j.idle = start.Add(delta)
j.query = q.Query
db.logger.Tracef("next sleeping time: %s", j.idle)
jobs <- j
}
...
func (db database) worker(jobs chan job, errors chan error, wg *sync.WaitGroup) {
defer wg.Done()
for {
j, ok := <-jobs
if !ok {
db.logger.Trace("channel closed, worker exiting")
return
}
sleep := time.Until(j.idle)
if sleep > 0 {
time.Sleep(sleep)
}
rows, err := db.drv.Query(j.query)
if err != nil {
errors <- err
db.logger.Debugf("failed to execute query:\n%s\nerror: %s", j.query, err)
}
if rows != nil {
rows.Close()
}
}
}
解释:
我们将程序的开始保存在一个变量中(此处start
)。
接下来,我们设置一个参考时间(in reference
),这是慢查询日志文件的第一个时间戳。它永远不会改变。
然后,在每个新查询中,我们计算 reference
和当前查询时间戳 q.Time
之间的持续时间。让我们把它存储在 delta
.
我们将 delta
添加到 start
并且我们的时间线中有一个时间戳(不像慢查询日志文件中的过去那样)。我们将这个时间戳发送给我们创建的名为 job
.
的新结构中查询旁边的工作人员
当工作人员通过通道接收到作业时,他计算等待他可以进行查询的时间。如果它 <= 0,它会立即执行查询,否则他会等待。
我正在开发一个与 golang 中的慢查询日志重放器关联的慢查询日志解析器包。对于重放器,有以下代码(为了便于阅读,我在其中添加了注释):
for {
// method from my package that returns a Query object, containing headers values
// and the query itself
q := p.GetNext()
if q == (query.Query{}) {
break
}
db.logger.Tracef("query: %s", q.Query)
// we send the SQL query to a chan that is read by workers.
// workers just execute the query on the database, that's all.
// results from the db query are handled in another piece of the code, it doesn't really
// matter here
queries <- q.Query
// We need a reference time
if firstPass {
firstPass = false
previousDate = q.Time
continue
}
// Time field contains the Time: field value in the query header
now := q.Time
sleeping := now.Sub(previousDate)
db.logger.Tracef("next sleeping time: %s", sleeping)
time.Sleep(sleeping) // Here is my issue. For MariaDB the value is < 0 so no sleep is done
// For MariaDB, when there is multiple queries in a short amount of
// time, the Time field is not repeated, so we do not have to update
// the previous date.
if now != (time.Time{}) {
previousDate = now
}
}
我运行遇到一个有趣的问题:
在 MariaDB 慢查询日志中,如果 2 个(或更多)查询彼此接近,header 中没有 Time: 字段,这减少了前面代码片段中 time.Sleep(sleeping)
的数量。
但是,对于 MySQL-style 慢查询日志,查询中总是有一个 Time: 字段 header,这意味着每个查询都会进行睡眠(即使是 µs 睡眠持续时间)。
我注意到 MariaDB 和 MySQL 日志之间存在巨大的重播时间差异; MariaDB 的重放持续时间与实时(日志文件的第一个查询和最后一个查询之间的时间差)非常相似,但另一方面 MySQL 重放时间比 IRL 高得多。在玩了 pprof
之后,我注意到问题出在 time.Sleep
,尤其是 runtime.Futex
,这 CPU 很耗时。
我做了一些基准测试,持续时间结果与完成的 time.Sleep
数量相关(MySQL 比 MariaDB 更高)。
因此,我没有在单个线程中执行所有 time.Sleep
,而是在寻找一种不同的方式来并行执行它们而不改变有效时间,但我想不出办法这个。
我提出的解决方案如下:
type job struct {
query string
idle time.Time
}
...
var reference time.Time
start := time.Now()
for {
q := p.GetNext()
if q == (query.Query{}) {
s.Stop()
break
}
db.logger.Tracef("query: %s", q.Query)
r.queries++
s.Suffix = " queries replayed: " + strconv.Itoa(r.queries)
// We need a reference time
if firstPass {
firstPass = false
reference = q.Time
}
var j job
delta := q.Time.Sub(reference)
j.idle = start.Add(delta)
j.query = q.Query
db.logger.Tracef("next sleeping time: %s", j.idle)
jobs <- j
}
...
func (db database) worker(jobs chan job, errors chan error, wg *sync.WaitGroup) {
defer wg.Done()
for {
j, ok := <-jobs
if !ok {
db.logger.Trace("channel closed, worker exiting")
return
}
sleep := time.Until(j.idle)
if sleep > 0 {
time.Sleep(sleep)
}
rows, err := db.drv.Query(j.query)
if err != nil {
errors <- err
db.logger.Debugf("failed to execute query:\n%s\nerror: %s", j.query, err)
}
if rows != nil {
rows.Close()
}
}
}
解释:
我们将程序的开始保存在一个变量中(此处start
)。
接下来,我们设置一个参考时间(in reference
),这是慢查询日志文件的第一个时间戳。它永远不会改变。
然后,在每个新查询中,我们计算 reference
和当前查询时间戳 q.Time
之间的持续时间。让我们把它存储在 delta
.
我们将 delta
添加到 start
并且我们的时间线中有一个时间戳(不像慢查询日志文件中的过去那样)。我们将这个时间戳发送给我们创建的名为 job
.
当工作人员通过通道接收到作业时,他计算等待他可以进行查询的时间。如果它 <= 0,它会立即执行查询,否则他会等待。