docs: change to relative path
@ -1,34 +1,37 @@
|
|||||||
# 前言
|
# 前言
|
||||||
|
|
||||||
前段时间在给自己的玩具项目设计的时候就遇到了一个场景需要定时任务,于是就趁机了解了目前主流的一些定时任务方案,比如下面这些:
|
前段时间在给自己的玩具项目设计的时候就遇到了一个场景需要定时任务,于是就趁机了解了目前主流的一些定时任务方案,比如下面这些:
|
||||||
- Timer(halo博客源码中用到了)
|
|
||||||
|
- Timer(halo 博客源码中用到了)
|
||||||
- ScheduledExecutorService
|
- ScheduledExecutorService
|
||||||
- ThreadPoolTaskScheduler(基于ScheduledExecutorService)
|
- ThreadPoolTaskScheduler(基于 ScheduledExecutorService)
|
||||||
- Netty的schedule(用到了PriorityQueue)
|
- Netty 的 schedule(用到了 PriorityQueue)
|
||||||
- Netty的HashedWheelTimer(时间轮)
|
- Netty 的 HashedWheelTimer(时间轮)
|
||||||
- Kafka的TimingWheel(层级时间轮)
|
- Kafka 的 TimingWheel(层级时间轮)
|
||||||
|
|
||||||
还有一些分布式的定时任务:
|
还有一些分布式的定时任务:
|
||||||
|
|
||||||
- Quartz
|
- Quartz
|
||||||
- xxl-job(我实习公司就在用这个)
|
- xxl-job(我实习公司就在用这个)
|
||||||
- ...
|
- ...
|
||||||
|
|
||||||
因为我玩具项目实现业务ACK的方案就打算用HashedWheelTimer,所以本节核心是分析HashedWheelTimer,另外会提下它与schedule的区别,其它定时任务实现原理就请自动Google吧。
|
因为我玩具项目实现业务 ACK 的方案就打算用 HashedWheelTimer,所以本节核心是分析 HashedWheelTimer,另外会提下它与 schedule 的区别,其它定时任务实现原理就请自动 Google 吧。
|
||||||
|
|
||||||
> Netty Version:4.1.42
|
> Netty Version:4.1.42
|
||||||
|
|
||||||
# HashedWheelTimer实现图示
|
# HashedWheelTimer 实现图示
|
||||||
|
|
||||||

|

|
||||||
|
|
||||||
大致有个理解就行,关于蓝色格子中的数字,其实就是剩余时钟轮数,这里听不懂也没关系,等后面看到源码解释就懂了~~(大概)~~。
|
大致有个理解就行,关于蓝色格子中的数字,其实就是剩余时钟轮数,这里听不懂也没关系,等后面看到源码解释就懂了~~(大概)~~。
|
||||||
|
|
||||||
# HashedWheelTimer简答使用例子
|
# HashedWheelTimer 简答使用例子
|
||||||
|
|
||||||
|
这里顺便列出 schedule 的使用方式,下面是某个 Handler 中的代码:
|
||||||
|
|
||||||
这里顺便列出schedule的使用方式,下面是某个Handler中的代码:
|
|
||||||
```java
|
```java
|
||||||
@Override
|
@Override
|
||||||
public void handlerAdded(final ChannelHandlerContext ctx) {
|
public void handlerAdded(final ChannelHandlerContext ctx) {
|
||||||
// 定时任务
|
// 定时任务
|
||||||
ScheduledFuture<?> hello_world = ctx.executor().schedule(() -> {
|
ScheduledFuture<?> hello_world = ctx.executor().schedule(() -> {
|
||||||
ctx.channel().write("hello world");
|
ctx.channel().write("hello world");
|
||||||
@ -40,22 +43,22 @@
|
|||||||
|
|
||||||
// 取消任务
|
// 取消任务
|
||||||
timeout1.cancel();
|
timeout1.cancel();
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
# HashedWheelTimer 源码
|
||||||
# HashedWheelTimer源码
|
|
||||||
|
|
||||||
### 继承关系、方法
|
### 继承关系、方法
|
||||||
|
|
||||||

|

|
||||||
|
|
||||||
### 构造函数、属性
|
### 构造函数、属性
|
||||||
|
|
||||||
请记住这些属性的是干啥用的,后面会频繁遇到:
|
请记住这些属性的是干啥用的,后面会频繁遇到:
|
||||||
`io.netty.util.HashedWheelTimer#HashedWheelTimer(java.util.concurrent.ThreadFactory, long, java.util.concurrent.TimeUnit, int, boolean, long)`
|
`io.netty.util.HashedWheelTimer#HashedWheelTimer(java.util.concurrent.ThreadFactory, long, java.util.concurrent.TimeUnit, int, boolean, long)`
|
||||||
|
|
||||||
```java
|
```java
|
||||||
public HashedWheelTimer(
|
public HashedWheelTimer(
|
||||||
ThreadFactory threadFactory,
|
ThreadFactory threadFactory,
|
||||||
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
|
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
|
||||||
long maxPendingTimeouts) {
|
long maxPendingTimeouts) {
|
||||||
@ -116,16 +119,17 @@
|
|||||||
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
|
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
|
||||||
reportTooManyInstances();
|
reportTooManyInstances();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
### 添加定时任务
|
### 添加定时任务
|
||||||
|
|
||||||
添加定时任务其实就是Timer接口的newTimeOut方法:
|
添加定时任务其实就是 Timer 接口的 newTimeOut 方法:
|
||||||
`io.netty.util.HashedWheelTimer#newTimeout`
|
`io.netty.util.HashedWheelTimer#newTimeout`
|
||||||
|
|
||||||
```java
|
```java
|
||||||
@Override
|
@Override
|
||||||
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
|
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
throw new NullPointerException("task");
|
throw new NullPointerException("task");
|
||||||
}
|
}
|
||||||
@ -167,13 +171,14 @@
|
|||||||
timeouts.add(timeout);
|
timeouts.add(timeout);
|
||||||
// 返回任务对象,该对象可以用于取消任务、获取任务信息等
|
// 返回任务对象,该对象可以用于取消任务、获取任务信息等
|
||||||
return timeout;
|
return timeout;
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
这里我们再跟进start方法看看:
|
这里我们再跟进 start 方法看看:
|
||||||
`io.netty.util.HashedWheelTimer#start`
|
`io.netty.util.HashedWheelTimer#start`
|
||||||
|
|
||||||
```java
|
```java
|
||||||
public void start() {
|
public void start() {
|
||||||
switch (WORKER_STATE_UPDATER.get(this)) {
|
switch (WORKER_STATE_UPDATER.get(this)) {
|
||||||
case WORKER_STATE_INIT:
|
case WORKER_STATE_INIT:
|
||||||
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
|
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
|
||||||
@ -198,22 +203,23 @@
|
|||||||
// Ignore - it will be ready very soon.
|
// Ignore - it will be ready very soon.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
### 定时任务执行
|
### 定时任务执行
|
||||||
|
|
||||||
定时任务的执行逻辑其实就在Worker的run方法中:
|
定时任务的执行逻辑其实就在 Worker 的 run 方法中:
|
||||||
`io.netty.util.HashedWheelTimer.Worker#run`
|
`io.netty.util.HashedWheelTimer.Worker#run`
|
||||||
|
|
||||||
```java
|
```java
|
||||||
// 用于处理取消的任务
|
// 用于处理取消的任务
|
||||||
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
|
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
|
||||||
|
|
||||||
// 时钟指针转动的次数
|
// 时钟指针转动的次数
|
||||||
private long tick;
|
private long tick;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
// Initialize the startTime.
|
// Initialize the startTime.
|
||||||
startTime = System.nanoTime();
|
startTime = System.nanoTime();
|
||||||
if (startTime == 0) {
|
if (startTime == 0) {
|
||||||
@ -264,14 +270,16 @@
|
|||||||
}
|
}
|
||||||
// 处理取消的任务
|
// 处理取消的任务
|
||||||
processCancelledTasks();
|
processCancelledTasks();
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
- 取消任务的逻辑这里就不展开看了,也比较简单,有兴趣自行补充即可。
|
- 取消任务的逻辑这里就不展开看了,也比较简单,有兴趣自行补充即可。
|
||||||
|
|
||||||
看看上面的transferTimeoutsToBuckets方法,如果你看不懂上面图中蓝色格子数字是什么意思,那就认真看看这个方法:
|
看看上面的 transferTimeoutsToBuckets 方法,如果你看不懂上面图中蓝色格子数字是什么意思,那就认真看看这个方法:
|
||||||
`io.netty.util.HashedWheelTimer.Worker#transferTimeoutsToBuckets`
|
`io.netty.util.HashedWheelTimer.Worker#transferTimeoutsToBuckets`
|
||||||
|
|
||||||
```java
|
```java
|
||||||
private void transferTimeoutsToBuckets() {
|
private void transferTimeoutsToBuckets() {
|
||||||
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
|
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
|
||||||
// adds new timeouts in a loop.
|
// adds new timeouts in a loop.
|
||||||
for (int i = 0; i < 100000; i++) {
|
for (int i = 0; i < 100000; i++) {
|
||||||
@ -304,16 +312,17 @@
|
|||||||
HashedWheelBucket bucket = wheel[stopIndex];
|
HashedWheelBucket bucket = wheel[stopIndex];
|
||||||
bucket.addTimeout(timeout);
|
bucket.addTimeout(timeout);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
继续看看上面run方法中的bucket.expireTimeouts(deadline);,这里面就是拿出任务并执行的逻辑:
|
继续看看上面 run 方法中的 bucket.expireTimeouts(deadline);,这里面就是拿出任务并执行的逻辑:
|
||||||
`io.netty.util.HashedWheelTimer.HashedWheelBucket#expireTimeouts`
|
`io.netty.util.HashedWheelTimer.HashedWheelBucket#expireTimeouts`
|
||||||
|
|
||||||
```java
|
```java
|
||||||
/**
|
/**
|
||||||
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
|
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
|
||||||
*/
|
*/
|
||||||
public void expireTimeouts(long deadline) {
|
public void expireTimeouts(long deadline) {
|
||||||
HashedWheelTimeout timeout = head;
|
HashedWheelTimeout timeout = head;
|
||||||
|
|
||||||
// process all timeouts
|
// process all timeouts
|
||||||
@ -341,24 +350,25 @@
|
|||||||
}
|
}
|
||||||
timeout = next;
|
timeout = next;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
# 和schedule对比
|
# 和 schedule 对比
|
||||||
|
|
||||||
关于schedule方法加入的定时任务什么时候被执行,你可以参考我之前写的[这篇博客](https://wenjie.store/archives/netty-nioeventloop-boot-2),在时间操作上和HashedWheelTimer大同小异。
|
关于 schedule 方法加入的定时任务什么时候被执行,你可以参考我之前写的[这篇博客](https://wenjie.store/archives/netty-nioeventloop-boot-2),在时间操作上和 HashedWheelTimer 大同小异。
|
||||||
|
|
||||||
schedule方法也是Netty的定时任务实现之一,但是底层的数据结构和HashedWheelTimer不一样,schedule方法用到的数据结构其实和ScheduledExecutorService类似,是PriorityQueue,它是一个优先级的队列。
|
schedule 方法也是 Netty 的定时任务实现之一,但是底层的数据结构和 HashedWheelTimer 不一样,schedule 方法用到的数据结构其实和 ScheduledExecutorService 类似,是 PriorityQueue,它是一个优先级的队列。
|
||||||
|
|
||||||
除此之外,schedule方法其实也用到MpscQueue,只是任务执行的时候,会把任务从PriorityQueue转移到MpscQueue上。
|
除此之外,schedule 方法其实也用到 MpscQueue,只是任务执行的时候,会把任务从 PriorityQueue 转移到 MpscQueue 上。
|
||||||
|
|
||||||
下面来跟踪下schedule方法看看,由于主要是看数据结构的区别,所以一些地方在这里我就不深追了
|
下面来跟踪下 schedule 方法看看,由于主要是看数据结构的区别,所以一些地方在这里我就不深追了
|
||||||
|
|
||||||
首先来到如下代码:
|
首先来到如下代码:
|
||||||
`io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(java.lang.Runnable, long, java.util.concurrent.TimeUnit)`
|
`io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(java.lang.Runnable, long, java.util.concurrent.TimeUnit)`
|
||||||
|
|
||||||
```java
|
```java
|
||||||
@Override
|
@Override
|
||||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||||
ObjectUtil.checkNotNull(command, "command");
|
ObjectUtil.checkNotNull(command, "command");
|
||||||
ObjectUtil.checkNotNull(unit, "unit");
|
ObjectUtil.checkNotNull(unit, "unit");
|
||||||
if (delay < 0) {
|
if (delay < 0) {
|
||||||
@ -368,13 +378,14 @@ schedule方法也是Netty的定时任务实现之一,但是底层的数据结
|
|||||||
|
|
||||||
return schedule(new ScheduledFutureTask<Void>(
|
return schedule(new ScheduledFutureTask<Void>(
|
||||||
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
|
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
继续跟进schedule方法看看:
|
继续跟进 schedule 方法看看:
|
||||||
`io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask<V>)`
|
`io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask<V>)`
|
||||||
|
|
||||||
```java
|
```java
|
||||||
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
|
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
|
||||||
if (inEventLoop()) {
|
if (inEventLoop()) {
|
||||||
scheduledTaskQueue().add(task.setId(nextTaskId++));
|
scheduledTaskQueue().add(task.setId(nextTaskId++));
|
||||||
} else {
|
} else {
|
||||||
@ -387,13 +398,14 @@ schedule方法也是Netty的定时任务实现之一,但是底层的数据结
|
|||||||
}
|
}
|
||||||
|
|
||||||
return task;
|
return task;
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
继续跟进scheduledTaskQueue()方法:
|
继续跟进 scheduledTaskQueue()方法:
|
||||||
`io.netty.util.concurrent.AbstractScheduledEventExecutor#scheduledTaskQueue`
|
`io.netty.util.concurrent.AbstractScheduledEventExecutor#scheduledTaskQueue`
|
||||||
|
|
||||||
```java
|
```java
|
||||||
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
|
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
|
||||||
if (scheduledTaskQueue == null) {
|
if (scheduledTaskQueue == null) {
|
||||||
scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
|
scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
|
||||||
SCHEDULED_FUTURE_TASK_COMPARATOR,
|
SCHEDULED_FUTURE_TASK_COMPARATOR,
|
||||||
@ -401,10 +413,10 @@ schedule方法也是Netty的定时任务实现之一,但是底层的数据结
|
|||||||
11);
|
11);
|
||||||
}
|
}
|
||||||
return scheduledTaskQueue;
|
return scheduledTaskQueue;
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
可以看到返回值就是PriorityQueue,它是一个最小堆实现的优先队列。
|
可以看到返回值就是 PriorityQueue,它是一个最小堆实现的优先队列。
|
||||||
|
|
||||||
# 扩展
|
# 扩展
|
||||||
|
|
||||||
@ -413,47 +425,53 @@ schedule方法也是Netty的定时任务实现之一,但是底层的数据结
|
|||||||
这里我就直接贴下网上大佬给出的解释:
|
这里我就直接贴下网上大佬给出的解释:
|
||||||
|
|
||||||
如果使用最小堆实现的优先级队列:
|
如果使用最小堆实现的优先级队列:
|
||||||

|

|
||||||
- 大致意思就是你的任务如果插入到堆顶,时间复杂度为O(log(n))。
|
|
||||||
|
- 大致意思就是你的任务如果插入到堆顶,时间复杂度为 O(log(n))。
|
||||||
|
|
||||||
如果使用链表(既然有说道,那就扩展下):
|
如果使用链表(既然有说道,那就扩展下):
|
||||||

|

|
||||||
- 中间插入后的事件复杂度为O(n)
|
|
||||||
|
- 中间插入后的事件复杂度为 O(n)
|
||||||
|
|
||||||
单个时间轮:
|
单个时间轮:
|
||||||

|

|
||||||
- 复杂度可以降至O(1)。
|
|
||||||
|
- 复杂度可以降至 O(1)。
|
||||||
|
|
||||||
记录轮数的时间轮(其实就是文章开头的那个):
|
记录轮数的时间轮(其实就是文章开头的那个):
|
||||||

|

|
||||||
|
|
||||||
层级时间轮:
|
层级时间轮:
|
||||||

|

|
||||||
- 时间复杂度是O(n),n是轮子的数量,除此之外还要计算一个轮子上的bucket。
|
|
||||||
|
- 时间复杂度是 O(n),n 是轮子的数量,除此之外还要计算一个轮子上的 bucket。
|
||||||
|
|
||||||
### 单时间轮缺点
|
### 单时间轮缺点
|
||||||
|
|
||||||
根据上面的图其实不难理解,如果任务是很久之后才执行的、同时要保证任务低延迟,那么单个时间轮所需的bucket数就会变得非常多,从而导致内存占用持续升高(CPU空转时间还是不变的,仅仅是内存需求变高了),如下图:
|
根据上面的图其实不难理解,如果任务是很久之后才执行的、同时要保证任务低延迟,那么单个时间轮所需的 bucket 数就会变得非常多,从而导致内存占用持续升高(CPU 空转时间还是不变的,仅仅是内存需求变高了),如下图:
|
||||||
|
|
||||||

|

|
||||||
|
|
||||||
Netty对于单个时间轮的优化方式就是记录下remainingRounds,从而减少bucket过多的内存占用。
|
Netty 对于单个时间轮的优化方式就是记录下 remainingRounds,从而减少 bucket 过多的内存占用。
|
||||||
|
|
||||||
### 时间轮和PriorityQueue对比
|
### 时间轮和 PriorityQueue 对比
|
||||||
|
|
||||||
看完上面的时间复杂度对比,你可能会觉得:
|
看完上面的时间复杂度对比,你可能会觉得:
|
||||||
|
|
||||||
- Q:时间轮的复杂度只有O(1),schedule和ScheduledExecutorService这种都是O(log(n)),那时间轮不是碾压吗?
|
- Q:时间轮的复杂度只有 O(1),schedule 和 ScheduledExecutorService 这种都是 O(log(n)),那时间轮不是碾压吗?
|
||||||
|
|
||||||
- A:你不要忘了,如果任务是在很久之后才执行的,那么时间轮就会产生很多空转,这是非常浪费CPU性能的,这种空转消耗可以通过增大tickDuration来避免,但这样做又会产生降低定时任务的精度,可能导致一些任务推到很迟才执行。
|
- A:你不要忘了,如果任务是在很久之后才执行的,那么时间轮就会产生很多空转,这是非常浪费 CPU 性能的,这种空转消耗可以通过增大 tickDuration 来避免,但这样做又会产生降低定时任务的精度,可能导致一些任务推到很迟才执行。
|
||||||
- A:而ScheduledExecutorService不会有这个问题。
|
- A:而 ScheduledExecutorService 不会有这个问题。
|
||||||
|
|
||||||
|
另外,Netty 时间轮的实现模型抽象出来是大概这个样子的:
|
||||||
|
|
||||||
另外,Netty时间轮的实现模型抽象出来是大概这个样子的:
|
|
||||||
```java
|
```java
|
||||||
for(Tasks task : tasks) {
|
for(Tasks task : tasks) {
|
||||||
task.doXxx();
|
task.doXxx();
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
这个抽象是个什么意思呢?你要注意一个点,这里的任务循环执行是同步的,**这意味着你第一个任务执行很慢延迟很高,那么后面的任务全都会被堵住**,所以你加进时间轮的任务不可以是耗时任务,比如一些延迟很高的数据库查询,如果有这种耗时任务,最好再嵌入线程池处理,不要让任务阻塞在这一层。
|
这个抽象是个什么意思呢?你要注意一个点,这里的任务循环执行是同步的,**这意味着你第一个任务执行很慢延迟很高,那么后面的任务全都会被堵住**,所以你加进时间轮的任务不可以是耗时任务,比如一些延迟很高的数据库查询,如果有这种耗时任务,最好再嵌入线程池处理,不要让任务阻塞在这一层。
|
||||||
|
|
||||||
> 原文链接:https://wenjie.store/archives/netty-hashedwheeltimer-and-schedule
|
> 原文链接:https://wenjie.store/archives/netty-hashedwheeltimer-and-schedule
|
||||||
|
BIN
images/Netty/image_1595751597062.png
Normal file
After Width: | Height: | Size: 59 KiB |
BIN
images/Netty/image_1595752125587.png
Normal file
After Width: | Height: | Size: 69 KiB |
BIN
images/Netty/image_1595756711656.png
Normal file
After Width: | Height: | Size: 22 KiB |
BIN
images/Netty/image_1595756928493.png
Normal file
After Width: | Height: | Size: 25 KiB |
BIN
images/Netty/image_1595757035360.png
Normal file
After Width: | Height: | Size: 67 KiB |
BIN
images/Netty/image_1595757110003.png
Normal file
After Width: | Height: | Size: 52 KiB |
BIN
images/Netty/image_1595757328715.png
Normal file
After Width: | Height: | Size: 52 KiB |
BIN
images/Netty/image_1595758329809.png
Normal file
After Width: | Height: | Size: 93 KiB |