跳至主要內容

Dubbo时间轮

javadubbo系列约 3160 字大约 11 分钟

Dubbo时间轮

时间轮是一种高效的、批量管理定时任务的调度模型。时间轮一般会实现成一个环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽使用双向链表存储定时任务;指针周期性地跳动,跳动到一个槽位,就执行该槽位的定时任务。

1.png
1.png

需要注意的是,单层时间轮的容量和精度都是有限的,对于精度要求特别高、时间跨度特别大或是海量定时任务需要调度的场景,通常会使用多级时间轮以及持久化存储与时间轮结合的方案。

那在 Dubbo 中,时间轮的具体实现方式是怎样的呢?Dubbo 的时间轮实现位于 dubbo-common 模块的 org.apache.dubbo.common.timer 包中,下面我们就来分析时间轮涉及的核心接口和实现。

核心接口

TimerTask接口

在 Dubbo 中,所有的定时任务都要继承 TimerTask 接口。TimerTask 接口非常简单,只定义了一个 run() 方法,该方法的入参是一个 Timeout 接口的对象。

/**
 * A task which is executed after the delay specified with
 * {@link Timer#newTimeout(TimerTask, long, TimeUnit)} (TimerTask, long, TimeUnit)}.
 */
public interface TimerTask {

    /**
     * Executed after the delay specified with
     * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
     *
     * @param timeout a handle which is associated with this task
     */
    void run(Timeout timeout) throws Exception;
}

Timeout接口

Timeout 对象与 TimerTask 对象一一对应,两者的关系类似于线程池返回的 Future 对象与提交到线程池中的任务对象之间的关系。通过 Timeout 对象,我们不仅可以查看定时任务的状态,还可以操作定时任务(例如取消关联的定时任务)。

image-20230402222206190
image-20230402222206190

Timer接口

Timer 接口定义了定时器的基本行为,如下图所示,其核心是 newTimeout() 方法:提交一个定时任务(TimerTask)并返回关联的 Timeout 对象,这有点类似于向线程池提交任务的感觉。

image-20230402222333395
image-20230402222333395
image-20230402223154974
image-20230402223154974

接口实现

HashedWheelTimeout

HashedWheelTimeout 是 Timeout 接口的唯一实现,是 HashedWheelTimer 的内部类。HashedWheelTimeout 扮演了两个角色:

  • 第一个,时间轮中双向链表的节点,即定时任务 TimerTask 在 HashedWheelTimer 中的容器。
  • 第二个,定时任务 TimerTask 提交到 HashedWheelTimer 之后返回的句柄(Handle),用于在时间轮外部查看和控制定时任务。

HashedWheelTimeout 中的核心字段如下:

  • prev、next(HashedWheelTimeout类型),分别对应当前定时任务在链表中的前驱节点和后继节点。
  • task(TimerTask类型),指实际被调度的任务。
  • deadline(long类型),指定时任务执行的时间。这个时间是在创建 HashedWheelTimeout 时指定的,计算公式是:currentTime(创建 HashedWheelTimeout 的时间) + delay(任务延迟时间) - startTime(HashedWheelTimer 的启动时间),时间单位为纳秒。
  • state(volatile int类型),指定时任务当前所处状态,可选的有三个,分别是 INIT(0)、CANCELLED(1)和 EXPIRED(2)。另外,还有一个 STATE_UPDATER 字段(AtomicIntegerFieldUpdater类型)实现 state 状态变更的原子性。
  • remainingRounds(long类型),指当前任务剩余的时钟周期数。时间轮所能表示的时间长度是有限的,在任务到期时间与当前时刻的时间差,超过时间轮单圈能表示的时长,就出现了套圈的情况,需要该字段值表示剩余的时钟周期。

HashedWheelTimeout 中的核心方法有:

  • isCancelled()、isExpired() 、state() 方法, 主要用于检查当前 HashedWheelTimeout 状态。
  • cancel() 方法, 将当前 HashedWheelTimeout 的状态设置为 CANCELLED,并将当前 HashedWheelTimeout 添加到 cancelledTimeouts 队列中等待销毁。
  • expire() 方法, 当任务到期时,会调用该方法将当前 HashedWheelTimeout 设置为 EXPIRED 状态,然后调用其中的 TimerTask 的 run() 方法执行定时任务。
  • remove() 方法, 将当前 HashedWheelTimeout 从时间轮中删除。
private static final class HashedWheelTimeout implements Timeout {

}

HashedWheelBucket

HashedWheelBucket 是时间轮中的一个槽,时间轮中的槽实际上就是一个用于缓存和管理双向链表的容器,双向链表中的每一个节点就是一个 HashedWheelTimeout 对象,也就关联了一个 TimerTask 定时任务。

HashedWheelBucket 持有双向链表的首尾两个节点,分别是 head 和 tail 两个字段,再加上每个 HashedWheelTimeout 节点均持有前驱和后继的引用,这样就可以正向或是逆向遍历整个双向链表了。

  • addTimeout() 方法:新增 HashedWheelTimeout 到双向链表的尾部。
  • pollTimeout() 方法:移除双向链表中的头结点,并将其返回。
  • remove() 方法:从双向链表中移除指定的 HashedWheelTimeout 节点。
  • clearTimeouts() 方法:循环调用 pollTimeout() 方法处理整个双向链表,并返回所有未超时或者未被取消的任务。
  • expireTimeouts() 方法:遍历双向链表中的全部 HashedWheelTimeout 节点。 在处理到期的定时任务时,会通过 remove() 方法取出,并调用其 expire() 方法执行;对于已取消的任务,通过 remove() 方法取出后直接丢弃;对于未到期的任务,会将 remainingRounds 字段(剩余时钟周期数)减一。
private static final class HashedWheelBucket {

        /**
         * Used for the linked-list datastructure
         */
        private HashedWheelTimeout head;
        private HashedWheelTimeout tail;
}

HashedWheelTimer

HashedWheelTimer 是 Timer 接口的实现,它通过时间轮算法实现了一个定时器。

HashedWheelTimer 会根据当前时间轮指针选定对应的槽(HashedWheelBucket),从双向链表的头部开始迭代,对每个定时任务(HashedWheelTimeout)进行计算,属于当前时钟周期则取出运行,不属于则将其剩余的时钟周期数减一操作。

  • workerState(volatile int类型):时间轮当前所处状态,可选值有 init、started、shutdown。同时,有相应的 AtomicIntegerFieldUpdater 实现 workerState 的原子修改。
  • startTime(long类型):当前时间轮的启动时间,提交到该时间轮的定时任务的 deadline 字段值均以该时间戳为起点进行计算。
  • wheel(HashedWheelBucket[]类型):该数组就是时间轮的环形队列,每一个元素都是一个槽。当指定时间轮槽数为 n 时,实际上会取大于且最靠近 n 的 2 的幂次方值。
  • timeouts、cancelledTimeouts(LinkedBlockingQueue类型):timeouts 队列用于缓冲外部提交时间轮中的定时任务,cancelledTimeouts 队列用于暂存取消的定时任务。HashedWheelTimer 会在处理 HashedWheelBucket 的双向链表之前,先处理这两个队列中的数据。
  • tick(long类型):该字段在 HashedWheelTimer$Worker 中,是时间轮的指针,是一个步长为 1 的单调递增计数器。
  • mask(int类型):掩码, mask = wheel.length - 1,执行 ticks & mask 便能定位到对应的时钟槽。
  • ticksDuration(long类型):时间指针每次加 1 所代表的实际时间,单位为纳秒。
  • pendingTimeouts(AtomicLong类型):当前时间轮剩余的定时任务总数。
  • workerThread(Thread类型):时间轮内部真正执行定时任务的线程。
  • worker(Worker类型):真正执行定时任务的逻辑封装这个 Runnable 对象中。
public class HashedWheelTimer implements Timer {

}

时间轮对外提供了一个 newTimeout() 接口用于提交定时任务,在定时任务进入到 timeouts 队列之前会先调用 start() 方法启动时间轮,其中会完成下面两个关键步骤:

  1. 确定时间轮的 startTime 字段;
  2. 启动 workerThread 线程,开始执行 worker 任务。
  3. 之后根据 startTime 计算该定时任务的 deadline 字段,最后才能将定时任务封装成 HashedWheelTimeout 并添加到 timeouts 队列。
    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
				//判断当前时间轮剩余的定时任务总数。
        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                    + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                    + "timeouts (" + maxPendingTimeouts + ")");
        }
				//启动时间轮内部真正执行定时任务。
        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

下面我们来分析时间轮指针一次转动的全流程。

  1. 时间轮指针转动,时间轮周期开始。
  2. 清理用户主动取消的定时任务,这些定时任务在用户取消时,会记录到 cancelledTimeouts 队列中。在每次指针转动的时候,时间轮都会清理该队列。
  3. 将缓存在 timeouts 队列中的定时任务转移到时间轮中对应的槽中。
  4. 根据当前指针定位对应槽,处理该槽位的双向链表中的定时任务。
  5. 检测时间轮的状态。如果时间轮处于运行状态,则循环执行上述步骤,不断执行定时任务。如果时间轮处于停止状态,则执行下面的步骤获取到未被执行的定时任务并加入 unprocessedTimeouts 队列:遍历时间轮中每个槽位,并调用 clearTimeouts() 方法;对 timeouts 队列中未被加入槽中循环调用 poll()。
  6. 最后再次清理 cancelledTimeouts 队列中用户主动取消的定时任务。
 @Override
        public void run() {
            // Initialize the startTime.
            startTime = System.nanoTime();
            if (startTime == 0) {
                // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
                startTime = 1;
            }

            // Notify the other threads waiting for the initialization at start().
            startTimeInitialized.countDown();

            do {
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    //掩码, mask = wheel.length - 1,执行 ticks & mask 便能定位到对应的时钟槽。
                    int idx = (int) (tick & mask);
                    //清理用户主动取消的定时任务,这些定时任务在用户取消时,会记录到 cancelledTimeouts 队列中。在每次指针转动的时候,时间轮都会清理该队列。
                    processCancelledTasks();
                    HashedWheelBucket bucket =
                            wheel[idx];
                    //将缓存在 timeouts 队列中的定时任务转移到时间轮中对应的槽中。
                    transferTimeoutsToBuckets();
                    //根据当前指针定位对应槽,处理该槽位的双向链表中的定时任务。
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
                //时间轮处于运行状态
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
            // Fill the unprocessedTimeouts so we can return them from stop() method.
            for (HashedWheelBucket bucket : wheel) {
                //获取到未被执行的定时任务并加入 unprocessedTimeouts 队列
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            for (; ; ) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            //再次清理 cancelledTimeouts 队列中用户主动取消的定时任务。
            processCancelledTasks();
        }

Dubbo 中如何使用定时任务

在 Dubbo 中,时间轮并不直接用于周期性操作,而是只向时间轮提交执行单次的定时任务,在上一次任务执行完成的时候,调用 newTimeout() 方法再次提交当前任务,这样就会在下个周期执行该任务。即使在任务执行过程中出现了 GC、I/O 阻塞等情况,导致任务延迟或卡住,也不会有同样的任务源源不断地提交进来,导致任务堆积。

Dubbo 中对时间轮的应用主要体现在如下两个方面:

  • 失败重试, 例如,Provider 向注册中心进行注册失败时的重试操作,或是 Consumer 向注册中心订阅时的失败重试等。
  • 周期性定时任务, 例如,定期发送心跳请求,请求超时的处理,或是网络连接断开后的重连机制。

举例: 注册失败,失败重试

org.apache.dubbo.registry.support.FailbackRegistry#register
 @Override
    public void register(URL url) {
    		if (!acceptable(url)) {
            logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
            return;
        }
        super.register(url);  // 完成本地文件缓存的初始化
        // 清理failedRegistered集合和failedUnregistered集合,并取消相关任务
        removeFailedRegistered(url); // 清理FailedRegisteredTask定时任务
        removeFailedUnregistered(url); // 清理FailedUnregisteredTask定时任务
        try {
            // Sending a registration request to the server side
            doRegister(url);  // 与服务发现组件进行交互,具体由子类实现
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            // 检测check参数,决定是否直接抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

          	//这里重试
            // 如果不抛出异常,则创建失败重试的任务,并添加到failedRegistered集合中
            addFailedRegistered(url);
    }
    private void addFailedRegistered(URL url) {
        FailedRegisteredTask oldOne = failedRegistered.get(url);
        if (oldOne != null) { // 已经存在重试任务,则无须创建,直接返回
            return;
        }
        FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
        oldOne = failedRegistered.putIfAbsent(url, newTask);
        if (oldOne == null) {
            // 如果是新建的重试任务,则提交到时间轮中,等待retryPeriod毫秒后执行
            retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
        }
    }