心跳

Aug 13, 2021 20:30 · 2310 words · 5 minute read Distributed System

问题

当集群由多台服务器组成,及时发现服务器故障至关重要,确保采取纠正措施让其他服务器来处理对客户端对故障服务器的请求。

解决方案

定期向所有其他服务器发送一个请求,请求间隔时间要大于服务之间来回网络传输时间(round-trip time)。所有服务器的等待心跳超时时间,往往是请求时间间隔的倍数。一般:

超时时间 > 请求时间间隔 > 服务器之间来回的网络传输时间(RTT)

举个栗子,服务器之间的来回网络延迟为 20ms,心跳包则可以每 30ms 发送,服务器在 1s 后才检查,这样有足够的时间发送多个心跳来减少误判。如果在这个时间间隔内没有收到心跳,就会认为发送服务器故障。

etcd 的官方调教文档 https://etcd.io/docs/v3.4/tuning/#time-parameters 中对此也有说明:

Adjusting these values is a trade off. The value of heartbeat interval is recommended to be around the maximum of average round-trip time (RTT) between members, normally around 0.5-1.5x the round-trip time. If heartbeat interval is too low, etcd will send unnecessary messages that increase the usage of CPU and network resources. On the other side, a too high heartbeat interval leads to high election timeout. Higher election timeout takes longer time to detect a leader failure.

发送端

etcd 中发送心跳消息的方法 sendHeartbeat 定义如下 https://github.com/etcd-io/etcd/blob/v3.3.24/raft/raft.go#L486-L544

// sendHeartbeat sends an empty MsgApp
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
    // Attach the commit as min(to.matched, r.committed).
    // When the leader sends out heartbeat message,
    // the receiver(follower) might not be matched with the leader
    // or it might not have all the committed entries.
    // The leader MUST NOT forward the follower's commit to
    // an unmatched index.
    commit := min(r.getProgress(to).Match, r.raftLog.committed)
    m := pb.Message{
        To:      to,
        Type:    pb.MsgHeartbeat,
        Commit:  commit,
        Context: ctx,
    }

    r.send(m)
}

// bcastHeartbeat sends RPC, without entries to all the peers.
func (r *raft) bcastHeartbeat() {
    lastCtx := r.readOnly.lastPendingRequestCtx()
    if len(lastCtx) == 0 {
        r.bcastHeartbeatWithCtx(nil)
    } else {
        r.bcastHeartbeatWithCtx([]byte(lastCtx))
    }
}

func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
    r.forEachProgress(func(id uint64, _ *Progress) {
        if id == r.id {
            return
        }
        r.sendHeartbeat(id, ctx)
    })
}

而发送心跳在 etcd 中是 Leader 的工作 https://github.com/etcd-io/etcd/blob/v3.3.24/raft/raft.go#L876-#1076

func stepLeader(r *raft, m pb.Message) {
    // These message types do not require any progress for m.From.
    switch m.Type {
    case pb.MsgBeat:
        r.bcastHeartbeat()
        return
    // a lot code here
    }
}

func (r *raft) becomeLeader() {
    // TODO(xiangli) remove the panic when the raft implementation is stable
    if r.state == StateFollower {
        panic("invalid transition [follower -> leader]")
    }
    r.step = stepLeader // look here
    r.reset(r.Term)
    r.tick = r.tickHeartbeat
    r.lead = r.id
    r.state = StateLeader
    // a lot of code here
}

func (r *raft) Step(m pb.Message) error {
    // a lot of code here
    switch m.Type {
    case pb.MsgHup:
    case pb.MsgVote, pb.MsgPreVote:
    default:
        r.step(r, m)
    }
    return nil
}

Leader 通过 tickHeartbeat 方法来发送心跳信息 https://github.com/etcd-io/etcd/blob/v3.3.24/raft/raft.go#L607-L631

// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
func (r *raft) tickHeartbeat() {
    r.heartbeatElapsed++
    r.electionElapsed++

    if r.electionElapsed >= r.electionTimeout {
        r.electionElapsed = 0
        if r.checkQuorum {
            r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
        }
        // If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
        if r.state == StateLeader && r.leadTransferee != None {
            r.abortLeaderTransfer()
        }
    }

    if r.state != StateLeader {
        return
    }

    if r.heartbeatElapsed >= r.heartbeatTimeout {
        r.heartbeatElapsed = 0
        r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
    }
}

func (r *raft) becomeLeader() {
    // TODO(xiangli) remove the panic when the raft implementation is stable
    if r.state == StateFollower {
        panic("invalid transition [follower -> leader]")
    }
    r.step = stepLeader
    r.reset(r.Term)
    r.tick = r.tickHeartbeat // look here
    r.lead = r.id
    r.state = StateLeader
    // a lot of code here
}

etcd 中利用定时器 time.Ticker 实现了定时任务 https://github.com/etcd-io/etcd/blob/v3.3.24/etcdserver/raft.go#L90-L152

type raftNode struct {
    raftNodeConfig

    // utility
    ticker *time.Ticker
}

type raftNodeConfig struct {
    heartbeat   time.Duration // for logging
}

func newRaftNode(cfg raftNodeConfig) *raftNode {
    // a lot of code here
    if r.heartbeat == 0 {
        r.ticker = &time.Ticker{}
    } else {
        r.ticker = time.NewTicker(r.heartbeat)
    }
}

// raft.Node does not have locks in Raft package
func (r *raftNode) tick() {
    r.tickMu.Lock()
    r.Tick()
    r.tickMu.Unlock()
}

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
    internalTimeout := time.Second

    go func() {
        defer r.onStop()
        islead := false

        for {
            select {
                case <-r.ticker.C:
                r.tick()
                case rd := <-r.Ready():
                case <-r.stopped:
                return
            }
        }
    }
}

在 etcd Node 实例运行起来后会单起一条 goroutine https://github.com/etcd-io/etcd/blob/v3.3.24/raft/node.go#L273-#L393 监听各路消息,当有消息推入 tickc channel 就会触发Node 所对应的 tick() 方法

func (n *node) run(r *raft) {
    // a lot of code here
    for {
        select {
        // TODO: maybe buffer the config propose if there exists one (the way
        // described in raft dissertation)
        // Currently it is dropped in Step silently.
        case m := <-propc:
        case m := <-n.recvc:
        case cc := <-n.confc:
        case <-n.tickc:
            r.tick()
        case readyc <- rd:
        case <-advancec:
        case c := <-n.status:
        case <-n.stop:
        }
    }
}

并且 Node Interface 给了一个 Tick() 方法的实现 https://github.com/etcd-io/etcd/blob/v3.3.24/raft/node.go#L395

// Tick increments the internal logical clock for this Node. Election timeouts
// and heartbeat timeouts are in units of ticks.
func (n *node) Tick() {
    select {
    case n.tickc <- struct{}{}:
    case <-n.done:
    default:
        n.logger.Warningf("A tick missed to fire. Node blocks too long!")
    }
}

当调用 Tick() 方法,会向 tickc channel 发送一个消息。

接收端

etcd 通过 handleHeartbeat 函数处理并应答心跳包 https://github.com/etcd-io/etcd/blob/v3.3.24/raft/raft.go#L1194-L1197

func (r *raft) handleHeartbeat(m pb.Message) {
    r.raftLog.commitTo(m.Commit)
    r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

而根据 Raft 协议,Leader 向 Follower 发送心跳 https://github.com/etcd-io/etcd/blob/v3.3.24/raft/raft.go#L1122-L1177

func stepFollower(r *raft, m pb.Message) {
    switch m.Type {
    case pb.MsgProp:
    case pb.MsgApp:
    case pb.MsgHeartbeat:
        r.electionElapsed = 0
        r.lead = m.From
        r.handleHeartbeat(m)
    case pb.MsgSnap:
    case pb.MsgTransferLeader:
    case pb.MsgTimeoutNow:
    case pb.MsgReadIndex:
    case pb.MsgReadIndexResp:
    }
}

func (r *raft) becomeFollower(term uint64, lead uint64) {
    r.step = stepFollower
    r.reset(term)
    r.tick = r.tickElection
    r.lead = lead
    r.state = StateFollower
    r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

何时将服务器判定为故障,需要综合各种标准,权衡利弊。一般心跳时间间隔越小,检测故障的速度越快,但是误判的概率就越高。因此心跳间隔和对丢失心跳的解释跟集群规格息息相关,有两大类:

小型集群(像 etcd 和 Zookeeper 这样基于共识的系统)

所有共识实现中,Leader 服务器发送心跳至 Follower 服务器。每当收到心跳,会记录下到达时间:

class TimeoutBasedFailureDetector

  @Override
  void heartBeatReceived(T serverId) {
    Long currentTime = System.nanoTime();
    heartbeatReceivedTimes.put(serverId, currentTime);
    markUp(serverId);
}

如果在一定时间内未收到心跳,Leader 被认为挂了,会选举新的 Leader。处理过慢或者网络有一定的概率导致误判。所以需要使用全局时钟(Generation Clock)来探测有问题的 Leader。这样系统可用性更好,因为崩溃能够在更小的时间段内被检测到。适用于较小的集群,通常是三到五个节点的规格。大多数共识实现都是这么搞的,比如 Zookeeper 和 etcd。

class TimeoutBasedFailureDetector

@Override
void heartBeatCheck() {
    Long now = System.nanoTime();
    Set<T> serverIds = heartbeatReceivedTimes.keySet();
    for (T serverId : serverIds) {
        Long lastHeartbeatReceivedTime = heartbeatReceivedTimes.get(serverId);
        Long timeSinceLastHeartbeat = now - lastHeartbeatReceivedTime;
        if (timeSinceLastHeartbeat >= timeoutNanos) {
            markDown(serverId);
        }
    }
}

大型集群(Gossip 协议)

心跳并不适用于几百到一千台服务器、跨越广域网的大型集群。在大型集群中需要考虑两点:

  1. 每台服务器不能无限制地生产消息
  2. 心跳信息所消耗的总带宽。不应该占用大量的网络带宽,应该有上限来确保心跳不会影响到整个集群的实际数据传输。

all-to-all 的心跳是要避免的,这种情景下通常使用故障探测器和在集群中传播故障信息的 Gossip 协议。