怎么在Python中利用APScheduler实现一个定时任务
这篇文章将为大家详细讲解有关怎么在Python中利用APScheduler实现一个定时任务,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。
一、安装APScheduler
pipinstallapscheduler
二、基本概念
APScheduler有四大组件:
1、触发器 triggers :触发器包含调度逻辑。每个作业都有自己的触发器,用于确定下一个任务何时运行。除了初始配置之外,触发器是完全无状态的。
有三种内建的trigger:
(1)date: 特定的时间点触发
(2)interval: 固定时间间隔触发
(3)cron: 在特定时间周期性地触发
2、任务储存器 job stores:用于存放任务,把任务存放在内存(为默认MemoryJobStore)或数据库中。
3、执行器 executors: 执行器是将任务提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
4、调度器 schedulers: 把上方三个组件作为参数,通过创建调度器实例来运行
根据开发需求选择相应的组件,下面是不同的调度器组件:
BlockingScheduler 阻塞式调度器:适用于只跑调度器的程序。
BackgroundScheduler 后台调度器:适用于非阻塞的情况,调度器会在后台独立运行。
AsyncIOScheduler AsyncIO调度器,适用于应用使用AsnycIO的情况。
GeventScheduler Gevent调度器,适用于应用通过Gevent的情况。
TornadoScheduler Tornado调度器,适用于构建Tornado应用。
TwistedScheduler Twisted调度器,适用于构建Twisted应用。
QtScheduler Qt调度器,适用于构建Qt应用。
三、使用步骤
1、新建一个调度器schedulers
2、添加调度任务
3、运行调度任务
四、使用实例
1、触发器date
特定的时间点触发,只执行一次。参数如下:
参数 | 说明 |
run_date (datetime 或 str) | 作业的运行日期或时间 |
timezone (datetime.tzinfo 或 str) | 指定时区 |
使用例子:
fromdatetimeimportdatetime fromdatetimeimportdate fromapscheduler.schedulers.blockingimportBlockingScheduler defjob(text): print(text) scheduler=BlockingScheduler() #在2019-8-30运行一次job方法 scheduler.add_job(job,'date',run_date=date(2019,8,30),args=['text1']) #在2019-8-3001:00:00运行一次job方法 scheduler.add_job(job,'date',run_date=datetime(2019,8,30,1,0,0),args=['text2']) #在2019-8-3001:00:01运行一次job方法 scheduler.add_job(job,'date',run_date='2019-8-3001:00:00',args=['text3']) scheduler.start()
2、触发器interval
固定时间间隔触发。参数如下:
参数 | 说明 |
weeks (int) | 间隔几周 |
days (int) | 间隔几天 |
hours (int) | 间隔几小时 |
minutes (int) | 间隔几分钟 |
seconds (int) | 间隔多少秒 |
start_date (datetime 或 str) | 开始日期 |
end_date (datetime 或 str) | 结束日期 |
timezone (datetime.tzinfo 或str) |
使用例子:
importtime fromapscheduler.schedulers.blockingimportBlockingScheduler defjob(text): t=time.strftime('%Y-%m-%d%H:%M:%S',time.localtime(time.time())) print('{}---{}'.format(text,t)) scheduler=BlockingScheduler() #每隔1分钟运行一次job方法 scheduler.add_job(job,'interval',minutes=1,args=['job1']) #在2019-08-2922:15:00至2019-08-2922:17:00期间,每隔1分30秒运行一次job方法 scheduler.add_job(job,'interval',minutes=1,seconds=30,start_date='2019-08-2922:15:00',end_date='2019-08-2922:17:00',args=['job2']) scheduler.start() ''' 运行结果: job2---2019-08-2922:15:00 job1---2019-08-2922:15:46 job2---2019-08-2922:16:30 job1---2019-08-2922:16:46 job1---2019-08-2922:17:46 ...余下省略... '''
3、触发器cron
在特定时间周期性地触发。参数如下:
参数 | 说明 |
year (int 或 str) | 年,4位数字 |
month (int 或 str) | 月 (范围1-12) |
day (int 或 str) | 日 (范围1-31) |
week (int 或 str) | 周 (范围1-53) |
day_of_week (int 或 str) | 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun) |
hour (int 或 str) | 时 (范围0-23) |
minute (int 或 str) | 分 (范围0-59) |
second (int 或 str) | 秒 (范围0-59) |
start_date (datetime 或 str) | 最早开始日期(包含) |
end_date (datetime 或 str) | 最晚结束时间(包含) |
timezone (datetime.tzinfo 或str) | 指定时区 |
这些参数支持算数表达式,取值格式有如下:
使用例子:
importtime fromapscheduler.schedulers.blockingimportBlockingScheduler defjob(text): t=time.strftime('%Y-%m-%d%H:%M:%S',time.localtime(time.time())) print('{}---{}'.format(text,t)) scheduler=BlockingScheduler() #在每天22点,每隔1分钟运行一次job方法 scheduler.add_job(job,'cron',hour=22,minute='*/1',args=['job1']) #在每天22和23点的25分,运行一次job方法 scheduler.add_job(job,'cron',hour='22-23',minute='25',args=['job2']) scheduler.start() ''' 运行结果: job1---2019-08-2922:25:00 job2---2019-08-2922:25:00 job1---2019-08-2922:26:00 job1---2019-08-2922:27:00 ...余下省略... '''
4、通过装饰器scheduled_job()添加方法
添加任务的方法有两种:
(1)通过调用add_job()---见上面1至3代码
(2)通过装饰器scheduled_job():
第一种方法是最常用的方法。第二种方法主要是方便地声明在应用程序运行时不会更改的任务。该 add_job()方法返回一个apscheduler.job.Job实例,可以使用该实例稍后修改或删除该任务。
importtime fromapscheduler.schedulers.blockingimportBlockingScheduler scheduler=BlockingScheduler() @scheduler.scheduled_job('interval',seconds=5) defjob1(): t=time.strftime('%Y-%m-%d%H:%M:%S',time.localtime(time.time())) print('job1---{}'.format(t)) @scheduler.scheduled_job('cron',second='*/7') defjob2(): t=time.strftime('%Y-%m-%d%H:%M:%S',time.localtime(time.time())) print('job2---{}'.format(t)) scheduler.start() ''' 运行结果: job2---2019-08-2922:36:35 job1---2019-08-2922:36:37 job2---2019-08-2922:36:42 job1---2019-08-2922:36:42 job1---2019-08-2922:36:47 job2---2019-08-2922:36:49 ...余下省略... '''
关于怎么在Python中利用APScheduler实现一个定时任务就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。
推荐阅读
-
Python中怎么动态声明变量赋值
这篇文章将为大家详细讲解有关Python中怎么动态声明变量赋值,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文...
-
python中变量的存储原理是什么
-
Python中怎么引用传递变量赋值
这篇文章将为大家详细讲解有关Python中怎么引用传递变量赋值,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文...
-
python中怎么获取程序执行文件路径
python中怎么获取程序执行文件路径,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的...
-
Python中如何获取文件系统的使用率
Python中如何获取文件系统的使用率,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴...
-
Python中怎么获取文件的创建和修改时间
这篇文章将为大家详细讲解有关Python中怎么获取文件的创建和修改时间,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读...
-
python中怎么获取依赖包
今天就跟大家聊聊有关python中怎么获取依赖包,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据...
-
python怎么实现批量文件加密功能
-
python中怎么实现threading线程同步
小编给大家分享一下python中怎么实现threading线程同步,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!...
-
python下thread模块创建线程的方法
本篇内容介绍了“python下thread模块创建线程的方法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来...