回滚不适用于 Go 语言事务包装器
Rollback does not work well with Go language transactional wrapper
最近开始学习围棋
我发现了以下 Github 数据库事务处理包装器的实现,并决定尝试一下。
我正在使用 PostgreSQL 作为数据库。
最初,它包含以下数据。
testdb=> select * from products;
product_id | price
------------+-------
0001 | 200
0002 | 100
0003 | 150
0004 | 300
(4 rows)
进程A成功后,故意让进程B失败,期望事务A回滚。但是,当我们 运行 它时,回滚不会发生,我们最终得到以下
事实上,因为B失败了,应该回滚进程A,数据库值应该没有变化。
我已经在一些地方插入了日志来确认这一点,但我不确定。为什么回滚没有执行?
package main
import (
"context"
"database/sql"
"fmt"
"log"
_ "github.com/jackc/pgx/v4/stdlib"
)
// transaction-wrapper-start
type txAdmin struct {
*sql.DB
}
type Service struct {
tx txAdmin
}
func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
log.Printf("transaction")
tx, err := t.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
if err := f(ctx); err != nil {
log.Printf("transaction err")
return fmt.Errorf("transaction query failed: %w", err)
}
log.Printf("commit")
return tx.Commit()
}
func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
updateFunc := func(ctx context.Context) error {
log.Printf("first process")
// Process A
if _, err := s.tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = ", productID); err != nil {
log.Printf("first err")
return err
}
log.Printf("second process")
// Process B(They are intentionally failing.)
if _, err := s.tx.ExecContext(ctx, "...", productID); err != nil {
log.Printf("second err")
return err
}
return nil
}
log.Printf("update")
return s.tx.Transaction(ctx, updateFunc)
}
// transaction-wrapper-end
func main() {
data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
if nil != err {
log.Fatal(err)
}
database := Service {tx: txAdmin{data}}
ctx := context.Background()
database.UpdateProduct(ctx, "0004")
}
输出
2022/05/26 13:28:55 update
2022/05/26 13:28:55 transaction
2022/05/26 13:28:55 first process
2022/05/26 13:28:55 second process
2022/05/26 13:28:55 second err
2022/05/26 13:28:55 transaction err
数据库更改(如果回滚有效,id 0004 的价格应保持为 300。)
testdb=> select * from products;
product_id | price
------------+-------
0001 | 200
0002 | 100
0003 | 150
0004 | 200
(4 rows)
请告诉我如何使用包装器正确处理交易。
=========
PS。
以下没有包装器的代码可以正常工作。
package main
import (
"context"
"database/sql"
"log"
_ "github.com/jackc/pgx/v4/stdlib"
)
// transaction-defer-start
type Service struct {
db *sql.DB
}
func (s *Service) UpdateProduct(ctx context.Context, productID string) (err error) {
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
if _, err = tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = ", productID); err != nil {
log.Println("update err")
return err
}
if _, err = tx.ExecContext(ctx, "...", productID); err != nil {
log.Println("update err")
return err
}
return tx.Commit()
}
// transaction-defer-end
func main() {
var database Service
dbConn, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=passs sslmode=disable")
if nil != err {
log.Fatal(err)
}
database.db = dbConn
ctx := context.Background()
database.UpdateProduct(ctx, "0004")
}
正如@Richard Huxton 所说,将 tx
传递给函数 f
步骤如下:
- 在
struct txAdmin
上添加一个字段以容纳 *sql.Tx
,因此 txAdmin
有 DB
和 Tx
个字段
- 在
Transaction
内将 tx
设置为 *txAdmin.Tx
- 在
UpdateProduct
中对每个查询使用 *Service.tx.Tx
因此最终代码如下所示:
package main
import (
"context"
"database/sql"
"fmt"
"log"
_ "github.com/jackc/pgx/v4/stdlib"
)
// transaction-wrapper-start
type txAdmin struct {
*sql.DB
*sql.Tx
}
type Service struct {
tx txAdmin
}
func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
log.Printf("transaction")
tx, err := t.DB.BeginTx(ctx, nil)
if err != nil {
return err
}
// set tx to Tx
t.Tx = tx
defer tx.Rollback()
if err := f(ctx); err != nil {
log.Printf("transaction err")
return fmt.Errorf("transaction query failed: %w", err)
}
log.Printf("commit")
return tx.Commit()
}
func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
updateFunc := func(ctx context.Context) error {
log.Printf("first process")
// Process A
if _, err := s.tx.Tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = ", productID); err != nil {
log.Printf("first err")
return err
}
log.Printf("second process")
// Process B(They are intentionally failing.)
if _, err := s.tx.Tx.ExecContext(ctx, "...", productID); err != nil {
log.Printf("second err")
return err
}
return nil
}
log.Printf("update")
return s.tx.Transaction(ctx, updateFunc)
}
// transaction-wrapper-end
func main() {
data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
if nil != err {
log.Fatal(err)
}
database := Service{tx: txAdmin{DB: data}}
ctx := context.Background()
database.UpdateProduct(ctx, "0004")
}
最近开始学习围棋
我发现了以下 Github 数据库事务处理包装器的实现,并决定尝试一下。
我正在使用 PostgreSQL 作为数据库。
最初,它包含以下数据。
testdb=> select * from products;
product_id | price
------------+-------
0001 | 200
0002 | 100
0003 | 150
0004 | 300
(4 rows)
进程A成功后,故意让进程B失败,期望事务A回滚。但是,当我们 运行 它时,回滚不会发生,我们最终得到以下
事实上,因为B失败了,应该回滚进程A,数据库值应该没有变化。
我已经在一些地方插入了日志来确认这一点,但我不确定。为什么回滚没有执行?
package main
import (
"context"
"database/sql"
"fmt"
"log"
_ "github.com/jackc/pgx/v4/stdlib"
)
// transaction-wrapper-start
type txAdmin struct {
*sql.DB
}
type Service struct {
tx txAdmin
}
func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
log.Printf("transaction")
tx, err := t.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
if err := f(ctx); err != nil {
log.Printf("transaction err")
return fmt.Errorf("transaction query failed: %w", err)
}
log.Printf("commit")
return tx.Commit()
}
func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
updateFunc := func(ctx context.Context) error {
log.Printf("first process")
// Process A
if _, err := s.tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = ", productID); err != nil {
log.Printf("first err")
return err
}
log.Printf("second process")
// Process B(They are intentionally failing.)
if _, err := s.tx.ExecContext(ctx, "...", productID); err != nil {
log.Printf("second err")
return err
}
return nil
}
log.Printf("update")
return s.tx.Transaction(ctx, updateFunc)
}
// transaction-wrapper-end
func main() {
data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
if nil != err {
log.Fatal(err)
}
database := Service {tx: txAdmin{data}}
ctx := context.Background()
database.UpdateProduct(ctx, "0004")
}
输出
2022/05/26 13:28:55 update
2022/05/26 13:28:55 transaction
2022/05/26 13:28:55 first process
2022/05/26 13:28:55 second process
2022/05/26 13:28:55 second err
2022/05/26 13:28:55 transaction err
数据库更改(如果回滚有效,id 0004 的价格应保持为 300。)
testdb=> select * from products;
product_id | price
------------+-------
0001 | 200
0002 | 100
0003 | 150
0004 | 200
(4 rows)
请告诉我如何使用包装器正确处理交易。
========= PS。 以下没有包装器的代码可以正常工作。
package main
import (
"context"
"database/sql"
"log"
_ "github.com/jackc/pgx/v4/stdlib"
)
// transaction-defer-start
type Service struct {
db *sql.DB
}
func (s *Service) UpdateProduct(ctx context.Context, productID string) (err error) {
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
if _, err = tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = ", productID); err != nil {
log.Println("update err")
return err
}
if _, err = tx.ExecContext(ctx, "...", productID); err != nil {
log.Println("update err")
return err
}
return tx.Commit()
}
// transaction-defer-end
func main() {
var database Service
dbConn, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=passs sslmode=disable")
if nil != err {
log.Fatal(err)
}
database.db = dbConn
ctx := context.Background()
database.UpdateProduct(ctx, "0004")
}
正如@Richard Huxton 所说,将 tx
传递给函数 f
步骤如下:
- 在
struct txAdmin
上添加一个字段以容纳*sql.Tx
,因此txAdmin
有DB
和Tx
个字段 - 在
Transaction
内将tx
设置为*txAdmin.Tx
- 在
UpdateProduct
中对每个查询使用*Service.tx.Tx
因此最终代码如下所示:
package main
import (
"context"
"database/sql"
"fmt"
"log"
_ "github.com/jackc/pgx/v4/stdlib"
)
// transaction-wrapper-start
type txAdmin struct {
*sql.DB
*sql.Tx
}
type Service struct {
tx txAdmin
}
func (t *txAdmin) Transaction(ctx context.Context, f func(ctx context.Context) (err error)) error {
log.Printf("transaction")
tx, err := t.DB.BeginTx(ctx, nil)
if err != nil {
return err
}
// set tx to Tx
t.Tx = tx
defer tx.Rollback()
if err := f(ctx); err != nil {
log.Printf("transaction err")
return fmt.Errorf("transaction query failed: %w", err)
}
log.Printf("commit")
return tx.Commit()
}
func (s *Service) UpdateProduct(ctx context.Context, productID string) error {
updateFunc := func(ctx context.Context) error {
log.Printf("first process")
// Process A
if _, err := s.tx.Tx.ExecContext(ctx, "UPDATE products SET price = 200 WHERE product_id = ", productID); err != nil {
log.Printf("first err")
return err
}
log.Printf("second process")
// Process B(They are intentionally failing.)
if _, err := s.tx.Tx.ExecContext(ctx, "...", productID); err != nil {
log.Printf("second err")
return err
}
return nil
}
log.Printf("update")
return s.tx.Transaction(ctx, updateFunc)
}
// transaction-wrapper-end
func main() {
data, err := sql.Open("pgx", "host=localhost port=5432 user=testuser dbname=testdb password=password sslmode=disable")
if nil != err {
log.Fatal(err)
}
database := Service{tx: txAdmin{DB: data}}
ctx := context.Background()
database.UpdateProduct(ctx, "0004")
}