Golang SQL 连接池
Jan 17, 2020 23:45 · 3888 words · 8 minute read
连接池是由客户端维护,包含一定数据库连接缓存的“池子”,以便将来有数据库连接请求的时候可以直接复用连接,目的是降低频繁创建和关闭连接的开销。
工作原理
- 建立
- 连接的使用和管理
- 关闭
建立
- 在系统初始化时,连接池会根据系统配置建立
- 在接受客户端查询请求之前并没有真正创建连接
- 驱动注册 _ “github.com/go-sql-driver/mysql”
- 初始化 db,调用 Open 函数,这时候没有真的去获取 db 操作的连接,只是初始化得到一个 db 的数据结构
连接的使用和管理
关闭
- 当应用程序退出时,关闭连接池中的所有连接,释放连接池相关的资源:
- 连接
- 连接请求队列
- connectionOpener(协程)
- connectionResetter(协程)
- connectionCleaner(协程)
database/sql 源码解析
github repo: https://github.com/golang/go/tree/master/src/database/sql
$ tree $GOROOT/src/database/sql
├── convert.go # scan row
├── convert_test.go
├── ctxutil.go # 判断 ctx,然后执行 prepare/exec/query/close 等操作
├── doc.txt
├── driver
│ ├── driver.go # 定义了实现数据库驱动所需的接口,由 sql 包和具体的驱动包来实现
│ ├── types.go # 数据类型的别名和转换
│ └── types_test.go
├── example_cli_test.go
├── example_service_test.go
├── example_test.go
├── fakedb_test.go
├── sql.go # 关于 SQL 数据库的一些通用接口和类型,包括:连接池、数据类型、连接、事务、状态
└── sql_test.go
DB 对象结构
DB 结构是包含至少 0 个连接(也就是连接池)的数据库句柄。
// DB is a database handle representing a pool of zero or more
// underlying connections. It's safe for concurrent use by multiple
// goroutines.
//
// The sql package creates and frees connections automatically; it
// also maintains a free pool of idle connections. If the database has
// a concept of per-connection state, such state can be reliably observed
// within a transaction (Tx) or connection (Conn). Once DB.Begin is called, the
// returned Tx is bound to a single connection. Once Commit or
// Rollback is called on the transaction, that transaction's
// connection is returned to DB's idle connection pool. The pool size
// can be controlled with SetMaxIdleConns.
type DB struct {
// Atomic access only. At top of struct to prevent mis-alignment
// on 32-bit platforms. Of type time.Duration.
waitDuration int64 // 等待新连接的总时间,用于统计
connector driver.Connector // 由数据库驱动实现的连接器
// numClosed is an atomic counter which represents a total number of
// closed connections. Stmt.openStmt checks it before cleaning closed
// connections in Stmt.css.
numClosed uint64 // 关闭的连接数
mu sync.Mutex // 锁
freeConn []*driverConn // 可用连接池
connRequests map[uint64]chan connRequest // 连接请求表,key 是分配的自增键
nextRequest uint64 // 连接请求的自增键
numOpen int // 已经打开 + 即将打开的连接数
// Used to signal the need for new connections
// a goroutine running connectionOpener() reads on this chan and
// maybeOpenNewConnections sends on the chan (one send per needed connection)
// It is closed during db.Close(). The close tells the connectionOpener
// goroutine to exit.
openerCh chan struct{} // 告知 connectionOpener 需要新的连接
resetterCh chan *driverConn // connectionResetter 函数,连接放回连接池的时候会用到
closed bool
dep map[finalCloser]depSet
lastPut map[*driverConn]string // debug 时使用,记录上一个放回的连接
maxIdle int // 连接池大小,默认大小为 2,<= 0 时不使用连接池
maxOpen int // 最大打开的连接数,<= 0 不限制
maxLifetime time.Duration // 一个连接可以被重用的最大时限,也就是它在连接池中的最大存活时间,0 表示可以一直重用
cleanerCh chan struct{} // 告知 connectionCleaner 清理连接
waitCount int64 // 等待的连接总数
maxIdleClosed int64 // 释放连接时,因为连接池已满而被关闭的连接总数
maxLifetimeClosed int64 // 因为超过存活时间而被关闭的连接总数
stop func() // stop cancels the connection opener and the session resetter.
}
driverConn 对象结构
// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
type driverConn struct {
db *DB // 数据库句柄
createdAt time.Time
sync.Mutex // 锁
ci driver.Conn // 对应具体的连接
closed bool // 是否标记关闭
finalClosed bool // 是否最终关闭
openStmt map[*driverStmt]bool // 在这个连接上打开的状态
lastErr error // connectionResetter 的返回结果
// guarded by db.mu
inUse bool // 连接是否占用
onPut []func() // 连接归还时要运行的函数,在 noteUnusedDriverStatement 添加
dbmuClosed bool // 和 closed 状态一致,但是由锁保护,用于 removeClosedStmtLocked
}
获取连接
https://github.com/golang/go/blob/master/src/database/sql/sql.go#L1131-L1246
// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
db.mu.Lock() // 获取连接要对整个连接池加锁
if db.closed {
// 连接池已经被关闭
db.mu.Unlock()
return nil, errDBClosed
}
// Check if the context is expired.
// 检查连接请求是被否取消或过期
select {
default:
case <-ctx.Done(): // 连接请求被取消
db.mu.Unlock()
return nil, ctx.Err()
}
lifetime := db.maxLifetime
// Prefer a free connection, if possible.
// 优先使用空闲连接
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0] // 取走第一个空闲连接
copy(db.freeConn, db.freeConn[1:]) // 将 1 ~ n-1 的连接拷贝到 0 ~ n-2,也就是左移
db.freeConn = db.freeConn[:numFree-1] // 删除 n-1,表示从 n 个空闲连接中取走了一个
conn.inUse = true // 将取走的连接标记为使用中
db.mu.Unlock()
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// Lock around reading lastErr to ensure the session resetter finished.
conn.Lock()
err := conn.lastErr // 检查这个连接会话重置是否完成
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
// Out of free connections or we were asked not to use one. If we're not
// allowed to open any more connections, make a request and wait.
// 设置了最大连接数
// 如果已打开的连接数 >= 最大连接数,就发一个连接请求
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
// Make the connRequest channel. It's buffered so that the
// connectionOpener doesn't block while waiting for the req to be read.
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req // 新增一个连接请求
db.waitCount++ // 等待连接数 +1
db.mu.Unlock()
waitStart := time.Now() // 开始计算连接等待时间
// Timeout the connection request with the context.
select {
case <-ctx.Done(): // 超时
// Remove the connection request and ensure no value has been sent
// on it after removing.
db.mu.Lock()
delete(db.connRequests, reqKey) // 删掉连接请求
db.mu.Unlock()
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
select {
default:
case ret, ok := <-req: // 连接来了
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false) // 把连接放回连接池,而且不用重置会话
}
}
return nil, ctx.Err()
case ret, ok := <-req: // 连接来了
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
if !ok { // 连接被关闭
return nil, errDBClosed // 说明 DB 已经关闭了
}
if ret.err == nil && ret.conn.expired(lifetime) { // 检查连接是否过期
ret.conn.Close() // 关掉连接
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
// Lock around reading lastErr to ensure the session resetter finished.
ret.conn.Lock()
err := ret.conn.lastErr // 检查这个连接会话重置是否完成
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
}
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
}
从连接池中获取连接时首先要对整个连接池加锁,如果连接池已经事先被关掉了,直接返回 errDBClosed
错误。如果连接池无恙,将会评估连接请求是否取消或过期。
尽可能优先使用空闲的连接而不是新建一条连接(这也是连接池存在的意义)。看一下是否还剩下空闲连接,如果还有余粮,就取第 0 条连接出来,然后左移所有连接填补空位。这里对连接本身操作都会上锁。
如果没有空闲连接了,而且已打开的 + 即将打开的连接数超过了限定的最大打开的连接数,就要发送一条连接请求然后排队(不会新建连接)。等待排队期间同时监听连接请求是否取消或过期,如果此时连接被取消很不巧正好有连接来了,就将连接放回连接池中;如果等着等着连接来了,会先检查这个连接的上一次会话是否被重置(擦屁股),确认没问题就用这条连接。
如果还没到限定的最大打开的连接数,放心大胆的新建一条!
// Assumes db.mu is locked.
// If there are connRequests and the connection limit hasn't been reached,
// then tell the connectionOpener to open new connections.
// 如果有连接请求,并且没有达到连接数的限制,告知 connectionOpener 打开新的连接
func (db *DB) maybeOpenNewConnections() {
numRequests := len(db.connRequests)
if db.maxOpen > 0 {
numCanOpen := db.maxOpen - db.numOpen
if numRequests > numCanOpen { // 连接请求超过限制,即最大打开连接数 - 已打开的连接数
numRequests = numCanOpen // 以限制为准
}
}
for numRequests > 0 {
db.numOpen++ // optimistically
numRequests--
if db.closed {
return
}
db.openerCh <- struct{}{} // 告知 connectionOpener 打开新的连接,由 connectionOpener 执行打开连接的操作
}
}
连接回池
https://github.com/golang/go/blob/master/src/database/sql/sql.go#L1277-L1343
// putConn adds a connection to the db's free pool.
// err is optionally the last error that occurred on this connection.
// 业务完成,将连接放回连接池
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
db.mu.Lock()
if !dc.inUse { // 检查连接是否在使用中
if debugGetPut {
fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
}
panic("sql: connection returned that was never out")
}
if debugGetPut {
db.lastPut[dc] = stack()
}
dc.inUse = false
for _, fn := range dc.onPut {
fn()
}
dc.onPut = nil
if err == driver.ErrBadConn { // 连接有问题
// Don't reuse bad connections.
// Since the conn is considered bad and is being discarded, treat it
// as closed. Don't decrement the open count here, finalClose will
// take care of that.
db.maybeOpenNewConnections()
db.mu.Unlock()
dc.Close()
return
}
if putConnHook != nil {
putConnHook(db, dc)
}
if db.closed {
// Connections do not need to be reset if they will be closed.
// Prevents writing to resetterCh after the DB has closed.
// DB 关闭时,resetterCh 也关闭,如果还执行会话重置操作会导致写已关闭的 channel
resetSession = false
}
if resetSession {
if _, resetSession = dc.ci.(driver.SessionResetter); resetSession { // 检查驱动是否实现了会话重置接口
// Lock the driverConn here so it isn't released until
// the connection is reset.
// The lock must be taken before the connection is put into
// the pool to prevent it from being taken out before it is reset.
dc.Lock()
}
}
added := db.putConnDBLocked(dc, nil) // 函数名带 Locked 表示已经加过锁
db.mu.Unlock()
if !added {
if resetSession {
dc.Unlock()
}
dc.Close() // 回池出现问题,强行关闭连接
return
}
if !resetSession {
return
}
select {
default:
// If the resetterCh is blocking then mark the connection
// as bad and continue on.
// 如果 resetterCh 阻塞,将连接标记为 BadConn
dc.lastErr = driver.ErrBadConn
dc.Unlock()
case db.resetterCh <- dc:
}
}
将连接放回连接池照样要上锁。放回连接池的连接一定是还在使用中的状态。
如果回来的连接有问题,以后是不能被重用的,直接关掉。
存在 hook 的话要执行一下。
如果数据库都被关了,重置会话也就没有意义了。接着就是真正的将连接放回连接池,或直接发送给正在等待的连接请求。
最后还要评估一下重置连接时是否有问题。
https://github.com/golang/go/blob/master/src/database/sql/sql.go#L1354-L1385
// Satisfy a connRequest or put the driverConn in the idle pool and return true
// or return false.
// putConnDBLocked will satisfy a connRequest if there is one, or it will
// return the *driverConn to the freeConn list if err == nil and the idle
// connection limit will not be exceeded.
// If err != nil, the value of dc is ignored.
// If err == nil, then dc must not equal nil.
// If a connRequest was fulfilled or the *driverConn was placed in the
// freeConn list, then true is returned, otherwise false is returned.
// 尝试将 driverConn 放到连接池或者满足一个连接请求
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
// 即将打开 + 已打开的连接数超过了最大打开的连接数
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests { // 从表中随便拿一个
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
if err == nil {
dc.inUse = true // 连接即将被复用
}
// 将连接发送给连接请求
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
// 如果连接池 < 最大闲散连接数,放回连接池
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc) // 放回连接池中
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
return false
}
即将打开 + 已打开的连接数不能超过最大打开的连接数限制。如果有连接请求,就从表中随便拿一个,拿出来的要从表中删掉,并把准备回池的连接直接发送给这个请求;没有连接请求的话就正常回池。
connectionOpener
- 用于处理连接请求 openerCh
- 如果没有设置 maxIdleClosed,则不会有 openerCh,也不会用到 connectionOpener
- 只有调用 maybeOpenNewConnections 时检查有没有 connRequests,有则触发 connectionOpener
connectionCleaner
- 用于定期清理连接池中的闲散连接
- 如果没有设置 maxLifetime 则不会启动
- 在将连接放回连接池、设置连接最大存活时间的时候执行检查,符合条件则启动
connectionResetter
- 将连接回池的时候检查,重置连接
- 连接池从逻辑上来说也是一个会话池,应当有类似重置会话的功能,避免损坏的连接回池
- 需要数据库驱动支持
连接池使用技巧
1. 连接池默认大小 defaultMaxIdleConns
- 连接池默认大小为 2
- 连接池太小会导致有太多方生方死的连接
- maxIdleClosed 增长会很快
2. 连接池状态 DBStats
- 定期获取 DBStats 了解连接池的基本信息
- 除了 OpenConnections 之外其他都是 Go 1.11 加的
3. 并发安全
连接池是并发安全的,但连接不是。
如果使用同一个连接,在一个事务里面,不要使用多个 goroutine 去操作这个连接。
4. 连接失效
- 如果连接是客户端主动关闭的,那会在写包的时候返回 ErrBadConn,连接池会在重试次数内获取新的连接
- 如果连接是服务器主动关闭的,客户端并不知道,拿到连接后写包不会报错,但是在读服务器的 response 包的时候会有 unexpected EOF 错误。
4.1 设置 maxLifetime
- DB 定期清理连接池中的过期连接
- 如果没有设置 maxLifetime,表示连接池中的连接可以一直复用,如果服务器关闭了这条连接,连接池是不知道的,返回给客户端的是一条已经关闭的连接。
- 获取数据库服务器的 wait_timeout,然后设置 maxLifetime 比这个数值小 10s 左右。
4.2 检查连接的有效性
- MySQL 推荐在获取连接时、回池时、定期检查