SpringBoot定时任务功能怎么实现

SpringBoot定时任务功能怎么实现

本篇内容介绍了“SpringBoot定时任务功能怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

SpringBoot定时任务功能怎么实现

一 背景

项目中需要一个可以动态新增定时定时任务的功能,现在项目中使用的是xxl-job定时任务调度系统,但是经过一番对xxl-job功能的了解,发现xxl-job对项目动态新增定时任务,动态删除定时任务的支持并不是那么好,所以需要自己手动实现一个定时任务的功能

二 动态定时任务调度

1 技术选择

Timer or ScheduledExecutorService

这两个都能实现定时任务调度,先看下Timer的定时任务调度

publicclassMyTimerTaskextendsTimerTask{privateStringname;publicMyTimerTask(Stringname){this.name=name;}publicStringgetName(){returnname;}publicvoidsetName(Stringname){this.name=name;}@Overridepublicvoidrun(){//taskCalendarinstance=Calendar.getInstance();System.out.println(newSimpleDateFormat("yyyy-MM-ddHH:mm:ss").format(instance.getTime()));}}Timertimer=newTimer();MyTimerTasktimerTask=newMyTimerTask("NO.1");//首次执行,在当前时间的1秒以后,之后每隔两秒钟执行一次timer.schedule(timerTask,1000L,2000L);

在看下ScheduledThreadPoolExecutor的实现

//org.apache.commons.lang3.concurrent.BasicThreadFactoryScheduledExecutorServiceexecutorService=newScheduledThreadPoolExecutor(1,newBasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());executorService.scheduleAtFixedRate(newRunnable(){@Overridepublicvoidrun(){//dosomething}},initialDelay,period,TimeUnit.HOURS);

两个都能实现定时任务,那他们的区别呢,使用阿里p3c会给出建议和区别

多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。

从建议上来看,是一定要选择ScheduledExecutorService了,我们看看源码看看为什么Timer出现问题会终止执行

/***Thetimerthread.*/privatefinalTimerThreadthread=newTimerThread(queue);publicTimer(){this("Timer-"+serialNumber());}publicTimer(Stringname){thread.setName(name);thread.start();}

新建对象时,我们看到开启了一个线程,那么这个线程在做什么呢?一起看看

classTimerThreadextendsThread{booleannewTasksMayBeScheduled=true;/***每一件一个任务都是一个quene*/privateTaskQueuequeue;TimerThread(TaskQueuequeue){this.queue=queue;}publicvoidrun(){try{mainLoop();}finally{//SomeonekilledthisThread,behaveasifTimercancelledsynchronized(queue){newTasksMayBeScheduled=false;queue.clear();//清除所有任务信息}}}/***Themaintimerloop.(Seeclasscomment.)*/privatevoidmainLoop(){while(true){try{TimerTasktask;booleantaskFired;synchronized(queue){//Waitforqueuetobecomenon-emptywhile(queue.isEmpty()&&newTasksMayBeScheduled)queue.wait();if(queue.isEmpty())break;//Queueisemptyandwillforeverremain;die//Queuenonempty;lookatfirstevtanddotherightthinglongcurrentTime,executionTime;task=queue.getMin();synchronized(task.lock){if(task.state==TimerTask.CANCELLED){queue.removeMin();continue;//Noactionrequired,pollqueueagain}currentTime=System.currentTimeMillis();executionTime=task.nextExecutionTime;if(taskFired=(executionTime<=currentTime)){if(task.period==0){//Non-repeating,removequeue.removeMin();task.state=TimerTask.EXECUTED;}else{//Repeatingtask,reschedulequeue.rescheduleMin(task.period<0?currentTime-task.period:executionTime+task.period);}}}if(!taskFired)//Taskhasn'tyetfired;waitqueue.wait(executionTime-currentTime);}if(taskFired)//Taskfired;runit,holdingnolockstask.run();}catch(InterruptedExceptione){}}}}

我们看到,执行了 mainLoop(),里面是 while (true)方法无限循环,获取程序中任务对象中的时间和当前时间比对,相同就执行,但是一旦报错,就会进入finally中清除掉所有任务信息。

这时候我们已经找到了答案,timer是在被实例化后,启动一个线程,不间断的循环匹配,来执行任务,他是单线程的,一旦报错,线程就终止了,所以不会执行后续的任务,而ScheduledThreadPoolExecutor是多线程执行的,就算其中有一个任务报错了,并不影响其他线程的执行。

2 使用ScheduledThreadPoolExecutor

从上面看,使用ScheduledThreadPoolExecutor还是比较简单的,但是我们要实现的更优雅一些,所以选择 TaskScheduler来实现

@ComponentpublicclassCronTaskRegistrarimplementsDisposableBean{privatefinalMap<Runnable,ScheduledTask>scheduledTasks=newConcurrentHashMap<>(16);@AutowiredprivateTaskSchedulertaskScheduler;publicTaskSchedulergetScheduler(){returnthis.taskScheduler;}publicvoidaddCronTask(Runnabletask,StringcronExpression){addCronTask(newCronTask(task,cronExpression));}privatevoidaddCronTask(CronTaskcronTask){if(cronTask!=null){Runnabletask=cronTask.getRunnable();if(this.scheduledTasks.containsKey(task)){removeCronTask(task);}this.scheduledTasks.put(task,scheduleCronTask(cronTask));}}publicvoidremoveCronTask(Runnabletask){Set<Runnable>runnables=this.scheduledTasks.keySet();Iteratorit1=runnables.iterator();while(it1.hasNext()){SchedulingRunnableschedulingRunnable=(SchedulingRunnable)it1.next();LongtaskId=schedulingRunnable.getTaskId();SchedulingRunnablecancelRunnable=(SchedulingRunnable)task;if(taskId.equals(cancelRunnable.getTaskId())){ScheduledTaskscheduledTask=this.scheduledTasks.remove(schedulingRunnable);if(scheduledTask!=null){scheduledTask.cancel();}}}}publicScheduledTaskscheduleCronTask(CronTaskcronTask){ScheduledTaskscheduledTask=newScheduledTask();scheduledTask.future=this.taskScheduler.schedule(cronTask.getRunnable(),cronTask.getTrigger());returnscheduledTask;}@Overridepublicvoiddestroy()throwsException{for(ScheduledTasktask:this.scheduledTasks.values()){task.cancel();}this.scheduledTasks.clear();}}

TaskScheduler是本次功能实现的核心类,但是他是一个接口

publicinterfaceTaskScheduler{/***Schedulethegiven{@linkRunnable},invokingitwheneverthetrigger*indicatesanextexecutiontime.*<p>Executionwillendoncetheschedulershutsdownorthereturned*{@linkScheduledFuture}getscancelled.*@paramtasktheRunnabletoexecutewheneverthetriggerfires*@paramtriggeranimplementationofthe{@linkTrigger}interface,*e.g.a{@linkorg.springframework.scheduling.support.CronTrigger}object*wrappingacronexpression*@returna{@linkScheduledFuture}representingpendingcompletionofthetask,*or{@codenull}ifthegivenTriggerobjectneverfires(i.e.returns*{@codenull}from{@linkTrigger#nextExecutionTime})*@throwsorg.springframework.core.task.TaskRejectedExceptionifthegiventaskwasnotaccepted*forinternalreasons(e.g.apooloverloadhandlingpolicyorapoolshutdowninprogress)*@seeorg.springframework.scheduling.support.CronTrigger*/@NullableScheduledFuture<?>schedule(Runnabletask,Triggertrigger);

前面的代码可以看到,我们在类中注入了这个类,但是他是接口,我们怎么知道是那个实现类呢,以往出现这种情况要在类上面加@Primany或者@Quality来执行实现的类,但是我们看到我的注入上并没有标记,因为是通过另一种方式实现的

@ConfigurationpublicclassSchedulingConfig{@BeanpublicTaskSchedulertaskScheduler(){ThreadPoolTaskSchedulertaskScheduler=newThreadPoolTaskScheduler();//定时任务执行线程池核心线程数taskScheduler.setPoolSize(4);taskScheduler.setRemoveOnCancelPolicy(true);taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");returntaskScheduler;}}

在spring初始化时就注册了Bean TaskScheduler,而我们可以看到他的实现是ThreadPoolTaskScheduler,在网上的资料中有人说ThreadPoolTaskScheduler是TaskScheduler的默认实现类,其实不是,还是需要我们去指定,而这种方式,当我们想替换实现时,只需要修改配置类就行了,很灵活。

而为什么说他是更优雅的实现方式呢,因为他的核心也是通过ScheduledThreadPoolExecutor来实现的

publicScheduledExecutorServicegetScheduledExecutor()throwsIllegalStateException{Assert.state(this.scheduledExecutor!=null,"ThreadPoolTaskSchedulernotinitialized");returnthis.scheduledExecutor;}

三 多节点任务执行问题

这次的实现过程中,我并没有选择xxl-job来进行实现,而是采用了TaskScheduler来实现,这也产生了一个问题,xxl-job是分布式的程序调度系统,当想要执行定时任务的应用使用xxl-job时,无论应用程序中部署多少个节点,xxl-job只会选择其中一个节点作为定时任务执行的节点,从而不会产生定时任务在不同节点上同时执行,导致重复执行问题,而使用TaskScheduler来实现,就要考虑多节点重复执行问题。当然既然有问题,就有解决方案

&middot; 方案一 将定时任务功能拆出来单独部署,且只部署一个节点 &middot; 方案二 使用redis setNx的形式,保证同一时间只有一个任务在执行

我选择的是方案二来执行,当然还有一些方式也能保证不重复执行,这里就不多说了,一下是我的实现

publicvoidexecuteTask(LongtaskId){if(!redisService.setIfAbsent(String.valueOf(taskId),"1",2L,TimeUnit.SECONDS)){log.info("已有执行中定时发送短信任务,本次不执行!");return;}

“SpringBoot定时任务功能怎么实现”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注恰卡编程网网站,小编将为大家输出更多高质量的实用文章!

发布于 2022-05-19 10:37:15
收藏
分享
海报
0 条评论
18
上一篇:thinkphp如何用中间件记录行为日志 下一篇:C++类和对象之封装及class与struct的区别是什么
目录

    0 条评论

    本站已关闭游客评论,请登录或者注册后再评论吧~

    忘记密码?

    图形验证码