etcd WAL 实现

Jul 28, 2021 23:30 · 2316 words · 5 minute read Distributed System

etcd 通过 WAL(预写日志)实现了内存中数据的强持久性,而 WAL 的实现封装在了 wal 包中。

etcd 的 WAL 日志文件被存放在特定的目录下(/var/lib/etcd),由多个分段日志(Segmented Log)组成。通过 Save 方法将 raft 状态和条目追加至每份文件:

metadata := []byte{}
w, err := wal.Create("/var/lib/etcd", metadata)
// ...
err := w.Save(s, ents)

在将 raft 快照保存至磁盘后,要调用 SaveSnapshot 方法来记录下来,这样在重启时 WAL 能够与保存的快照匹配上。

err := w.SaveSnapshot(walpb.Snapshot{Index: 10, Term: 2})

当用户结束使用 WAL 后要显示地将其关闭:

w.Close()

每个 WAL 文件都是一条 WAL 记录流;一条记录由长度字段和 WAL 的记录 protobuf 构成。记录 protobuf 包括 CRC、类型和数据。长度字段有 64 位,较低的 56 位保存剩余逻辑记录数据的长度,最重要的一字节前三位保存物理填充。每条记录都是 8 字节对齐这样长度字段就不会被撕裂。CRC 字段包含除 CRC 外所有数据 protobuf 的 CRC32。

WAL 文件被存放在 ${seq}-${index}.wal 格式的目录中。第一个被创建的 WAL 文件固定为 0000000000000000-0000000000000000.wal,表示初始序列号和 raft 索引都为 0。第一条写入 WAL 记录的 raft 索引必须为 0

当体积超过 64MB 时 WAL 会分段,将会新建一个文件,内部的序列号也会自增。假设最近保存的 raft 索引是 0x20,而这又是第一次 WAL 分段,那么序列号将从 0x01 递增至 0x02,这样拼出来的新文件就是 0000000000000001-0000000000000021.wal。

如果存在快照,就读取快照来恢复状态。

w, err := wal.Open("/var/lib/etcd", walpb.Snapshot{Index: 10, Term: 2})
// ...

前提是这个快照必须已经被写入 WAL 了。

直到快照和 WAL 都被读取了,才能继续追加。

metadata, state, ents, err := w.ReadAll()

二阶段提交

下面我们就来看 etcd v.3.3.25 中 WAL 包的源码:

WAL 结构体的定义 https://github.com/etcd-io/etcd/blob/v3.3.24/wal/wal.go

type WAL struct {
    dir string // the living directory of the underlay files

    // dirFile is a fd for the wal directory for syncing on Rename
    dirFile *os.File

    metadata []byte           // metadata recorded at the head of each WAL
    state    raftpb.HardState // hardstate recorded at the head of WAL

    start     walpb.Snapshot // snapshot to start reading
    decoder   *decoder       // decoder to decode records
    readClose func() error   // closer for decode reader

    mu      sync.Mutex
    enti    uint64   // index of the last entry saved to the wal
    encoder *encoder // encoder to encode records

    locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
    fp    *filePipeline
}

WAL 对象由 Create 函数实例化,我们直接通过 pkg.go.dev 文档中心来查看其使用说明:

func Create(dirpath string, metadata []byte) (*WAL, error)

Create creates a WAL ready for appending records. The given metadata is recorded at the head of each WAL file, and can be retrieved with ReadAll after the file is Open.

WAL 的正确用法是在内存中的数据变更之前先追加 WAL 日志条目,在 etcd v3.3.24 中,WAL 结构的 Save 方法 https://github.com/etcd-io/etcd/blob/v3.3.24/wal/wal.go#L781-#L814

func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error
    w.mu.Lock()
    defer w.mu.Unlock()

    // short cut, do not call sync
    if raft.IsEmptyHardState(st) && len(ents) == 0 {
        return nil
    }

    mustSync := raft.MustSync(st, w.state, len(ents))

    // TODO(xiangli): no more reference operator
    for i := range ents {
        if err := w.saveEntry(&ents[i]); err != nil {
            return err
        }
    }
    if err := w.saveState(&st); err != nil {
        return err
    }

    curOff, err := w.tail().Seek(0, io.SeekCurrent)
    if err != nil {
        return err
    }
    if curOff < SegmentSizeBytes {
        if mustSync {
            return w.sync()
        }
        return nil
    }

    return w.cut()

通过全局搜索可以了解到整个 etcd v3.3.24 项目的源码中调用 wal.Save 方法的地方其实并不多。

我们首先来看 EtcdServer 实例化的函数 etcdserver.NewServer

// EtcdServer is the production implementation of the Server interface
type EtcdServer struct {
    // inflightSnapshots holds count the number of snapshots currently inflight.
    inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
    appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
    committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
    // consistIndex used to hold the offset of current executing entry
    // It is initialized to 0 before executing any entry.
    consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
    r            raftNode        // uses 64-bit atomics; keep 64-bit aligned.
}

// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {

    // a lot of code here

    srv = &EtcdServer{
        readych:     make(chan struct{}),
        Cfg:         cfg,
        errorc:      make(chan error, 1),
        store:       st,
        snapshotter: ss,
        r: *newRaftNode(
            raftNodeConfig{
                isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
                Node:        n,
                heartbeat:   heartbeat,
                raftStorage: s,
                storage:     NewStorage(w, ss), // here
            },
        ),
        id:            id,
        attributes:    membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
        cluster:       cl,
        stats:         sstats,
        lstats:        lstats,
        SyncTicker:    time.NewTicker(500 * time.Millisecond),
        peerRt:        prt,
        reqIDGen:      idutil.NewGenerator(uint16(id), time.Now()),
        forceVersionC: make(chan struct{}),
    }

    // a lot of code here
}

在实例化 EtcdServer 结构时还会一起实例化 raftNode 结构:

type raftNode struct {
    // Cache of the latest raft index and raft term the server has seen.
    // These three unit64 fields must be the first elements to keep 64-bit
    // alignment for atomic access to the fields.
    index uint64
    term  uint64
    lead  uint64

    tickMu *sync.Mutex
    raftNodeConfig
}

type raftNodeConfig struct {
    // to check if msg receiver is removed from cluster
    isIDRemoved func(id uint64) bool
    raft.Node
    raftStorage *raft.MemoryStorage
    storage     Storage
    heartbeat   time.Duration // for logging
    // transport specifies the transport to send and receive msgs to members.
    // Sending messages MUST NOT block. It is okay to drop messages, since
    // clients should timeout and reissue their messages.
    // If transport is nil, server will panic.
    transport rafthttp.Transporter
}

我们再来看 etcdserver.NewStorage 函数和它实例化的结构 storage

type storage struct {
    *wal.WAL
    *snap.Snapshotter
}

func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
    return &storage{w, s}
}

storage 结构体内嵌了 wal.WAL 对象,也就是 storage 结构能复用 wal.WAL 对象的所有方法

接着我们在 etcdserver 目录下搜索 .storage.Save(,寻找调用 WAL 的 Save 方法的相关代码 [https://github.com/etcd-io/etcd/blob/v3.3.24/etcdserver/raft.go#L242-L272]:

// gofail: var raftBeforeSave struct{}
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil { // step 1
    plog.Fatalf("failed to raft save state and entries %v", err)
}
if !raft.IsEmptyHardState(rd.HardState) {
    proposalsCommitted.Set(float64(rd.HardState.Commit))
}
// gofail: var raftAfterSave struct{}

if !raft.IsEmptySnap(rd.Snapshot) {
    // Force WAL to fsync its hard state before Release() releases
    // old data from the WAL. Otherwise could get an error like:
    // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
    // See https://github.com/etcd-io/etcd/issues/10219 for more details.
    if err := r.storage.Sync(); err != nil {
        plog.Fatalf("failed to sync Raft snapshot %v", err)
    }

    // etcdserver now claim the snapshot has been persisted onto the disk
    notifyc <- struct{}{}

    // gofail: var raftAfterSaveSnap struct{}
    r.raftStorage.ApplySnapshot(rd.Snapshot)
    plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
    // gofail: var raftAfterApplySnap struct{}

    if err := r.storage.Release(rd.Snapshot); err != nil {
        plog.Fatalf("failed to release Raft wal %v", err)
    }
}

r.raftStorage.Append(rd.Entries) // step 2

确实 raft 模块在修改状态机(r.raftStorage.Append(rd.Entries))之前首先持久化 WAL 日志,这是典型的二阶段提交(2-phase commit)的实现。

落盘

下面我们来看 WAL 的 Save 方法的实现:

func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
    w.mu.Lock()
    defer w.mu.Unlock()

    // short cut, do not call sync
    if raft.IsEmptyHardState(st) && len(ents) == 0 {
        return nil
    }

    mustSync := raft.MustSync(st, w.state, len(ents))

    // TODO(xiangli): no more reference operator
    for i := range ents {
        if err := w.saveEntry(&ents[i]); err != nil {
            return err
        }
    }
    if err := w.saveState(&st); err != nil {
        return err
    }

    curOff, err := w.tail().Seek(0, io.SeekCurrent)
    if err != nil {
        return err
    }
    if curOff < SegmentSizeBytes {
        if mustSync {
            return w.sync()
        }
        return nil
    }

    return w.cut()
}

一刀切的互斥锁,会带来极大的性能影响。两个涉及落盘的方法 saveEntrysaveState

都是通过 encoder 将数据编码并持久化至磁盘:

func (e *encoder) encode(rec *walpb.Record) error {
    e.mu.Lock()
    defer e.mu.Unlock()

    e.crc.Write(rec.Data)
    rec.Crc = e.crc.Sum32()

    // a lot of code here
    n, err = e.bw.Write(data)

}

Write-Ahead Log 中说过确保在读取日志时能够识别损坏的日志文件,etcd 通过在写日志时带上 CRC,并在读取日志时校验。

encoderbw 字段是 ioutil.PageWriter 结构:

// Flush flushes buffered data.
func (pw *PageWriter) Flush() error {
    _, err := pw.flush()
    return err
}

// FlushN flushes buffered data and returns the number of written bytes.
func (pw *PageWriter) FlushN() (int, error) {
    return pw.flush()
}

func (pw *PageWriter) flush() (int, error) {
    if pw.bufferedBytes == 0 {
        return 0, nil
    }
    n, err := pw.w.Write(pw.buf[:pw.bufferedBytes])
    pw.pageOffset = (pw.pageOffset + pw.bufferedBytes) % pw.pageBytes
    pw.bufferedBytes = 0
    return n, err
}

PageWriterWrite 方法中,通过 io.WriterWrite 方法将数据写入 wal.Create 函数打开的文件中。