spring schedule任务调度方式
spring schedule任务调度
启用 spring 的任务调度功能需要使用@enablescheduling注解,该注解会引入scheduledannotationbeanpostprocessor。
beanprocessor是一个bean后置处理器,负责扫描带有 @scheduled 注解的方法,将其转换为可执行的任务,并根据注解的属性将其注册到 taskscheduler 中进行管理和执行。
这样,开发者只需要在普通 spring bean 的方法上添加 @scheduled 注解,spring 就能自动地按照指定的时间策略执行这些方法,而无需手动创建和管理线程。其内部有一个registrar是 scheduledtaskregistrar用来注册任务。
查找 @scheduled 注解
在scheduledannotationbeanpostprocessor.postprocessafterinitialization()方法处理所有的@scheduled,具体处理每个注解方法是
public object postprocessafterinitialization(object bean, string beanname) { if (bean instanceof aopinfrastructurebean || bean instanceof taskscheduler || bean instanceof scheduledexecutorservice) { // ignore aop infrastructure such as scoped proxies. return bean; } //解析所有的@scheduled注解 class> targetclass = aopproxyutils.ultimatetargetclass(bean); if (!this.nonannotatedclasses.contains(targetclass) && annotationutils.iscandidateclass(targetclass, arrays.aslist(scheduled.class, schedules.class))) { map> annotatedmethods = methodintrospector.selectmethods(targetclass, (methodintrospector.metadatalookup >) method -> { set scheduledannotations = annotatedelementutils.getmergedrepeatableannotations( method, scheduled.class, schedules.class); return (!scheduledannotations.isempty() ? scheduledannotations : null); }); if (annotatedmethods.isempty()) { this.nonannotatedclasses.add(targetclass); } else { // 调用processscheduled()方法初始化调度任务 annotatedmethods.foreach((method, scheduledannotations) -> scheduledannotations.foreach(scheduled -> processscheduled(scheduled, method, bean))); } } return bean; }
processscheduled(scheduled scheduled, method method, object bean)
scheduled
:注解配置任务周期相关信息method
:注解所在方法bean
:注解所在实例对象。有bean和method就可以通过反射 进行方法调用。
processscheduled()方法首先将schedueld注解上的方法封装传给你一个runnable任务,然后
封装任务
protected runnable createrunnable(object target, method method) { //@scheduled 注解修饰的方法必须是无参的 assert.istrue(method.getparametercount() == 0, "only no-arg methods may be annotated with @scheduled"); method invocablemethod = aoputils.selectinvocablemethod(method, target.getclass()); return new scheduledmethodrunnable(target, invocablemethod); }
这里就是将实例方法包装成一个scheduledmethodrunnable对象,
scheduledmethodrunnable.run方法就是通过反射调用该方法。
reflectionutils.makeaccessible(this.method); this.method.invoke(this.target);
注册任务
下一步会解析 @scheduled 注解中的属性,如
fixedrate
: 以固定的时间间隔(毫秒)执行任务,在上一次任务开始后等待指定的时间。fixeddelay
: 在上一次任务完成后,等待指定的时间(毫秒)再执行下一次任务。cron
: 使用 cron 表达式定义任务的执行时间。initialdelay
: 任务第一次执行前的延迟时间(毫秒)。
不同的类型会通过registrar不同方法进行注册。
this.registrar.schedulecrontask() this.registrar.schedulefixeddelaytask() this.registrar.schedulefixedratetask()
这里又引入了一个重要的类scheduledtaskregistrar来注册任务。
这里以schedulecrontask方法为例来看下cron表达式类型任务的注册:
public scheduledtask schedulecrontask(crontask task) { scheduledtask scheduledtask = this.unresolvedtasks.remove(task); boolean newtask = false; if (scheduledtask == null) { scheduledtask = new scheduledtask(task); newtask = true; } //taskscheduler是否初始化 if (this.taskscheduler != null) { //创建任务 scheduledtask.future = this.taskscheduler.schedule(task.getrunnable(), task.gettrigger()); } else {//taskscheduler未初始化,将任务放到未处理列表里 addcrontask(task); this.unresolvedtasks.put(task, scheduledtask); } return (newtask ? scheduledtask : null); }
入参是一个crontask类型,在上面的processscheduled()方法调用实例是
/** runnable就是封装的scheduledmethodrunnable */ this.registrar.schedulecrontask(new crontask(runnable, new crontrigger(cron /**cron表达式*/, timezone)))
最后调用taskscheduler.schedule(task.getrunnable(), task.gettrigger())来启动任务。
taskscheduler
在看taskscheduler.schedule()方法前首先来看taskscheduler是怎么初始化的。
这还要看scheduledannotationbeanpostprocessor ,其实现了smartinitializingsingleton接口。在所有singleton bean 初始化完成后被调用aftersingletonsinstantiated()方法。该方法又会调用finishregistration();来完成惹我你的注册。
private void finishregistration() { if (this.scheduler != null) { this.registrar.setscheduler(this.scheduler); } if (this.beanfactory instanceof listablebeanfactory) { mapbeans = ((listablebeanfactory) this.beanfactory).getbeansoftype(schedulingconfigurer.class); list configurers = new arraylist<>(beans.values()); annotationawareordercomparator.sort(configurers); for (schedulingconfigurer configurer : configurers) { configurer.configuretasks(this.registrar); } } if (this.registrar.hastasks() && this.registrar.getscheduler() == null) { assert.state(this.beanfactory != null, "beanfactory must be set to find scheduler by type"); try { // search for taskscheduler bean... this.registrar.settaskscheduler(resolveschedulerbean(this.beanfactory, taskscheduler.class, false)); } catch (nouniquebeandefinitionexception ex) { try { this.registrar.settaskscheduler(resolveschedulerbean(this.beanfactory, taskscheduler.class, true)); } catch (nosuchbeandefinitionexception ex2) { if (logger.isinfoenabled()) { logger.info("more than one taskscheduler bean exists within the context, and " + "none is named 'taskscheduler'. mark one of them as primary or name it 'taskscheduler' " + "(possibly as an alias); or implement the schedulingconfigurer interface and call " + "scheduledtaskregistrar#setscheduler explicitly within the configuretasks() callback: " + ex.getbeannamesfound()); } } } catch (nosuchbeandefinitionexception ex) { // search for scheduledexecutorservice bean next... try { this.registrar.setscheduler(resolveschedulerbean(this.beanfactory, scheduledexecutorservice.class, false)); } catch (nouniquebeandefinitionexception ex2) { try { this.registrar.setscheduler(resolveschedulerbean(this.beanfactory, scheduledexecutorservice.class, true)); } catch (nosuchbeandefinitionexception ex3) { } } catch (nosuchbeandefinitionexception ex2) { // giving up -> falling back to default scheduler within the registrar... logger.info("no taskscheduler/scheduledexecutorservice bean found for scheduled processing"); } } } this.registrar.afterpropertiesset(); }
这里首先会从beanfacotry中查找schedulingconfigurer类型的bean,然后调用configuretasks来加载自定义schedule配置信息,这里入参是registrar,然后从容器中查找taskscheduler、scheduledexecutorservice类型的bean来初始化taskscheduler。最后调用registrar.afterpropertiesset()。这里还有一步兜底,如果schedule还是为空,则默认创建一个concurrenttaskscheduler类型的scheduler。
registrar.afterpropertiesset()方法
protected void scheduletasks() { if (this.taskscheduler == null) { this.localexecutor = executors.newsinglethreadscheduledexecutor(); this.taskscheduler = new concurrenttaskscheduler(this.localexecutor); } if (this.triggertasks != null) { for (triggertask task : this.triggertasks) { addscheduledtask(scheduletriggertask(task)); } } if (this.crontasks != null) { for (crontask task : this.crontasks) { addscheduledtask(schedulecrontask(task)); } } if (this.fixedratetasks != null) { for (intervaltask task : this.fixedratetasks) { addscheduledtask(schedulefixedratetask(task)); } } if (this.fixeddelaytasks != null) { for (intervaltask task : this.fixeddelaytasks) { addscheduledtask(schedulefixeddelaytask(task)); } } }
任务执行
回过头来继续看taskscheduler.schedule()任务的注册,这里就看默认的concurrenttaskscheduler.schedule()方法。
public scheduledfuture> schedule(runnable task, trigger trigger) { try { if (this.enterpriseconcurrentscheduler) { return new enterpriseconcurrenttriggerscheduler().schedule(decoratetask(task, true), trigger); } else { errorhandler errorhandler = (this.errorhandler != null ? this.errorhandler : taskutils.getdefaulterrorhandler(true)); return new reschedulingrunnable(task, trigger, this.clock, this.scheduledexecutor, errorhandler).schedule(); } } catch (rejectedexecutionexception ex) { throw new taskrejectedexception("executor [" + this.scheduledexecutor + "] did not accept task: " + task, ex); } }
最后调用reschedulingrunnable(task, trigger, this.clock, executor, errorhandler).schedule()
来看reschedulingrunnable的schedule方法
public scheduledfuture> schedule() { synchronized (this.triggercontextmonitor) { this.scheduledexecutiontime = this.trigger.nextexecutiontime(this.triggercontext); if (this.scheduledexecutiontime == null) { return null; } long initialdelay = this.scheduledexecutiontime.gettime() - this.triggercontext.getclock().millis(); this.currentfuture = this.executor.schedule(this, initialdelay, timeunit.milliseconds); return this; } }
计算好下次执行时间initialdelay,使用线程池executor延迟执行当前reschedulingrunnable。
run方法
public void run() { date actualexecutiontime = new date(this.triggercontext.getclock().millis()); super.run(); date completiontime = new date(this.triggercontext.getclock().millis()); synchronized (this.triggercontextmonitor) { assert.state(this.scheduledexecutiontime != null, "no scheduled execution"); this.triggercontext.update(this.scheduledexecutiontime, actualexecutiontime, completiontime); if (!obtaincurrentfuture().iscancelled()) { schedule(); } } }
这里super.run()就是调用reschedulingrunnable extends delegatingerrorhandlingrunnable构造方法创建传入的task,也就是原始schedule注解方法。
super.run()就是delegatingerrorhandlingrunnable的run方法
public delegatingerrorhandlingrunnable(runnable delegate, errorhandler errorhandler) { assert.notnull(delegate, "delegate must not be null"); assert.notnull(errorhandler, "errorhandler must not be null"); this.delegate = delegate; this.errorhandler = errorhandler; } public void run() { try { this.delegate.run(); } catch (undeclaredthrowableexception ex) { this.errorhandler.handleerror(ex.getundeclaredthrowable()); } catch (throwable ex) { this.errorhandler.handleerror(ex); } }
回到reschedulingrunnable的run方法,在执行完被代理task后,如果任务没有被取消,又调用schedule()方法进行下一次任务执行。这样就完成了任务的周期性执行。
怎么动态控制定时任务?
如果想取消或修改某个任务执行周期,这个时候该如何做呢?
这个时候可以使用上面说的schedulingconfigurer接口,该接口回暴露scheduledtaskregistrar类实例。上面代码分析可以看到,所有的任务都是通过该类进行初始化的,通过该类可以动态的添加任务。并且schedule()方法返回的是一个scheduledfuture,可以通过调用cancel方法来取消任务。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。
推荐阅读
-
IDEA中使用Gradle构建项目中文报GBK错误的解决方案
-
将Java应用做成exe可执行软件的流程步骤
-
SpringBoot实现多种来源的Zip多层目录打包下载
需要将一批文件(可能分布在不同目录、不同来源)打包成zip格式,按目录结构导出给用户下载。1.核心思路支持将本地服务器上的文...
-
Java中减少if-else的设计模式和优化技巧
前言“过于依赖if-else不仅会让代码变得臃肿不堪,还会使维护成本大大增加。其实,if-else虽然是最基础的条件分支,...
-
Spring Boot 中使用 Drools 规则引擎的完整步骤
-
Spring Boot整合Drools规则引擎实战指南及最佳实践
一、drools简介与核心概念1.1什么是drools?drools是redhat旗下的开源业务规则管理系统(brms),...
-
Springboot项目瘦身之如何将jar包与lib依赖分开打包
将jar包与lib依赖分开打包方法一:项目和依赖完全分离maven-jar-plugin负责生成jar文件(jar文件中...
-
Spring动态修改bean属性配置key的几种方法
静态配置的局限性先来看一个典型场景。假设我们有一个数据源配置类:@configuration@configurationpr...
-
Java如何判断一个IP是否在给定的网段内
-
从零开始学java之二叉树和哈希表实现代码