回滚不适用于 Go 语言事务包装器

Rollback does not work well with Go language transactional wrapper

最近开始学习围棋

我发现了以下 Github 数据库事务处理包装器的实现,并决定尝试一下。

(来源)https://github.com/oreilly-japan/practical-go-programming/blob/master/ch09/transaction/wrapper/main.go

我正在使用 PostgreSQL 作为数据库。

最初,它包含以下数据。

testdb=> select * from products;
 product_id | price
------------+-------
 0001       |   200
 0002       |   100
 0003       |   150
 0004       |   300
(4 rows)

进程A成功后,故意让进程B失败,期望事务A回滚。但是,当我们 运行 它时,回滚不会发生,我们最终得到以下

事实上,因为B失败了,应该回滚进程A,数据库值应该没有变化。

我已经在一些地方插入了日志来确认这一点,但我不确定。为什么回滚没有执行?

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"

    _ "github.com/jackc/pgx/v4/stdlib"
)

// transaction-wrapper-start
type txAdmin struct {
    *sql.DB
}

type Service struct {
    tx txAdmin
}

func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
    log.Printf("transaction")
    tx, err := t.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    if err := f(ctx); err != nil {
        log.Printf("transaction err")
        return fmt.Errorf("transaction query failed: %w", err)
    }

    log.Printf("commit")
    return tx.Commit()
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
    updateFunc := func(ctx context.Context) error {
        log.Printf("first process")
        // Process A
        if _, err := s.tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = ", productID); err != nil {
            log.Printf("first err")
            return err
        }
        log.Printf("second process")

        // Process B(They are intentionally failing.)
        if _, err := s.tx.ExecContext(ctx, "...", productID); err != nil {
            log.Printf("second err")
            return err
        }
        return nil
    }
    log.Printf("update")
    return s.tx.Transaction(ctx, updateFunc)
}

// transaction-wrapper-end
func main() {

    data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
    if nil != err {
        log.Fatal(err)
    }

    database := Service {tx: txAdmin{data}}

    ctx := context.Background()

    database.UpdateProduct(ctx, "0004")
}

输出

2022/05/26 13:28:55 update     
2022/05/26 13:28:55 transaction
2022/05/26 13:28:55 first process
2022/05/26 13:28:55 second process
2022/05/26 13:28:55 second err
2022/05/26 13:28:55 transaction err

数据库更改(如果回滚有效,id 0004 的价格应保持为 300。)

testdb=> select * from products;
 product_id | price
------------+-------
 0001       |   200
 0002       |   100
 0003       |   150
 0004       |   200
(4 rows)

请告诉我如何使用包装器正确处理交易。

========= PS。 以下没有包装器的代码可以正常工作。

package main

import (
    "context"
    "database/sql"
    "log"

    _ "github.com/jackc/pgx/v4/stdlib"
)

// transaction-defer-start
type Service struct {
    db *sql.DB
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) (err error) {
    tx, err := s.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()

    if _, err = tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = ", productID); err != nil {
        log.Println("update err")
        return err
    }

    if _, err = tx.ExecContext(ctx, "...", productID); err != nil {
        log.Println("update err")
        return err
    }

    return tx.Commit()
}

// transaction-defer-end
func main() {
    var database Service
    dbConn, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=passs sslmode=disable")
    if nil != err {
        log.Fatal(err)
    }
    database.db = dbConn

    ctx := context.Background()

    database.UpdateProduct(ctx, "0004")
    
}

正如@Richard Huxton 所说,将 tx 传递给函数 f

步骤如下:

  1. struct txAdmin 上添加一个字段以容纳 *sql.Tx,因此 txAdminDBTx 个字段
  2. Transaction 内将 tx 设置为 *txAdmin.Tx
  3. UpdateProduct 中对每个查询使用 *Service.tx.Tx

因此最终代码如下所示:

package main

import (
    "context"
    "database/sql"
    "fmt"
    "log"

    _ "github.com/jackc/pgx/v4/stdlib"
)

// transaction-wrapper-start
type txAdmin struct {
    *sql.DB
    *sql.Tx
}

type Service struct {
    tx txAdmin
}

func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
    log.Printf("transaction")
    tx, err := t.DB.BeginTx(ctx, nil)
    if err != nil {
        return err
    }

    // set tx to Tx
    t.Tx = tx

    defer tx.Rollback()
    if err := f(ctx); err != nil {
        log.Printf("transaction err")
        return fmt.Errorf("transaction query failed: %w", err)
    }

    log.Printf("commit")
    return tx.Commit()
}

func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
    updateFunc := func(ctx context.Context) error {
        log.Printf("first process")
        // Process A
        if _, err := s.tx.Tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = ", productID); err != nil {
            log.Printf("first err")
            return err
        }
        log.Printf("second process")

        // Process B(They are intentionally failing.)
        if _, err := s.tx.Tx.ExecContext(ctx, "...", productID); err != nil {
            log.Printf("second err")
            return err
        }
        return nil
    }
    log.Printf("update")
    return s.tx.Transaction(ctx, updateFunc)
}

// transaction-wrapper-end
func main() {
    data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
    if nil != err {
        log.Fatal(err)
    }

    database := Service{tx: txAdmin{DB: data}}

    ctx := context.Background()

    database.UpdateProduct(ctx, "0004")
}