Golang SQL 连接池

Jan 17, 2020 23:45 · 3888 words · 8 minute read Golang Database

连接池是由客户端维护,包含一定数据库连接缓存的“池子”,以便将来有数据库连接请求的时候可以直接复用连接,目的是降低频繁创建和关闭连接的开销

工作原理

  • 建立
  • 连接的使用和管理
  • 关闭

建立

  • 在系统初始化时,连接池会根据系统配置建立
  • 在接受客户端查询请求之前并没有真正创建连接
  • 驱动注册 _ “github.com/go-sql-driver/mysql”
  • 初始化 db,调用 Open 函数,这时候没有真的去获取 db 操作的连接,只是初始化得到一个 db 的数据结构

连接的使用和管理

关闭

  • 当应用程序退出时,关闭连接池中的所有连接,释放连接池相关的资源:
    • 连接
    • 连接请求队列
    • connectionOpener(协程)
    • connectionResetter(协程)
    • connectionCleaner(协程)

database/sql 源码解析

github repo: https://github.com/golang/go/tree/master/src/database/sql

$ tree $GOROOT/src/database/sql
├── convert.go # scan row
├── convert_test.go
├── ctxutil.go # 判断 ctx,然后执行 prepare/exec/query/close 等操作
├── doc.txt
├── driver
│   ├── driver.go # 定义了实现数据库驱动所需的接口,由 sql 包和具体的驱动包来实现
│   ├── types.go # 数据类型的别名和转换
│   └── types_test.go
├── example_cli_test.go
├── example_service_test.go
├── example_test.go
├── fakedb_test.go
├── sql.go # 关于 SQL 数据库的一些通用接口和类型,包括:连接池、数据类型、连接、事务、状态
└── sql_test.go

DB 对象结构

DB 结构是包含至少 0 个连接(也就是连接池)的数据库句柄

// DB is a database handle representing a pool of zero or more
// underlying connections. It's safe for concurrent use by multiple
// goroutines.
//
// The sql package creates and frees connections automatically; it
// also maintains a free pool of idle connections. If the database has
// a concept of per-connection state, such state can be reliably observed
// within a transaction (Tx) or connection (Conn). Once DB.Begin is called, the
// returned Tx is bound to a single connection. Once Commit or
// Rollback is called on the transaction, that transaction's
// connection is returned to DB's idle connection pool. The pool size
// can be controlled with SetMaxIdleConns.
type DB struct {
    // Atomic access only. At top of struct to prevent mis-alignment
    // on 32-bit platforms. Of type time.Duration.
    waitDuration int64 // 等待新连接的总时间,用于统计

    connector driver.Connector // 由数据库驱动实现的连接器
    // numClosed is an atomic counter which represents a total number of
    // closed connections. Stmt.openStmt checks it before cleaning closed
    // connections in Stmt.css.
    numClosed uint64 // 关闭的连接数

    mu           sync.Mutex // 锁
    freeConn     []*driverConn // 可用连接池
    connRequests map[uint64]chan connRequest // 连接请求表,key 是分配的自增键
    nextRequest  uint64 // 连接请求的自增键
    numOpen      int    // 已经打开 + 即将打开的连接数
    // Used to signal the need for new connections
    // a goroutine running connectionOpener() reads on this chan and
    // maybeOpenNewConnections sends on the chan (one send per needed connection)
    // It is closed during db.Close(). The close tells the connectionOpener
    // goroutine to exit.
    openerCh          chan struct{} // 告知 connectionOpener 需要新的连接
    resetterCh        chan *driverConn // connectionResetter 函数,连接放回连接池的时候会用到
    closed            bool
    dep               map[finalCloser]depSet
    lastPut           map[*driverConn]string // debug 时使用,记录上一个放回的连接
    maxIdle           int                    // 连接池大小,默认大小为 2,<= 0 时不使用连接池
    maxOpen           int                    // 最大打开的连接数,<= 0 不限制
    maxLifetime       time.Duration          // 一个连接可以被重用的最大时限,也就是它在连接池中的最大存活时间,0 表示可以一直重用
    cleanerCh         chan struct{} // 告知 connectionCleaner 清理连接
    waitCount         int64 // 等待的连接总数
    maxIdleClosed     int64 // 释放连接时,因为连接池已满而被关闭的连接总数
    maxLifetimeClosed int64 // 因为超过存活时间而被关闭的连接总数

    stop func() // stop cancels the connection opener and the session resetter.
}

driverConn 对象结构

// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
type driverConn struct {
    db        *DB // 数据库句柄
    createdAt time.Time

    sync.Mutex  // 锁
    ci          driver.Conn // 对应具体的连接
    closed      bool // 是否标记关闭
    finalClosed bool // 是否最终关闭
    openStmt    map[*driverStmt]bool // 在这个连接上打开的状态
    lastErr     error // connectionResetter 的返回结果

    // guarded by db.mu
    inUse      bool // 连接是否占用
    onPut      []func() // 连接归还时要运行的函数,在 noteUnusedDriverStatement 添加
    dbmuClosed bool     // 和 closed 状态一致,但是由锁保护,用于 removeClosedStmtLocked
}

获取连接

https://github.com/golang/go/blob/master/src/database/sql/sql.go#L1131-L1246

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    db.mu.Lock() // 获取连接要对整个连接池加锁
    if db.closed {
        // 连接池已经被关闭
        db.mu.Unlock()
        return nil, errDBClosed
    }
    // Check if the context is expired.
    // 检查连接请求是被否取消或过期
    select {
    default:
    case <-ctx.Done(): // 连接请求被取消
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime

    // Prefer a free connection, if possible.
    // 优先使用空闲连接
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        conn := db.freeConn[0] // 取走第一个空闲连接
        copy(db.freeConn, db.freeConn[1:]) // 将 1 ~ n-1 的连接拷贝到 0 ~ n-2,也就是左移
        db.freeConn = db.freeConn[:numFree-1] // 删除 n-1,表示从 n 个空闲连接中取走了一个
        conn.inUse = true // 将取走的连接标记为使用中
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        // Lock around reading lastErr to ensure the session resetter finished.
        conn.Lock()
        err := conn.lastErr // 检查这个连接会话重置是否完成
        conn.Unlock()
        if err == driver.ErrBadConn {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }

    // Out of free connections or we were asked not to use one. If we're not
    // allowed to open any more connections, make a request and wait.
    // 设置了最大连接数
    // 如果已打开的连接数 >= 最大连接数,就发一个连接请求
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // Make the connRequest channel. It's buffered so that the
        // connectionOpener doesn't block while waiting for the req to be read.
        req := make(chan connRequest, 1)
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req // 新增一个连接请求
        db.waitCount++ // 等待连接数 +1
        db.mu.Unlock()

        waitStart := time.Now() // 开始计算连接等待时间

        // Timeout the connection request with the context.
        select {
        case <-ctx.Done(): // 超时
            // Remove the connection request and ensure no value has been sent
            // on it after removing.
            db.mu.Lock()
            delete(db.connRequests, reqKey) // 删掉连接请求
            db.mu.Unlock()

            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

            select {
            default:
            case ret, ok := <-req: // 连接来了
                if ok && ret.conn != nil {
                    db.putConn(ret.conn, ret.err, false) // 把连接放回连接池,而且不用重置会话
                }
            }
            return nil, ctx.Err()
        case ret, ok := <-req: // 连接来了
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

            if !ok { // 连接被关闭
                return nil, errDBClosed // 说明 DB 已经关闭了
            }
            if ret.err == nil && ret.conn.expired(lifetime) { // 检查连接是否过期
                ret.conn.Close() // 关掉连接
                return nil, driver.ErrBadConn
            }
            if ret.conn == nil {
                return nil, ret.err
            }
            // Lock around reading lastErr to ensure the session resetter finished.
            ret.conn.Lock()
            err := ret.conn.lastErr // 检查这个连接会话重置是否完成
            ret.conn.Unlock()
            if err == driver.ErrBadConn {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }

    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.connector.Connect(ctx)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
        inUse:     true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc, nil
}

从连接池中获取连接时首先要对整个连接池加锁,如果连接池已经事先被关掉了,直接返回 errDBClosed 错误。如果连接池无恙,将会评估连接请求是否取消或过期。

尽可能优先使用空闲的连接而不是新建一条连接(这也是连接池存在的意义)。看一下是否还剩下空闲连接,如果还有余粮,就取第 0 条连接出来,然后左移所有连接填补空位。这里对连接本身操作都会上锁

如果没有空闲连接了,而且已打开的 + 即将打开的连接数超过了限定的最大打开的连接数,就要发送一条连接请求然后排队(不会新建连接)。等待排队期间同时监听连接请求是否取消或过期,如果此时连接被取消很不巧正好有连接来了,就将连接放回连接池中;如果等着等着连接来了,会先检查这个连接的上一次会话是否被重置(擦屁股),确认没问题就用这条连接。

如果还没到限定的最大打开的连接数,放心大胆的新建一条!

https://github.com/golang/go/blob/574c286607015297e35b7c02c793038fd827e59b/src/database/sql/sql.go#L1031-L1047

// Assumes db.mu is locked.
// If there are connRequests and the connection limit hasn't been reached,
// then tell the connectionOpener to open new connections.
// 如果有连接请求,并且没有达到连接数的限制,告知 connectionOpener 打开新的连接
func (db *DB) maybeOpenNewConnections() {
    numRequests := len(db.connRequests)
    if db.maxOpen > 0 {
        numCanOpen := db.maxOpen - db.numOpen
        if numRequests > numCanOpen { // 连接请求超过限制,即最大打开连接数 - 已打开的连接数
            numRequests = numCanOpen // 以限制为准
        }
    }
    for numRequests > 0 {
        db.numOpen++ // optimistically
        numRequests--
        if db.closed {
            return
        }
        db.openerCh <- struct{}{} // 告知 connectionOpener 打开新的连接,由 connectionOpener 执行打开连接的操作
    }
}

连接回池

https://github.com/golang/go/blob/master/src/database/sql/sql.go#L1277-L1343

// putConn adds a connection to the db's free pool.
// err is optionally the last error that occurred on this connection.
// 业务完成,将连接放回连接池
func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
    db.mu.Lock()
    if !dc.inUse { // 检查连接是否在使用中
        if debugGetPut {
            fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
        }
        panic("sql: connection returned that was never out")
    }
    if debugGetPut {
        db.lastPut[dc] = stack()
    }
    dc.inUse = false

    for _, fn := range dc.onPut {
        fn()
    }
    dc.onPut = nil

    if err == driver.ErrBadConn { // 连接有问题
        // Don't reuse bad connections.
        // Since the conn is considered bad and is being discarded, treat it
        // as closed. Don't decrement the open count here, finalClose will
        // take care of that.
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        dc.Close()
        return
    }
    if putConnHook != nil {
        putConnHook(db, dc)
    }
    if db.closed {
        // Connections do not need to be reset if they will be closed.
        // Prevents writing to resetterCh after the DB has closed.
        // DB 关闭时,resetterCh 也关闭,如果还执行会话重置操作会导致写已关闭的 channel
        resetSession = false
    }
    if resetSession {
        if _, resetSession = dc.ci.(driver.SessionResetter); resetSession { // 检查驱动是否实现了会话重置接口
            // Lock the driverConn here so it isn't released until
            // the connection is reset.
            // The lock must be taken before the connection is put into
            // the pool to prevent it from being taken out before it is reset.
            dc.Lock()
        }
    }
    added := db.putConnDBLocked(dc, nil) // 函数名带 Locked 表示已经加过锁
    db.mu.Unlock()

    if !added {
        if resetSession {
            dc.Unlock()
        }
        dc.Close() // 回池出现问题,强行关闭连接
        return
    }
    if !resetSession {
        return
    }
    select {
    default:
        // If the resetterCh is blocking then mark the connection
        // as bad and continue on.
        // 如果 resetterCh 阻塞,将连接标记为 BadConn
        dc.lastErr = driver.ErrBadConn
        dc.Unlock()
    case db.resetterCh <- dc:
    }
}

将连接放回连接池照样要上锁。放回连接池的连接一定是还在使用中的状态

如果回来的连接有问题,以后是不能被重用的,直接关掉。

存在 hook 的话要执行一下。

如果数据库都被关了,重置会话也就没有意义了。接着就是真正的将连接放回连接池,或直接发送给正在等待的连接请求。

最后还要评估一下重置连接时是否有问题。

https://github.com/golang/go/blob/master/src/database/sql/sql.go#L1354-L1385

// Satisfy a connRequest or put the driverConn in the idle pool and return true
// or return false.
// putConnDBLocked will satisfy a connRequest if there is one, or it will
// return the *driverConn to the freeConn list if err == nil and the idle
// connection limit will not be exceeded.
// If err != nil, the value of dc is ignored.
// If err == nil, then dc must not equal nil.
// If a connRequest was fulfilled or the *driverConn was placed in the
// freeConn list, then true is returned, otherwise false is returned.
// 尝试将 driverConn 放到连接池或者满足一个连接请求
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    if db.closed {
        return false
    }
    // 即将打开 + 已打开的连接数超过了最大打开的连接数
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests { // 从表中随便拿一个
            break
        }
        delete(db.connRequests, reqKey) // Remove from pending requests.
        if err == nil {
            dc.inUse = true // 连接即将被复用
        }
        // 将连接发送给连接请求
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed {
        // 如果连接池 < 最大闲散连接数,放回连接池
        if db.maxIdleConnsLocked() > len(db.freeConn) {
            db.freeConn = append(db.freeConn, dc) // 放回连接池中
            db.startCleanerLocked()
            return true
        }
        db.maxIdleClosed++
    }
    return false
}

即将打开 + 已打开的连接数不能超过最大打开的连接数限制。如果有连接请求,就从表中随便拿一个,拿出来的要从表中删掉,并把准备回池的连接直接发送给这个请求;没有连接请求的话就正常回池。

connectionOpener

  • 用于处理连接请求 openerCh
  • 如果没有设置 maxIdleClosed,则不会有 openerCh,也不会用到 connectionOpener
  • 只有调用 maybeOpenNewConnections 时检查有没有 connRequests,有则触发 connectionOpener

connectionCleaner

  • 用于定期清理连接池中的闲散连接
  • 如果没有设置 maxLifetime 则不会启动
  • 在将连接放回连接池、设置连接最大存活时间的时候执行检查,符合条件则启动

connectionResetter

  • 将连接回池的时候检查,重置连接
  • 连接池从逻辑上来说也是一个会话池,应当有类似重置会话的功能,避免损坏的连接回池
  • 需要数据库驱动支持

连接池使用技巧

1. 连接池默认大小 defaultMaxIdleConns

  • 连接池默认大小为 2
  • 连接池太小会导致有太多方生方死的连接
  • maxIdleClosed 增长会很快

2. 连接池状态 DBStats

  • 定期获取 DBStats 了解连接池的基本信息
  • 除了 OpenConnections 之外其他都是 Go 1.11 加的

3. 并发安全

连接池是并发安全的,但连接不是。

如果使用同一个连接,在一个事务里面,不要使用多个 goroutine 去操作这个连接。

4. 连接失效

  • 如果连接是客户端主动关闭的,那会在写包的时候返回 ErrBadConn,连接池会在重试次数内获取新的连接
  • 如果连接是服务器主动关闭的,客户端并不知道,拿到连接后写包不会报错,但是在读服务器的 response 包的时候会有 unexpected EOF 错误。

4.1 设置 maxLifetime

  • DB 定期清理连接池中的过期连接
  • 如果没有设置 maxLifetime,表示连接池中的连接可以一直复用,如果服务器关闭了这条连接,连接池是不知道的,返回给客户端的是一条已经关闭的连接。
  • 获取数据库服务器的 wait_timeout,然后设置 maxLifetime 比这个数值小 10s 左右。

4.2 检查连接的有效性

  • MySQL 推荐在获取连接时、回池时、定期检查