有没有办法并行化 time.Sleep 但在 Go 中保持有效执行时间?

Is there a way to parallelise time.Sleep but keeping the effective execution time in Go?

我正在开发一个与 golang 中的慢查询日志重放器关联的慢查询日志解析器包。对于重放器,有以下代码(为了便于阅读,我在其中添加了注释):

for {
        // method from my package that returns a Query object, containing headers values
        // and the query itself
        q := p.GetNext()
        if q == (query.Query{}) {
            break
        }
        db.logger.Tracef("query: %s", q.Query)

        // we send the SQL query to a chan that is read by workers.
        // workers just execute the query on the database, that's all.
        // results from the db query are handled in another piece of the code, it doesn't really
        // matter here
        queries <- q.Query

        // We need a reference time
        if firstPass {
            firstPass = false
            previousDate = q.Time
            continue
        }
        
        // Time field contains the Time: field value in the query header
        now := q.Time
        sleeping := now.Sub(previousDate)
        db.logger.Tracef("next sleeping time: %s", sleeping)
        time.Sleep(sleeping) // Here is my issue. For MariaDB the value is < 0 so no sleep is done

        // For MariaDB, when there is multiple queries in a short amount of
        // time, the Time field is not repeated, so we do not have to update
        // the previous date.
        if now != (time.Time{}) {
            previousDate = now
        }
    }

我运行遇到一个有趣的问题: 在 MariaDB 慢查询日志中,如果 2 个(或更多)查询彼此接近,header 中没有 Time: 字段,这减少了前面代码片段中 time.Sleep(sleeping) 的数量。 但是,对于 MySQL-style 慢查询日志,查询中总是有一个 Time: 字段 header,这意味着每个查询都会进行睡眠(即使是 µs 睡眠持续时间)。

我注意到 MariaDB 和 MySQL 日志之间存在巨大的重播时间差异; MariaDB 的重放持续时间与实时(日志文件的第一个查询和最后一个查询之间的时间差)非常相似,但另一方面 MySQL 重放时间比 IRL 高得多。在玩了 pprof 之后,我注意到问题出在 time.Sleep,尤其是 runtime.Futex,这 CPU 很耗时。

我做了一些基准测试,持续时间结果与完成的 time.Sleep 数量相关(MySQL 比 MariaDB 更高)。

因此,我没有在单个线程中执行所有 time.Sleep,而是在寻找一种不同的方式来并行执行它们而不改变有效时间,但我想不出办法这个。

我提出的解决方案如下:

type job struct {
    query string
    idle  time.Time
}

...
    var reference time.Time
    start := time.Now()
    for {
        q := p.GetNext()
        if q == (query.Query{}) {
            s.Stop()
            break
        }
        db.logger.Tracef("query: %s", q.Query)

        r.queries++
        s.Suffix = " queries replayed: " + strconv.Itoa(r.queries)

        // We need a reference time
        if firstPass {
            firstPass = false
            reference = q.Time
        }

        var j job
        delta := q.Time.Sub(reference)
        j.idle = start.Add(delta)
        j.query = q.Query
        db.logger.Tracef("next sleeping time: %s", j.idle)
        jobs <- j
    }

...

func (db database) worker(jobs chan job, errors chan error, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        j, ok := <-jobs
        if !ok {
            db.logger.Trace("channel closed, worker exiting")
            return
        }
        sleep := time.Until(j.idle)
        if sleep > 0 {
            time.Sleep(sleep)
        }
        rows, err := db.drv.Query(j.query)
        if err != nil {
            errors <- err
            db.logger.Debugf("failed to execute query:\n%s\nerror: %s", j.query, err)
        }
        if rows != nil {
            rows.Close()
        }
    }
}

解释:

我们将程序的开始保存在一个变量中(此处start)。 接下来,我们设置一个参考时间(in reference),这是慢查询日志文件的第一个时间戳。它永远不会改变。

然后,在每个新查询中,我们计算 reference 和当前查询时间戳 q.Time 之间的持续时间。让我们把它存储在 delta.

我们将 delta 添加到 start 并且我们的时间线中有一个时间戳(不像慢查询日志文件中的过去那样)。我们将这个时间戳发送给我们创建的名为 job.

的新结构中查询旁边的工作人员

当工作人员通过通道接收到作业时,他计算等待他可以进行查询的时间。如果它 <= 0,它会立即执行查询,否则他会等待。