先定义下,下文可能会用到的时间常量
import time
second = 1
minute = second * 60
hour = minute * 60
day = hour * 24
month = day * 30
year = month * 12
def task():
print("读取数据...")
time.sleep(1)
print("导出数据...")
print("file stream...")
def loop(delay):
while True:
task()
time.sleep(delay)
if __name__ == '__main__':
loop(second)
实现最简陋,最不推荐, time.sleep(delay)会阻塞主线程,导致整个程序在等待任务执行完成时无法响应其他事件。
from threading import Timer
def task():
print("读取数据...")
time.sleep(1)
print("导出数据...")
print("file stream...")
def loop(delay):
task()
t = Timer(interval=delay, function=loop,args=(delay,))
t.start()
if __name__ == '__main__':
loop(second)
sched模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。
import sched
def task():
print("读取数据...")
time.sleep(1)
print("导出数据...")
print("file stream...")
def loop():
task()
s.enter(delay=second, priority=1, action=loop)
if __name__ == '__main__':
s = sched.scheduler(timefunc=time.time, delayfunc=time.sleep)
s.enter(delay=second, priority=1, action=loop)
s.run()
scheduler对象主要方法:
1.enter(delay, priority, action, argument),安排一个事件来延迟delay个时间单位。
2.cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将抛出一个ValueError。
3.run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的delayfunc()函数),然后执行事件,直到不再有预定的事件。
schedule库是一个的定时任务方案。优势是使用简单,可以直接上手,基本不需要做什么配置。缺点是,因为比较轻量,导致他的功能比较局限,比如无法动态的添加任务,无法将任务的结果进行持久化等。
import schedule
import time
def job():
print("I am working ...")
# 每10分钟执行一次job这个任务
schedule.every(10).minutes.do(job)
# 每小时执行任务
schedule.every().hour.do(job)
# 每天10:30执行任务
schedule.every().day.at("10:30").do(job)
# 每5~10分钟执行一次任务
schedule.every(5).to(10).minutes.do(job)
# 每周一执行一次任务
schedule.every().monday.do(job)
# 每周三13:15执行一次任务
schedule.every().wednesday.at("13:15").do(job)
while True:
schedule.run_pending()
time.sleep(1)
import schedule
from datetime import datetime, timedelta, time
def job():
print('working...')
schedule.every().second.until('23:59').do(job) # 今天23:59停止
schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours=8)).do(job) # 8小时后停止
schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止
while True:
schedule.run_pending()
1.注意:不要高射炮打蚊子!!schedule就挺好。
2.实现:参考以往博客:celery。
Celery在异步任务和定时任务两个方面可以说是非常出名。他性能稳定,功能强大,在中大型项目中扮演着重要的角色。但是 Celery也无法动态的添加任务( Django中有相应的插件可以实现动态任务),而且对于之前没有使用过 Celery的项目,单独因为定时任务而搭建一套 Celery系统显得会有点大材小用的感觉,因为搭建 Celery还得配一个 Broker, Broker一般是 Redis或者 RabbitMQ,所以比较重量级,也比较浪费资源。
安装:pip install apscheduler
APScheduler不仅使用简单,功能还比较强大,比如支持动态添加和删除任务,支持持久化,持久化的方式还有很多选择,比如内存、 MongoDB、 SQLAlchemy、 Redis等,也针对一些常用的框架做了接口,比如有asyncio调度器、 gevent调度器等。
import time
from apscheduler.schedulers.blocking import BlockingScheduler
def task():
print("读取数据...")
time.sleep(1)
print("导出数据...")
print("file stream...")
def loop():
scheduler = BlockingScheduler()
scheduler.add_job(func=task, trigger="interval", seconds=second * 10)
scheduler.start()
if __name__ == '__main__':
loop()
Job作为APScheduler最小执行单位。创建Job时指定执行的函数,函数中所需参数,Job执行时的一些设置信息。
def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
next_run_time=undefined, jobstore='default', executor='default',
replace_existing=False, **trigger_args):
构建说明:
Trigger绑定到Job,在scheduler调度筛选Job时,根据触发器的规则计算出Job的触发时间,然后与当前时间比较确定此Job是否会被执行,总之就是根据trigger规则计算出下一个执行时间。
目前APScheduler支持触发器:
date定时,作业只执行一次。
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])
interval间隔调度
sched.add_job(job_function, 'interval', hours=2)
cron调度
# 6-8,11-12月第三个周五 00:00, 01:00, 02:00, 03:00运行
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 每周一到周五运行 直到2024-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30')
Executor在scheduler中初始化,另外也可通过scheduler的add_executor动态添加Executor。每个executor都会绑定一个alias,这个作为唯一标识绑定到Job,在实际执行时会根据Job绑定的executor找到实际的执行器对象,然后根据执行器对象执行Job。
Executor的种类会根据不同的调度来选择,如果选择AsyncIO作为调度的库,那么选择AsyncIOExecutor,如果选择tornado作为调度的库,选择TornadoExecutor,如果选择启动进程作为调度,选择ThreadPoolExecutor或者ProcessPoolExecutor都可以。
Executor的选择需要根据实际的scheduler来选择不同的执行器。目前APScheduler支持的Executor:
Jobstore在scheduler中初始化,另外也可通过scheduler的add_jobstore动态添加Jobstore。每个jobstore都会绑定一个alias,scheduler在Add Job时,根据指定的jobstore在scheduler中找到相应的jobstore,并将job添加到jobstore中。作业存储器决定任务的保存方式, 默认存储在内存中(MemoryJobStore),重启后就没有了。APScheduler支持的任务存储器有:
Event是APScheduler在进行某些操作时触发相应的事件,用户可以自定义一些函数来监听这些事件,当触发某些Event时,做一些具体的操作。常见的比如。Job执行异常事件 EVENT_JOB_ERROR。Job执行时间错过事件 EVENT_JOB_MISSED。
目前APScheduler定义的Event:
EVENT_SCHEDULER_STARTED
EVENT_SCHEDULER_START
EVENT_SCHEDULER_SHUTDOWN
EVENT_SCHEDULER_PAUSED
EVENT_SCHEDULER_RESUMED
EVENT_EXECUTOR_ADDED
EVENT_EXECUTOR_REMOVED
EVENT_JOBSTORE_ADDED
EVENT_JOBSTORE_REMOVED
EVENT_ALL_JOBS_REMOVED
EVENT_JOB_ADDED
EVENT_JOB_REMOVED
EVENT_JOB_MODIFIED
EVENT_JOB_EXECUTED
EVENT_JOB_ERROR
EVENT_JOB_MISSED
EVENT_JOB_SUBMITTED
EVENT_JOB_MAX_INSTANCES
Listener表示用户自定义监听的一些Event,比如当Job触发了EVENT_JOB_MISSED事件时可以根据需求做一些其他处理。
Scheduler是APScheduler的核心,所有相关组件通过其定义。scheduler启动之后,将开始按照配置的任务进行调度。除了依据所有定义Job的trigger生成的将要调度时间唤醒调度之外。当发生Job信息变更时也会触发调度。
APScheduler支持的调度器方式如下,比较常用的为BlockingScheduler和BackgroundScheduler
BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用start后主线程不会阻塞。
AsyncIOScheduler:适用于使用了asyncio模块的应用程序。
GeventScheduler:适用于使用gevent模块的应用程序。
TwistedScheduler:适用于构建Twisted的应用程序。
QtScheduler:适用于构建Qt的应用程序。
添加任务:
使用 scheduler.add_job(job_obj,args,id,trigger,**trigger_kwargs)。
删除任务:
使用 scheduler.remove_job(job_id,jobstore=None)。
暂停任务:
使用 scheduler.pause_job(job_id,jobstore=None)。
恢复任务:
使用 scheduler.resume_job(job_id,jobstore=None)。
修改某个任务属性信息:
使用 scheduler.modify_job(job_id,jobstore=None,**changes)。
修改单个作业的触发器并更新下次运行时间:
scheduler.reschedule_job(job_id,jobstore=None,trigger=None,**trigger_args)
输出作业信息:
使用 scheduler.print_jobs(jobstore=None,out=sys.stdout)
当我们的任务跑出异常后,我们可以监听到,然后把错误信息进行记录,示例代码如下:
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import datetime
import logging
Scheduler添加job流程:
Scheduler调度流程:
9.linux crontab实现
crontab命令:
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- fenyunshixun.cn 版权所有 湘ICP备2023022495号-9
违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务