手把手教你实现简单时间轮算法

发布时间:2023-09-07 16:15:36 作者:yexindonglai@163.com 阅读(1363)

1、时间轮概述

传统定时任务

如果想定期执行一个操作,只需要起一个定时器,设置时间间隔,然后时间达到之后就执行任务,一个定时器对应一个任务,如果任务很少的情况下这样做没什么问题,但是任务有成千上万个的时候,就得起很多的定时器,不断轮询,这对系统的内存和cpu都产生了很大的压力,程序还没开始跑呢,定时器已经满天飞了…

以下是纯手写的简单定时器代码,用死循环执行,因为一个定时器只针对一个任务,所以开销巨大,不建议使用

  1. public void job(){
  2. while(true){
  3. sleep(延时N秒);
  4. ... 执行任务
  5. }
  6. }

时间轮算法

时间轮算法的核心是,轮询线程不再是负责遍历所有任务,而只在负责处于其当前时间上的任务.就像钟表一样,时间轮有一个指针会一直旋转,每转到一个新的时间,就去执行挂在这一时刻下的所有任务,当然,这些任务通常是交给其他线程去做,指针要做的只是继续往下转,不会关心任务的执行进度.

下面,我们就从一个最简单的定时任务来一步步优化,看看时间轮到底是怎么设计出来的

我们设置一个环状的时间队列,队列上的每一个元素,我们称为一个槽(slot),这个槽所对应的时间在整个时间轮里是唯一的, 根据前面的分析也能知道,槽里面是放任务的,而且肯定不会放一个任务,而是放一个任务队列. 下图就是一个时间轮,这个轮子上有60个槽,用来模拟一分钟的定时任务,从0开始,每过一秒钟后,指针就调到下一个槽中,到59秒后,下一面又回到0秒,这样就实现了不停地转圈;

那么问题来了, 这个时间轮用什么数据结构来实现呢? 没错,就是用数组,所以我们说是轮子, 但是可以将他展开来看,就可以得到如下数组结构

刚刚说了, 每个槽都是用来放任务列表的,所以任务列表我们可以用队列或者链表来实现,也就是如下结构

2、定时任务流程

好,现在万事具备,只欠东风,刚刚已经确定好定时时间轮的数据结构了,那么我们要怎么启动它呢?首先我们来分析一下:
1、定时任务肯定是需要启动的嘛,所以需要一个 start() 方法;
2、除此之外,肯定还有一个添加任务的入口,所以需要一个 add()方法;
3、需要运行定时器和任务,所以需要2个线程池,一个用来运行定时器,另一个用来运行任务
4、时间轮本身就是一个数组,在这边我们用CopyOnWriteArrayList来实现,因为它是线程安全的,我们使用了线程池,肯定要考虑到线程安全
5、时间轮上的每个槽都对应一个任务列表,这个任务列表我们用 LinkedBlockingQueue 来实现,这个队列本身也是线程安全的,并且它是一个单向链表;

以上就是我们在开发代码之前需要准备的东西,现在东西都准确好了, 在来看看整个定时任务的流程图吧

3、上代码

以下代码中,博主还增加了一个自动启停定时任务的功能,就是时间轮转完一圈后,在统一扫描下每个槽中的任务数量,如果所有的槽都没有任务了,那么轮询这个时间轮的操作就会暂停,进入阻塞状态,当我添加新任务时,轮询的操作又会继续运行,这个功能不是必须的,但也是为了减少cpu资源的消耗而做的优化;

Task.java 用来存放任务

  1. package com.timerTask.single;
  2. import lombok.Data;
  3. import java.util.Date;
  4. /**
  5. * 任务
  6. */
  7. @Data
  8. public class Task {
  9. // 任务执行时间
  10. private Date time;
  11. // 需要执行的任务任务
  12. private Runnable runnable;
  13. }

TaskSingleTimeWheel.java 定时任务实现类

  1. package com.timerTask.single;
  2. import com.google.common.collect.Lists;
  3. import lombok.extern.slf4j.Slf4j;
  4. import java.text.SimpleDateFormat;
  5. import java.util.*;
  6. import java.util.concurrent.*;
  7. import java.util.concurrent.locks.LockSupport;
  8. /**
  9. * 定时任务:单层时间轮
  10. */
  11. @Slf4j
  12. public class TaskSingleTimeWheel {
  13. // 时间轮
  14. private List<LinkedBlockingQueue<Task>> wheelQueue;
  15. // 时间轮线程
  16. private ThreadPoolExecutor wheelThreadService = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
  17. // 执行任务的线程
  18. private ThreadPoolExecutor taskThreadService = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
  19. // 正在阻塞的线程,如果有线程调用LockSupport.park()方法,那么blockThread一定有值
  20. private static volatile Thread blockThread ;
  21. public TaskSingleTimeWheel(){
  22. wheelQueue = Lists.newCopyOnWriteArrayList();
  23. for (int i = 0; i < 60; i++) {
  24. wheelQueue.add(new LinkedBlockingQueue<>());
  25. }
  26. }
  27. // 添加任务
  28. public void add(Task task) {
  29. log.info("添加任务...");
  30. // 计算槽位,获取当前时间的秒数,放到对应的槽位
  31. int second = getCurrentDateSecond(task.getTime());
  32. LinkedBlockingQueue<Task> tasks = wheelQueue.get(second);
  33. if(!tasks.offer(task)){
  34. log.error("添加任务失败:{}",task);
  35. return;
  36. }
  37. if(null != blockThread){
  38. // 解除阻塞
  39. log.info("添加任务成功,解除阻塞");
  40. LockSupport.unpark(blockThread);
  41. blockThread= null;
  42. }
  43. }
  44. // 启动定时任务
  45. public void start() {
  46. // 线程总数 - 已完成的线程数 = 正在运行的线程数量
  47. if( wheelThreadService.getTaskCount() > wheelThreadService.getCompletedTaskCount()){
  48. log.info("定时任务已启动,请不要重复启动");
  49. return;
  50. }
  51. wheelThreadService.execute(()->{
  52. // 与当前机器时间同步
  53. for (;;){
  54. int index = getCurrentDateSecond(new Date());
  55. LinkedBlockingQueue<Task> tasks = wheelQueue.get(index);
  56. log.info("第{}秒的任务数量有{}个",index,tasks.size());
  57. if(!tasks.isEmpty()){
  58. runTask(tasks);
  59. }
  60. // 延时一秒后遍历下一个槽
  61. sleep();
  62. if(index >= wheelQueue.size()-1){
  63. // 阻塞当前线程
  64. blockThisThread();
  65. }
  66. }
  67. });
  68. log.info("开启,getCompletedTaskCount:{},getTaskCount:{}",wheelThreadService.getCompletedTaskCount(),wheelThreadService.getTaskCount());
  69. }
  70. // 判断队列中是否有任务,若没有任务则阻塞线程,避免不必要的开销
  71. private void blockThisThread() {
  72. // 所有的槽中都没有可执行的任务,阻塞当前线程
  73. if(0== wheelQueue.stream().mapToLong(item-> item.stream().count()).sum()){
  74. // 阻塞当前线程
  75. log.info("所有的槽中都没有可执行的任务,进行阻塞操作");
  76. blockThread = Thread.currentThread();
  77. LockSupport.park();
  78. }
  79. }
  80. /**
  81. * 运行任务
  82. * @param tasks
  83. */
  84. private void runTask(LinkedBlockingQueue<Task> tasks) {
  85. while (true){
  86. Task task = tasks.poll();
  87. if(null == task){
  88. break;
  89. }
  90. log.info("开始执行任务,时间点:{}",new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(task.getTime()));
  91. // 运行
  92. taskThreadService.execute(task.getRunnable());
  93. }
  94. }
  95. // 睡眠一段时间
  96. public void sleep(long ms){
  97. try {
  98. Thread.sleep(ms);
  99. } catch (InterruptedException e) {
  100. log.error("睡眠异常",e);
  101. throw new RuntimeException(e);
  102. }
  103. }
  104. // 睡眠一秒
  105. public void sleep(){
  106. sleep(1000);
  107. }
  108. /**
  109. * 获取当前时间的秒数(计算槽位)
  110. * @param time
  111. * @return
  112. */
  113. private int getCurrentDateSecond(Date time){
  114. Calendar calendar = Calendar.getInstance();
  115. calendar.setTime(time);
  116. // 因为时间轮是以秒为单位来计算的,所以获取当前时间的秒数
  117. int i = calendar.get(Calendar.SECOND);
  118. return i;
  119. }
  120. /**
  121. * 获取当前时间的ms数
  122. * @param time
  123. * @return
  124. */
  125. private int getCurrentDateMs(Date time){
  126. Calendar calendar = Calendar.getInstance();
  127. calendar.setTime(time);
  128. // 因为时间轮是以秒为单位来计算的,所以获取当前时间的秒数
  129. int i = calendar.get(Calendar.MILLISECOND);
  130. log.info("当前时间的秒数为:{}",i);
  131. return i;
  132. }
  133. }

Main.java 启动类

  1. package com.timerTask.single;
  2. import lombok.extern.slf4j.Slf4j;
  3. import java.util.Date;
  4. @Slf4j
  5. public class Main {
  6. public static void main(String[] args) throws InterruptedException {
  7. TaskSingleTimeWheel taskSingleTimeWheel = new TaskSingleTimeWheel();
  8. taskSingleTimeWheel.start();
  9. Task task = new Task();
  10. task.setTime(new Date());
  11. task.setRunnable(() -> {
  12. log.info("我是任务,我正在执行中");
  13. });
  14. taskSingleTimeWheel.add(task);
  15. taskSingleTimeWheel.add(task);
  16. taskSingleTimeWheel.add(task);
  17. taskSingleTimeWheel.add(task);
  18. taskSingleTimeWheel.sleep(120000);
  19. taskSingleTimeWheel.add(task);
  20. // 等待其他线程执行完
  21. Thread.currentThread().join();
  22. }
  23. }

关键字定时任务