jackc/pgx 查询错误处理避坑

前言

  最近,一个数据查询服务被业务方反馈拿不到数据,但接口响应是成功的,不报错,仔细排查后发现数据查询库用的是 pgx,但 pgx 返回的错误未被处理,导致服务接口没有响应错误。

  在后续的排查过程中,发现这其实不算是 pgx 的问题,而是 database/sql 中的坑,所有涉及用 database/sql 查询的都需要显式处理 rows.Err()。

问题篇

  服务所用 pgx 版本为 4.10.1。查询函数主要用的是 QueryRow(返回一条数据) 和 Query(返回多条数据),更近一步的测试中(人为制造查询错误,eg:锁表)发现,调用 QueryRow 函数的接口,如果发生查询错误的问题,服务接口会正常响应错误。深入 pgx 源码发现,QueryRow 本质是对 Query 的进一步封装,对应的 Scan 函数源码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// QueryRow 返回的 connRow 对应继承自 Query 返回的 connRows 对象
type connRow connRows
func (r *connRow) Scan(dest ...interface{}) (err error) {
// 显式类型转换
rows := (*connRows)(r)

if rows.Err() != nil {
return rows.Err()
}

if !rows.Next() {
if rows.Err() == nil {
return ErrNoRows
}
return rows.Err()
}

rows.Scan(dest...)
rows.Close()
// 关键的错误
return rows.Err()
}

func (rows *connRows) Scan(dest ...interface{}) error {
ci := rows.connInfo
fieldDescriptions := rows.FieldDescriptions()
values := rows.values

if len(fieldDescriptions) != len(values) {
err := errors.Errorf("number of field descriptions must equal number of values, got %d and %d", len(fieldDescriptions), len(values))
rows.fatal(err)
return err
}
if len(fieldDescriptions) != len(dest) {
err := errors.Errorf("number of field descriptions must equal number of destinations, got %d and %d", len(fieldDescriptions), len(dest))
rows.fatal(err)
return err
}

if rows.scanPlans == nil {
rows.scanPlans = make([]pgtype.ScanPlan, len(values))
for i := range dest {
rows.scanPlans[i] = ci.PlanScan(fieldDescriptions[i].DataTypeOID, fieldDescriptions[i].Format, dest[i])
}
}

for i, dst := range dest {
if dst == nil {
continue
}

err := rows.scanPlans[i].Scan(ci, fieldDescriptions[i].DataTypeOID, fieldDescriptions[i].Format, values[i], dst)
if err != nil {
err = scanArgError{col: i, err: err}
rows.fatal(err)
return err
}
}

// 由于返回多行数据,需要多次 Scan, 所以不能返回 rows.Err()
return nil
}

// 有些数据库查询错误,只有在 Close 之后,再调用 rows.Err() 捕获
func (rows *connRows) Close() {
if rows.closed {
return
}

rows.closed = true

if rows.resultReader != nil {
var closeErr error
rows.commandTag, closeErr = rows.resultReader.Close()
if rows.err == nil {
// 赋值错误
rows.err = closeErr
}
}

if rows.multiResultReader != nil {
closeErr := rows.multiResultReader.Close()
if rows.err == nil {
// 赋值错误
rows.err = closeErr
}
}

if rows.logger != nil {
if rows.err == nil {
if rows.logger.shouldLog(LogLevelInfo) {
endTime := time.Now()
rows.logger.log(rows.ctx, LogLevelInfo, "Query", map[string]interface{}{"sql": rows.sql, "args": logQueryArgs(rows.args), "time": endTime.Sub(rows.startTime), "rowCount": rows.rowCount})
}
} else {
if rows.logger.shouldLog(LogLevelError) {
rows.logger.log(rows.ctx, LogLevelError, "Query", map[string]interface{}{"err": rows.err, "sql": rows.sql, "args": logQueryArgs(rows.args)})
}
if rows.err != nil && rows.conn.stmtcache != nil {
rows.conn.stmtcache.StatementErrored(rows.sql, rows.err)
}
}
}
}

  从源码中可看出 QueryRow 的 Scan 函数有一系列的错误处理,而 Query 对应的 Scan 是更底层的函数,返回的仅是 Scan 过程中的错误,其他的错误需要在业务上层处理。Close 函数同样可能会出现错误,需要调用 rows.Err() 主动检查错误(这一步至关重要)。对于 Close 报的错,可以这样处理:

1
2
3
4
5
6
7
8
9
10
11
12
var err error
defer func() {
rows.Close()
closeErr := rows.Err()
if err != nil {
if closeErr != nil {
log.Printf("failed to close rows: %v", err)
}
return
}
err = closeErr
}

  数据库执行 sql 失败的错误(eg:canceling statement due to conflict with recovery),在 Close 后才会暴露出来,所以不处理这个错误,就不会返回错误,但数据又查不到,服务接口也表现为响应成功,导致上层业务误认为数据库里还真没数据。

  最好的方式还是避免每次都手动 Scan,pgx 其实还提供了更上层的函数 QueryFunc,该函数封装了大部分错误处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (c *Conn) QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(QueryFuncRow) error) (pgconn.CommandTag, error) {
rows, err := c.Query(ctx, sql, args...)
if err != nil {
return nil, err
}
defer rows.Close()

// 最后一次 Next() 会自动调用 Close()
for rows.Next() {
err = rows.Scan(scans...)
if err != nil {
return nil, err
}

err = f(rows)
if err != nil {
return nil, err
}
}

if err := rows.Err(); err != nil {
return nil, err
}

return rows.CommandTag(), nil
}

不过 QueryFunc 函数在新版本中(5.7.2)已被 ForEachRow 替代:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func ForEachRow(rows Rows, scans []any, fn func() error) (pgconn.CommandTag, error) {
defer rows.Close()

for rows.Next() {
err := rows.Scan(scans...)
if err != nil {
return pgconn.CommandTag{}, err
}

err = fn()
if err != nil {
return pgconn.CommandTag{}, err
}
}

if err := rows.Err(); err != nil {
return pgconn.CommandTag{}, err
}

return rows.CommandTag(), nil
}

而 ForEachRow 的使用示例可以看这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (c *Conn) getCompositeFields(ctx context.Context, oid uint32) ([]pgtype.CompositeCodecField, error) {
var typrelid uint32

err := c.QueryRow(ctx, "select typrelid from pg_type where oid=$1", oid).Scan(&typrelid)
if err != nil {
return nil, err
}

var fields []pgtype.CompositeCodecField
var fieldName string
var fieldOID uint32
rows, _ := c.Query(ctx, `select attname, atttypid
from pg_attribute
where attrelid=$1
and not attisdropped
and attnum > 0
order by attnum`,
typrelid,
)
// 这里是示例
_, err = ForEachRow(rows, []any{&fieldName, &fieldOID}, func() error {
dt, ok := c.TypeMap().TypeForOID(fieldOID)
if !ok {
return fmt.Errorf("unknown composite type field OID: %v", fieldOID)
}
fields = append(fields, pgtype.CompositeCodecField{Name: fieldName, Type: dt})
return nil
})
if err != nil {
return nil, err
}

return fields, nil
}

后记

  对于一个不熟悉的底层库,最好的学习方式还是看它的示例代码,库的开发者很难知道用户会踩哪些坑,文档中自然不会有,毕竟当局者迷。只从文档出发,很容易陷进未知的坑里,甚至掉坑里都不知道,业务出问题后,花费大代价排查之后,才知道掉坑里了。陌生的开源库在使用的时候还是先全库 clone 下来,用 api 的时候,就去源码里搜一下,看看开发者写的示例(不管是测试,还是其他地方的调用),当然现在也可以让 AI 先写,人只要再核实一下文档和源码,能节省很多学习的功夫。