单一更新队列

Jul 24, 2021 23:00 · 2170 words · 5 minute read Distributed System

问题

当系统的状态由多个并发的客户端更新时,我们需要安全地更新,一次一个的变更。例如预写日志中的例子,即使多个客户端在并发写,一次只处理一个日志条目。锁,通常被用于防止并发修改,但如果正在写文件,这会阻塞所有其他调用的线程直到任务完成,会大大降低系统的吞吐量和延迟。我们的目标是高效地利用计算资源,同时保证一次执行一个的需求

解决方案

实现一条工作队列(workqueue)和一条工作线程。多客户端将状态变更提交至队列中,由单线程来处理系统状态修改。在 Golang 自然是通过 goroutine 和 channel 来实现。

一个典型的 Java 实现:

一个 SingularUpdateQueue 类有一条队列和工作函数,继承 java.lang.Thread 以确保单线程执行。

public class SingularUpdateQueue<Req, Res> extends Thread implements Logging {
    private ArrayBlockingQueue<RequestWrapper<Req, Res>> workQueue
            = new ArrayBlockingQueue<RequestWrapper<Req, Res>>(100);
    private Function<Req, Res> handler;
    private volatile boolean isRunning = false;

客户端向队列提交请求,队列通过包装器将每个请求包裹进包装类来与一个 future 结合,将 future 返回给客户端,这样当请求最终完成时客户端能够做出反应。

class SingularUpdateQueue

    public CompletableFuture<Res> submit(Req request) {
        try {
            var requestWrapper = new RequestWrapper<Req, Res>(request);
            workQueue.put(requestWrapper);
            return requestWrapper.getFuture();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

class RequestWrapper<Req, Res> {
    private final CompletableFuture<Res> future;
    private final Req request;

    public RequestWrapper(Req request) {
        this.request = request;
        this.future = new CompletableFuture<Res>();
    }
    public CompletableFuture<Res> getFuture() { return future; }
    public Req getRequest()                   { return request; }

队列中的元素由单线程处理,队列允许多个并发的生产者向其添加任务。队列的实现应该是线程安全的,不应该在竞争下增加大量开销。执行线程从队列取出请求,一次只处理一个。CompletableFuture 是用任务执行的应答完成的。

class SingularUpdateQueue

    @Override
    public void run() {
        isRunning = true;
        while(isRunning) {
            Optional<RequestWrapper<Req, Res>> item = take();
            item.ifPresent(requestWrapper -> {
                try {
                    Res response = handler.apply(requestWrapper.getRequest());
                    requestWrapper.complete(response);

                } catch (Exception e) {
                    requestWrapper.completeExceptionally(e);
                }
            });
        }
    }
class RequestWrapper

    public void complete(Res response) {
        future.complete(response);
    }

    public void completeExceptionally(Exception e) {
        e.printStackTrace();
        getFuture().completeExceptionally(e);
    }

当从队列读取时加一个超时是很管用的,防止永久阻塞。通过将 isRunning 设置成 false 如有必要退出线程,这样当队列为空时就不会无限阻塞工作线程。所以我们使用带超时的 poll 方法。这使得我们可以干净利落地关闭执行线程。

class SingularUpdateQueue

    private Optional<RequestWrapper<Req, Res>> take() {
        try {
            return Optional.ofNullable(workQueue.poll(300, TimeUnit.MILLISECONDS));

        } catch (InterruptedException e) {
            return Optional.empty();
        }
    }


    public void shutdown() {
        this.isRunning = false;
    }

举个栗子,服务器处理多个客户端的请求并更新预写日志,就可以使用单一更新队列技术。

在这个例子中,我们为 WAL 使用了一个消费者。这个消费者是一个单例,它来控制对日志数据结构的访问。消费者要将每个请求放入日志并返回应答。应答消息必须要在消息被写入日志后才能发送。于是我们使用 SingularUpdateQueue 来确保这些操作有一个明确的顺序

public class WalRequestConsumer implements Consumer<Message<RequestOrResponse>> {

    private final SingularUpdateQueue<Message<RequestOrResponse>, Message<RequestOrResponse>> walWriterQueue;
    private final WriteAheadLog wal;

    public WalRequestConsumer(Config config) {
        this.wal = WriteAheadLog.openWAL(config);
        walWriterQueue = new SingularUpdateQueue<>((message) -> {
            wal.writeEntry(serialize(message));
            return responseMessage(message);
        });
        startHandling();
    }

    private void startHandling() { this.walWriterQueue.start(); }

消费者的 accept 方法接收消息,将它们入队并在每个消息被处理后发送一个响应。这个方法在调用者的线程上运行,这就允许诸多调用者同时调用 accept

@Override
public void accept(Message message) {
    CompletableFuture<Message<RequestOrResponse>> future = walWriterQueue.submit(message);
    future.whenComplete((responseMessage, error) -> {
        sendResponse(responseMessage);
    });
}

队列的选项

队列数据结构非常重要,在 JVM 中,有多种可供选择的数据结构:

  • ArrayBlockingQueue(Kafka 的请求队列中使用)

    名副其实,这是一个基于数组的阻塞队列。当需要固定边界的队列时可以使用 ArrayBlockingQueue。一旦队列填满,生产者将阻塞。当消费者速度缓慢而生产者速度很快时很有用。

  • ConcurrentLinkedQueue 和 ForkJoinPool(Akka Actors 邮箱的实现中使用)

    没有消费者等待生产者的场景中可以使用 ConcurrentLinkedQueue,但也有些协调器可以做到在任务入队至 ConcurrentLinkedQueue 时才安排消费者。

  • LinkedBlockingDeque(Zookeeper 和 Kafka 的应答队列所使用)

    最主要在无限制入队中使用,不会阻塞生产者。要谨慎选择,如果没有实现 backpressure 技术,队列可能会迅速填满,耗光所有内存。

  • 环形缓冲区(LMAX Disruptor 使用)

    有些场景对延迟很敏感。ArrayBlockingQueue 队列在处理阶段之间的复制操作会增加延迟,这是不可接受的。环形缓冲区可用于在阶段之间传递任务。

使用 channel 和 goroutine

这对于支持轻量级线程和通道概念的语言或库(比如 Golang 和 Kotlin)来说是很自然的。所有的请求被传递到单一的 channel 被处理,还有一个 goroutine 在处理所有的消息来更新状态。然后应答被写入另一个 channel,并由另一条 goroutine 发回客户端。从下面的代码可以看到,更新键值对的请求被传递到一个共享的请求 channel。

func (s *Server) putKV(w http.ResponseWriter, r *http.Request) {
    kv, err := s.readRequest(r, w)
    if err != nil {
        log.Panic(err)
        return
    }

    request := &requestResponse{
        request:         kv,
        responseChannel: make(chan string),
    }

    s.requestChannel <- request
    response := s.waitForResponse(request)
    w.Write([]byte(response))
}

请求由单条 groutine 处理来更新所有状态。

func (s *Server) Start() error {
    go s.serveHttp()

    go s.singularUpdateQueue()

    return nil
}

func (s *Server) singularUpdateQueue() {
    for {
        select {
            case e := <-s.requestChannel:
                s.updateState(e)
                e.responseChannel <- buildResponse(e)
        }
    }
}

背压(Backpressure)

当工作队列被用于线程间的通讯时,背压是一个要关注的点。在消费者的速度很慢而生产者的速度飞快这样的案例中,队列很快就会被填满,内存有可能被耗尽。一般,如果队列满了,发送者会阻塞。例如,java.util.concurrent.ArrayBlockingQueue 有两个方法来添加元素。如果数组满了,put 方法将阻塞生产者;而 add 方法会抛出 IllegalStateException 而不是阻塞生产者。所以要仔细阅读文档,在 ArrayBlockingQueue 的案例中,应该用 put 方法来阻塞发送者,通过阻塞提供“背压”。也可以使用 reactive-streams 这样的框架来帮助实现一个更复杂的从生产者到消费者的背压机制。

案例

所有像 ZooKeeper(ZAB) 和 etcd(RAFT)这类一致性实现都要以严格的顺序处理请求,一次一个。他们使用了相似的代码结构。

  • ZooKeeper 通过单线程的请求处理器实现了请求处理的流水线
  • etcd 这类基于 Golang 的实现有条单独的 goroutine,从请求 channel 接收数据来更新状态。
  • Kafka 的控制器需要根据来自 ZooKeeper 的多个并发事件更新状态,在单线程中处理这些事件。