在 Go 语言的事务包装器中回滚不生效

Rollback does not work well with Go language transactional wrapper


我在 GitHub 上发现了以下数据库事务处理包装器的实现,并决定尝试一下。


我正在使用 PostgreSQL 作为数据库。


testdb=> select * from products;
 product_id | price
 0001       |   200
 0002       |   100
 0003       |   150
 0004       |   300
(4 rows)

在处理 A 成功后,故意让处理 B 失败,期望处理 A 所做的更改被回滚。但是,实际运行时回滚并没有发生,最终得到的日志和数据库内容见代码之后。



package main

import (

    _ "github.com/jackc/pgx/v4/stdlib"

// transaction-wrapper-start
// txAdmin is the transaction wrapper type used by Service.
// Its field list is not shown in this snippet.
// NOTE(review): the calls t.BeginTx(...) and txAdmin{data} elsewhere in this
// snippet suggest it embeds *sql.DB — confirm against the full source.
type txAdmin struct {

// Service updates products through the txAdmin wrapper held in tx.
type Service struct {
    tx txAdmin

// Transaction begins a transaction via t.BeginTx, runs f, and commits when f
// returns nil. If BeginTx fails its error is returned as is; if f fails the
// error is logged and returned wrapped with "transaction query failed".
//
// Note that the transaction handle tx is local to this method: f is called
// with ctx only, so f is not given tx through its arguments.
func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
    tx, err := t.BeginTx(ctx, nil)
    if err != nil {
        return err
    // Deferred so the rollback call runs on every return path below,
    // including the one after Commit.
    defer tx.Rollback()
    if err := f(ctx); err != nil {
        log.Printf("transaction err")
        return fmt.Errorf("transaction query failed: %w", err)

    return tx.Commit()

// UpdateProduct runs two statements inside s.tx.Transaction: Process A sets
// the price of productID to 200, and Process B executes the query "..." and
// is expected to fail. The first error encountered is logged and returned.
//
// Both statements are issued through s.tx.ExecContext, i.e. on the txAdmin
// value itself, not on the tx created inside Transaction.
// NOTE(review): if txAdmin embeds *sql.DB, these statements run outside the
// transaction, which would explain why Process A is not rolled back — confirm.
func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
    updateFunc := func(ctx context.Context) error {
        log.Printf("first process")
        // Process A
        // NOTE(review): the query text ends at "product_id = " with no
        // placeholder although productID is passed as an argument; a "$1"
        // was probably lost from this snippet — confirm.
        if _, err := s.tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = ", productID); err != nil {
            log.Printf("first err")
            return err
        log.Printf("second process")

        // Process B (intentionally failing).
        if _, err := s.tx.ExecContext(ctx, "...", productID); err != nil {
            log.Printf("second err")
            return err
        return nil
    return s.tx.Transaction(ctx, updateFunc)

// transaction-wrapper-end
// main opens a "pgx" database handle, wraps it in a Service, and calls
// UpdateProduct for product "0004". The value returned by UpdateProduct is
// discarded.
func main() {

    data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
    // NOTE(review): the body of this error branch is not shown in the snippet.
    if nil != err {

    database := Service {tx: txAdmin{data}}

    ctx := context.Background()

    database.UpdateProduct(ctx, "0004")


2022/05/26 13:28:55 update     
2022/05/26 13:28:55 transaction
2022/05/26 13:28:55 first process
2022/05/26 13:28:55 second process
2022/05/26 13:28:55 second err
2022/05/26 13:28:55 transaction err

数据库更改(如果回滚有效,id 0004 的价格应保持为 300。)

testdb=> select * from products;
 product_id | price
 0001       |   200
 0002       |   100
 0003       |   150
 0004       |   200
(4 rows)


========= PS:以下不使用包装器的代码可以正常工作。

package main

import (

    _ "github.com/jackc/pgx/v4/stdlib"

// transaction-defer-start
// Service holds the *sql.DB handle that UpdateProduct starts transactions on.
type Service struct {
    db *sql.DB

// UpdateProduct begins a transaction on s.db, sets the price of productID to
// 200, then executes the query "...", and commits if both statements succeed.
// Any error is returned to the caller; statement errors are logged first.
//
// Unlike the wrapper variant, both statements are executed on the tx value
// returned by Begin.
func (s *Service) UpdateProduct(ctx context.Context, productID string) (err error) {
    // Begin is called without ctx; ctx is only passed to the statements.
    tx, err := s.db.Begin()
    if err != nil {
        return err
    // Deferred so the rollback call runs on every return path below.
    defer tx.Rollback()

    // NOTE(review): the query text ends at "product_id = " with no placeholder
    // although productID is passed; a "$1" was probably lost — confirm.
    if _, err = tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = ", productID); err != nil {
        log.Println("update err")
        return err

    if _, err = tx.ExecContext(ctx, "...", productID); err != nil {
        log.Println("update err")
        return err

    return tx.Commit()

// transaction-defer-end
// main opens a "pgx" database handle, stores it in a Service, and calls
// UpdateProduct for product "0004". The value returned by UpdateProduct is
// discarded.
func main() {
    var database Service
    // NOTE(review): the password in this connection string ("passs") differs
    // from the one used in the other snippets ("password") — confirm.
    dbConn, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=passs sslmode=disable")
    // NOTE(review): the body of this error branch is not shown in the snippet.
    if nil != err {
    database.db = dbConn

    ctx := context.Background()

    database.UpdateProduct(ctx, "0004")

正如 @Richard Huxton 所说,需要把 tx 传递给函数 f,使 f 中的查询在该事务上执行。具体步骤如下:


  1. 在 struct txAdmin 上添加一个字段以容纳 *sql.Tx,这样 txAdmin 就有 DB 和 Tx 两个字段
  2. 在 Transaction 内把 tx 赋值给 txAdmin 的 Tx 字段(t.Tx = tx)
  3. 在 UpdateProduct 中,每个查询都通过 s.tx.Tx(而不是 s.tx)执行


package main

import (

    _ "github.com/jackc/pgx/v4/stdlib"

// transaction-wrapper-start
// txAdmin wraps a database handle and exposes the transaction that is
// currently in progress.
//
// Tx is set by Transaction for the duration of the callback and reset to nil
// afterwards. Because Tx is a single shared field, one txAdmin must not run
// Transaction from several goroutines at the same time.
type txAdmin struct {
    *sql.DB
    Tx *sql.Tx
}

// Service updates products through the txAdmin wrapper held in tx.
type Service struct {
    tx txAdmin
}

// Transaction begins a transaction on t.DB, publishes it as t.Tx, and runs f.
// If f returns nil the transaction is committed; otherwise it is rolled back
// and the error is returned wrapped with "transaction query failed".
//
// f must issue its statements through t.Tx: statements sent to t.DB run
// outside the transaction and are not undone by the rollback.
func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
    tx, err := t.DB.BeginTx(ctx, nil)
    if err != nil {
        return err
    }

    // Make the transaction reachable from f for the duration of the call.
    t.Tx = tx
    defer func() {
        // Do not leave a finished transaction behind for later callers.
        t.Tx = nil
        // After a successful Commit this returns sql.ErrTxDone, which is
        // expected and safe to ignore.
        _ = tx.Rollback()
    }()

    if err := f(ctx); err != nil {
        log.Printf("transaction err")
        return fmt.Errorf("transaction query failed: %w", err)
    }

    return tx.Commit()
}

// UpdateProduct sets the price of productID to 200 (Process A) and then runs
// a deliberately invalid statement (Process B) inside one transaction, so
// that the change made by Process A is rolled back.
func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
    updateFunc := func(ctx context.Context) error {
        log.Printf("first process")
        // Process A: executed on s.tx.Tx so that it is part of the transaction.
        if _, err := s.tx.Tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = $1", productID); err != nil {
            log.Printf("first err")
            return err
        }
        log.Printf("second process")

        // Process B: "..." is not valid SQL, so this statement fails on purpose.
        if _, err := s.tx.Tx.ExecContext(ctx, "...", productID); err != nil {
            log.Printf("second err")
            return err
        }
        return nil
    }
    return s.tx.Transaction(ctx, updateFunc)
}

// transaction-wrapper-end
// main opens a "pgx" database handle, wraps it in a Service through the DB
// field of txAdmin, and calls UpdateProduct for product "0004". The value
// returned by UpdateProduct is discarded.
func main() {
    data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
    // NOTE(review): the body of this error branch is not shown in the snippet.
    if nil != err {

    database := Service{tx: txAdmin{DB: data}}

    ctx := context.Background()

    database.UpdateProduct(ctx, "0004")