1、时间轮概述
传统定时任务
如果想定期执行一个操作,只需要起一个定时器,设置时间间隔,然后时间达到之后就执行任务,一个定时器对应一个任务,如果任务很少的情况下这样做没什么问题,但是任务有成千上万个的时候,就得起很多的定时器,不断轮询,这对系统的内存和cpu都产生了很大的压力,程序还没开始跑呢,定时器已经满天飞了…
以下是纯手写的简单定时器代码,用死循环执行,因为一个定时器只针对一个任务,所以开销巨大,不建议使用
public void job(){
while(true){
sleep(延时N秒);
... 执行任务
}
}
时间轮算法
时间轮算法的核心是,轮询线程不再是负责遍历所有任务,而只在负责处于其当前时间上的任务.就像钟表一样,时间轮有一个指针会一直旋转,每转到一个新的时间,就去执行挂在这一时刻下的所有任务,当然,这些任务通常是交给其他线程去做,指针要做的只是继续往下转,不会关心任务的执行进度.
下面,我们就从一个最简单的定时任务来一步步优化,看看时间轮到底是怎么设计出来的
我们设置一个环状的时间队列,队列上的每一个元素,我们称为一个槽(slot),这个槽所对应的时间在整个时间轮里是唯一的, 根据前面的分析也能知道,槽里面是放任务的,而且肯定不会放一个任务,而是放一个任务队列. 下图就是一个时间轮,这个轮子上有60个槽,用来模拟一分钟的定时任务,从0开始,每过一秒钟后,指针就调到下一个槽中,到59秒后,下一面又回到0秒,这样就实现了不停地转圈;
那么问题来了, 这个时间轮用什么数据结构来实现呢? 没错,就是用数组,所以我们说是轮子, 但是可以将他展开来看,就可以得到如下数组结构
刚刚说了, 每个槽都是用来放任务列表的,所以任务列表我们可以用队列或者链表来实现,也就是如下结构
2、定时任务流程
好,现在万事具备,只欠东风,刚刚已经确定好定时时间轮的数据结构了,那么我们要怎么启动它呢?首先我们来分析一下:
1、定时任务肯定是需要启动的嘛,所以需要一个 start() 方法;
2、除此之外,肯定还有一个添加任务的入口,所以需要一个 add()方法;
3、需要运行定时器和任务,所以需要2个线程池,一个用来运行定时器,另一个用来运行任务
4、时间轮本身就是一个数组,在这边我们用CopyOnWriteArrayList来实现,因为它是线程安全的,我们使用了线程池,肯定要考虑到线程安全
5、时间轮上的每个槽都对应一个任务列表,这个任务列表我们用 LinkedBlockingQueue 来实现,这个队列本身也是线程安全的,并且它是一个单向链表;
以上就是我们在开发代码之前需要准备的东西,现在东西都准确好了, 在来看看整个定时任务的流程图吧
3、上代码
以下代码中,博主还增加了一个自动启停定时任务的功能,就是时间轮转完一圈后,在统一扫描下每个槽中的任务数量,如果所有的槽都没有任务了,那么轮询这个时间轮的操作就会暂停,进入阻塞状态,当我添加新任务时,轮询的操作又会继续运行,这个功能不是必须的,但也是为了减少cpu资源的消耗而做的优化;
Task.java 用来存放任务
package com.timerTask.single;
import lombok.Data;
import java.util.Date;
/**
* 任务
*/
@Data
public class Task {
// 任务执行时间
private Date time;
// 需要执行的任务任务
private Runnable runnable;
}
TaskSingleTimeWheel.java 定时任务实现类
package com.timerTask.single;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;
/**
* 定时任务:单层时间轮
*/
@Slf4j
public class TaskSingleTimeWheel {
// 时间轮
private List<LinkedBlockingQueue<Task>> wheelQueue;
// 时间轮线程
private ThreadPoolExecutor wheelThreadService = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
// 执行任务的线程
private ThreadPoolExecutor taskThreadService = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
// 正在阻塞的线程,如果有线程调用LockSupport.park()方法,那么blockThread一定有值
private static volatile Thread blockThread ;
public TaskSingleTimeWheel(){
wheelQueue = Lists.newCopyOnWriteArrayList();
for (int i = 0; i < 60; i++) {
wheelQueue.add(new LinkedBlockingQueue<>());
}
}
// 添加任务
public void add(Task task) {
log.info("添加任务...");
// 计算槽位,获取当前时间的秒数,放到对应的槽位
int second = getCurrentDateSecond(task.getTime());
LinkedBlockingQueue<Task> tasks = wheelQueue.get(second);
if(!tasks.offer(task)){
log.error("添加任务失败:{}",task);
return;
}
if(null != blockThread){
// 解除阻塞
log.info("添加任务成功,解除阻塞");
LockSupport.unpark(blockThread);
blockThread= null;
}
}
// 启动定时任务
public void start() {
// 线程总数 - 已完成的线程数 = 正在运行的线程数量
if( wheelThreadService.getTaskCount() > wheelThreadService.getCompletedTaskCount()){
log.info("定时任务已启动,请不要重复启动");
return;
}
wheelThreadService.execute(()->{
// 与当前机器时间同步
for (;;){
int index = getCurrentDateSecond(new Date());
LinkedBlockingQueue<Task> tasks = wheelQueue.get(index);
log.info("第{}秒的任务数量有{}个",index,tasks.size());
if(!tasks.isEmpty()){
runTask(tasks);
}
// 延时一秒后遍历下一个槽
sleep();
if(index >= wheelQueue.size()-1){
// 阻塞当前线程
blockThisThread();
}
}
});
log.info("开启,getCompletedTaskCount:{},getTaskCount:{}",wheelThreadService.getCompletedTaskCount(),wheelThreadService.getTaskCount());
}
// 判断队列中是否有任务,若没有任务则阻塞线程,避免不必要的开销
private void blockThisThread() {
// 所有的槽中都没有可执行的任务,阻塞当前线程
if(0== wheelQueue.stream().mapToLong(item-> item.stream().count()).sum()){
// 阻塞当前线程
log.info("所有的槽中都没有可执行的任务,进行阻塞操作");
blockThread = Thread.currentThread();
LockSupport.park();
}
}
/**
* 运行任务
* @param tasks
*/
private void runTask(LinkedBlockingQueue<Task> tasks) {
while (true){
Task task = tasks.poll();
if(null == task){
break;
}
log.info("开始执行任务,时间点:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(task.getTime()));
// 运行
taskThreadService.execute(task.getRunnable());
}
}
// 睡眠一段时间
public void sleep(long ms){
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
log.error("睡眠异常",e);
throw new RuntimeException(e);
}
}
// 睡眠一秒
public void sleep(){
sleep(1000);
}
/**
* 获取当前时间的秒数(计算槽位)
* @param time
* @return
*/
private int getCurrentDateSecond(Date time){
Calendar calendar = Calendar.getInstance();
calendar.setTime(time);
// 因为时间轮是以秒为单位来计算的,所以获取当前时间的秒数
int i = calendar.get(Calendar.SECOND);
return i;
}
/**
* 获取当前时间的ms数
* @param time
* @return
*/
private int getCurrentDateMs(Date time){
Calendar calendar = Calendar.getInstance();
calendar.setTime(time);
// 因为时间轮是以秒为单位来计算的,所以获取当前时间的秒数
int i = calendar.get(Calendar.MILLISECOND);
log.info("当前时间的秒数为:{}",i);
return i;
}
}
Main.java 启动类
package com.timerTask.single;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
@Slf4j
public class Main {
public static void main(String[] args) throws InterruptedException {
TaskSingleTimeWheel taskSingleTimeWheel = new TaskSingleTimeWheel();
taskSingleTimeWheel.start();
Task task = new Task();
task.setTime(new Date());
task.setRunnable(() -> {
log.info("我是任务,我正在执行中");
});
taskSingleTimeWheel.add(task);
taskSingleTimeWheel.add(task);
taskSingleTimeWheel.add(task);
taskSingleTimeWheel.add(task);
taskSingleTimeWheel.sleep(120000);
taskSingleTimeWheel.add(task);
// 等待其他线程执行完
Thread.currentThread().join();
}
}