Are sqlx.Next and sqlx.StructScan safe for concurrent use?
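I have a large table in a MySQL database and I am trying to read it as efficiently as possible. I was thinking of speeding the code up by adding multiple workers, but when I do that I get marshalling errors at the start of the run (and only at the start). They look like this: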
{"caller":"mysql.go:repository.(*MySQLRepo).GetNextBatch#428","error":"DBGetRecordException:
could not marshal episode comments: sql: Scan error on column index 4,
name "created_at": unsupported Scan, storing driver.Value type
[]uint8 into type
*time.Time","level":"error","ts":"2020-07-13T20:42:03.9621 658Z"}
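If I remove the worker code from ImportLegacyComments and loop over the rows normally, I don't get this error. Are sqlx.Next and sqlx.StructScan safe to call from multiple goroutines? If not, is there a safe alternative?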
import (
	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
)

type BatchResult struct {
	rows *sqlx.Rows
}

func (m *MySQLRepo) GetNextBatch(b *BatchResult) ([]model.EpisodeComment, error) {
	var episodeComments []model.EpisodeComment
	for i := 0; i < 1000 && b.rows.Next(); i++ {
		var episodeComment model.EpisodeComment
		err := b.rows.StructScan(&episodeComment)
		if err != nil {
			return nil, err
		}
		episodeComments = append(episodeComments, episodeComment)
	}
	return episodeComments, nil
}

func (m *MySQLRepo) FetchAllEpisodeComments() (*BatchResult, error) {
	rows, err := m.db.Queryx("SELECT * FROM episode_comment")
	if err != nil {
		return nil, err
	}
	return &BatchResult{
		rows: rows,
	}, nil
}

func (svc *ImportService) ImportLegacyComments(ctx context.Context) error {
	batchResult, err := svc.legacyCommentsRepo.FetchAllEpisodeComments()
	var wg sync.WaitGroup
	processor := func() {
		comments, err := svc.legacyCommentsRepo.GetNextBatch(batchResult)
		if err != nil {
			svc.logger.Error(err)
		}
		for len(comments) > 0 {
			comments, err = svc.legacyCommentsRepo.GetNextBatch(batchResult)
			if err != nil {
				svc.logger.Error(err)
			}
			svc.logger.Info("batch", "completed 1000")
		}
		wg.Done()
	}
	for i := 0; i < 20; i++ {
		go processor()
		wg.Add(1)
	}
	wg.Wait()
	return err
}
No, sqlx.Next and sqlx.StructScan are not safe for concurrent use.
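If you put together a simple unit test of your code and run it with the race detector (go test -race), it reports a race condition on an unexported field of the "database/sql".Rows struct: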
Write at 0x00c00000e080 by goroutine 22:
github.com/lib/pq.(*rows).Next()
/Users/blackgreen/go/pkg/mod/github.com/lib/pq@v1.2.0/conn.go:1464 +0x8ec
...
Previous read at 0x00c00000e080 by goroutine 20:
database/sql.(*Rows).Scan()
/usr/local/go/src/database/sql/sql.go:3041 +0x2fa
...
If we track down the field the race detector is complaining about, we can see that the warning against concurrent use is duly documented:
// lastcols is only used in Scan, Next, and NextResultSet which are expected
// not to be called concurrently.
lastcols []driver.Value