spring schedule任务调度方式

spring schedule任务调度

启用 spring 的任务调度功能需要使用@enablescheduling注解,该注解会引入scheduledannotationbeanpostprocessor。

beanprocessor是一个bean后置处理器,负责扫描带有 @scheduled 注解的方法,将其转换为可执行的任务,并根据注解的属性将其注册到 taskscheduler 中进行管理和执行。

这样,开发者只需要在普通 spring bean 的方法上添加 @scheduled 注解,spring 就能自动地按照指定的时间策略执行这些方法,而无需手动创建和管理线程。其内部有一个registrar是 scheduledtaskregistrar用来注册任务。

查找 @scheduled 注解

在scheduledannotationbeanpostprocessor.postprocessafterinitialization()方法处理所有的@scheduled,具体处理每个注解方法是

public object postprocessafterinitialization(object bean, string beanname) {
		if (bean instanceof aopinfrastructurebean || bean instanceof taskscheduler ||
				bean instanceof scheduledexecutorservice) {
			// ignore aop infrastructure such as scoped proxies.
			return bean;
		}
		//解析所有的@scheduled注解
		class targetclass = aopproxyutils.ultimatetargetclass(bean);
		if (!this.nonannotatedclasses.contains(targetclass) &&
				annotationutils.iscandidateclass(targetclass, arrays.aslist(scheduled.class, schedules.class))) {
			map> annotatedmethods = methodintrospector.selectmethods(targetclass,
					(methodintrospector.metadatalookup>) method -> {
						set scheduledannotations = annotatedelementutils.getmergedrepeatableannotations(
								method, scheduled.class, schedules.class);
						return (!scheduledannotations.isempty() ? scheduledannotations : null);
					});
			if (annotatedmethods.isempty()) {
				this.nonannotatedclasses.add(targetclass);
							}
			else {
				// 调用processscheduled()方法初始化调度任务
				annotatedmethods.foreach((method, scheduledannotations) ->
						scheduledannotations.foreach(scheduled -> processscheduled(scheduled, method, bean)));
				
			}
		}
		return bean;
	}
processscheduled(scheduled scheduled, method method, object bean)
  • scheduled:注解配置任务周期相关信息
  • method:注解所在方法
  • bean:注解所在实例对象。有bean和method就可以通过反射 进行方法调用。

processscheduled()方法首先将schedueld注解上的方法封装传给你一个runnable任务,然后

封装任务

protected runnable createrunnable(object target, method method) {
  //@scheduled 注解修饰的方法必须是无参的
		assert.istrue(method.getparametercount() == 0, "only no-arg methods may be annotated with @scheduled");
		method invocablemethod = aoputils.selectinvocablemethod(method, target.getclass());
		return new scheduledmethodrunnable(target, invocablemethod);
	}

这里就是将实例方法包装成一个scheduledmethodrunnable对象,

scheduledmethodrunnable.run方法就是通过反射调用该方法。

reflectionutils.makeaccessible(this.method);
this.method.invoke(this.target);

注册任务

下一步会解析 @scheduled 注解中的属性,如

  • fixedrate: 以固定的时间间隔(毫秒)执行任务,在上一次任务开始后等待指定的时间。
  • fixeddelay: 在上一次任务完成后,等待指定的时间(毫秒)再执行下一次任务。
  • cron: 使用 cron 表达式定义任务的执行时间。
  • initialdelay: 任务第一次执行前的延迟时间(毫秒)。

不同的类型会通过registrar不同方法进行注册。

this.registrar.schedulecrontask()
this.registrar.schedulefixeddelaytask()
this.registrar.schedulefixedratetask()

这里又引入了一个重要的类scheduledtaskregistrar来注册任务。

这里以schedulecrontask方法为例来看下cron表达式类型任务的注册:

public scheduledtask schedulecrontask(crontask task) {
		scheduledtask scheduledtask = this.unresolvedtasks.remove(task);
		boolean newtask = false;
		if (scheduledtask == null) {
			scheduledtask = new scheduledtask(task);
			newtask = true;
		}
  //taskscheduler是否初始化
		if (this.taskscheduler != null) {
          //创建任务
			scheduledtask.future = this.taskscheduler.schedule(task.getrunnable(), task.gettrigger());
		}
		else {//taskscheduler未初始化,将任务放到未处理列表里
			addcrontask(task);
			this.unresolvedtasks.put(task, scheduledtask);
		}
		return (newtask ? scheduledtask : null);
	}

入参是一个crontask类型,在上面的processscheduled()方法调用实例是

/**
runnable就是封装的scheduledmethodrunnable
*/
this.registrar.schedulecrontask(new crontask(runnable, new crontrigger(cron /**cron表达式*/, timezone)))

最后调用taskscheduler.schedule(task.getrunnable(), task.gettrigger())来启动任务。

taskscheduler

在看taskscheduler.schedule()方法前首先来看taskscheduler是怎么初始化的。

这还要看scheduledannotationbeanpostprocessor ,其实现了smartinitializingsingleton接口。在所有singleton bean 初始化完成后被调用aftersingletonsinstantiated()方法。该方法又会调用finishregistration();来完成惹我你的注册。

private void finishregistration() {
		if (this.scheduler != null) {
			this.registrar.setscheduler(this.scheduler);
		}

		if (this.beanfactory instanceof listablebeanfactory) {
			map beans =
					((listablebeanfactory) this.beanfactory).getbeansoftype(schedulingconfigurer.class);
			list configurers = new arraylist<>(beans.values());
			annotationawareordercomparator.sort(configurers);
			for (schedulingconfigurer configurer : configurers) {
				configurer.configuretasks(this.registrar);
			}
		}

		if (this.registrar.hastasks() && this.registrar.getscheduler() == null) {
			assert.state(this.beanfactory != null, "beanfactory must be set to find scheduler by type");
			try {
				// search for taskscheduler bean...
				this.registrar.settaskscheduler(resolveschedulerbean(this.beanfactory, taskscheduler.class, false));
			}
			catch (nouniquebeandefinitionexception ex) {
				
				try {
					this.registrar.settaskscheduler(resolveschedulerbean(this.beanfactory, taskscheduler.class, true));
				}
				catch (nosuchbeandefinitionexception ex2) {
					if (logger.isinfoenabled()) {
						logger.info("more than one taskscheduler bean exists within the context, and " +
								"none is named 'taskscheduler'. mark one of them as primary or name it 'taskscheduler' " +
								"(possibly as an alias); or implement the schedulingconfigurer interface and call " +
								"scheduledtaskregistrar#setscheduler explicitly within the configuretasks() callback: " +
								ex.getbeannamesfound());
					}
				}
			}
			catch (nosuchbeandefinitionexception ex) {
				
				// search for scheduledexecutorservice bean next...
				try {
					this.registrar.setscheduler(resolveschedulerbean(this.beanfactory, scheduledexecutorservice.class, false));
				}
				catch (nouniquebeandefinitionexception ex2) {
					
					try {
						this.registrar.setscheduler(resolveschedulerbean(this.beanfactory, scheduledexecutorservice.class, true));
					}
					catch (nosuchbeandefinitionexception ex3) {
						
					}
				}
				catch (nosuchbeandefinitionexception ex2) {
					
					// giving up -> falling back to default scheduler within the registrar...
					logger.info("no taskscheduler/scheduledexecutorservice bean found for scheduled processing");
				}
			}
		}

		this.registrar.afterpropertiesset();
	}

这里首先会从beanfacotry中查找schedulingconfigurer类型的bean,然后调用configuretasks来加载自定义schedule配置信息,这里入参是registrar,然后从容器中查找taskscheduler、scheduledexecutorservice类型的bean来初始化taskscheduler。最后调用registrar.afterpropertiesset()。这里还有一步兜底,如果schedule还是为空,则默认创建一个concurrenttaskscheduler类型的scheduler。

registrar.afterpropertiesset()方法

	protected void scheduletasks() {
		if (this.taskscheduler == null) {
			this.localexecutor = executors.newsinglethreadscheduledexecutor();
			this.taskscheduler = new concurrenttaskscheduler(this.localexecutor);
		}
		if (this.triggertasks != null) {
			for (triggertask task : this.triggertasks) {
				addscheduledtask(scheduletriggertask(task));
			}
		}
		if (this.crontasks != null) {
			for (crontask task : this.crontasks) {
				addscheduledtask(schedulecrontask(task));
			}
		}
		if (this.fixedratetasks != null) {
			for (intervaltask task : this.fixedratetasks) {
				addscheduledtask(schedulefixedratetask(task));
			}
		}
		if (this.fixeddelaytasks != null) {
			for (intervaltask task : this.fixeddelaytasks) {
				addscheduledtask(schedulefixeddelaytask(task));
			}
		}
	}

任务执行

回过头来继续看taskscheduler.schedule()任务的注册,这里就看默认的concurrenttaskscheduler.schedule()方法。

public scheduledfuture schedule(runnable task, trigger trigger) {
		try {
			if (this.enterpriseconcurrentscheduler) {
				return new enterpriseconcurrenttriggerscheduler().schedule(decoratetask(task, true), trigger);
			}
			else {
				errorhandler errorhandler =
						(this.errorhandler != null ? this.errorhandler : taskutils.getdefaulterrorhandler(true));
				return new reschedulingrunnable(task, trigger, this.clock, this.scheduledexecutor, errorhandler).schedule();
			}
		}
		catch (rejectedexecutionexception ex) {
			throw new taskrejectedexception("executor [" + this.scheduledexecutor + "] did not accept task: " + task, ex);
		}
	}

最后调用reschedulingrunnable(task, trigger, this.clock, executor, errorhandler).schedule()

来看reschedulingrunnable的schedule方法

public scheduledfuture schedule() {
		synchronized (this.triggercontextmonitor) {
			this.scheduledexecutiontime = this.trigger.nextexecutiontime(this.triggercontext);
			if (this.scheduledexecutiontime == null) {
				return null;
			}
			long initialdelay = this.scheduledexecutiontime.gettime() - this.triggercontext.getclock().millis();
			this.currentfuture = this.executor.schedule(this, initialdelay, timeunit.milliseconds);
			return this;
		}
	}

计算好下次执行时间initialdelay,使用线程池executor延迟执行当前reschedulingrunnable。

run方法

public void run() {
		date actualexecutiontime = new date(this.triggercontext.getclock().millis());
		super.run();
		date completiontime = new date(this.triggercontext.getclock().millis());
		synchronized (this.triggercontextmonitor) {
			assert.state(this.scheduledexecutiontime != null, "no scheduled execution");
			this.triggercontext.update(this.scheduledexecutiontime, actualexecutiontime, completiontime);
			if (!obtaincurrentfuture().iscancelled()) {
				schedule();
			}
		}
	}

这里super.run()就是调用reschedulingrunnable extends delegatingerrorhandlingrunnable构造方法创建传入的task,也就是原始schedule注解方法。

super.run()就是delegatingerrorhandlingrunnable的run方法

public delegatingerrorhandlingrunnable(runnable delegate, errorhandler errorhandler) {
		assert.notnull(delegate, "delegate must not be null");
		assert.notnull(errorhandler, "errorhandler must not be null");
		this.delegate = delegate;
		this.errorhandler = errorhandler;
	}
public void run() {
		try {
			this.delegate.run();
		}
		catch (undeclaredthrowableexception ex) {
			this.errorhandler.handleerror(ex.getundeclaredthrowable());
		}
		catch (throwable ex) {
			this.errorhandler.handleerror(ex);
		}
	}

回到reschedulingrunnable的run方法,在执行完被代理task后,如果任务没有被取消,又调用schedule()方法进行下一次任务执行。这样就完成了任务的周期性执行。

怎么动态控制定时任务?

如果想取消或修改某个任务执行周期,这个时候该如何做呢?

这个时候可以使用上面说的schedulingconfigurer接口,该接口回暴露scheduledtaskregistrar类实例。上面代码分析可以看到,所有的任务都是通过该类进行初始化的,通过该类可以动态的添加任务。并且schedule()方法返回的是一个scheduledfuture,可以通过调用cancel方法来取消任务。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持代码网。

发布于 2025-05-07 22:17:51
分享
海报
117
上一篇:spring IOC的理解之原理和实现过程 下一篇:SpringBoot配置文件之properties和yml的使用
目录

    忘记密码?

    图形验证码