Django定时任务之django_apscheduler使用

今天在写一个任务需求时需要用到定时任务来做一部分数据处理与优化,于是在了解完现有方法,结合自己需求决定使用django_apscheduler,记录一下过程,有几篇值得参考的文章放在结尾,很多本文未提到之处可详见参考文章。 1、安装插件

pip install django-crontab

2、添加至settings中

INSTALLED_APPS = [

'django.contrib.admin',

'django.contrib.auth',

'django.contrib.contenttypes',

'django.contrib.sessions',

'django.contrib.messages',

'django.contrib.staticfiles',

'django_apscheduler', # 这里

...

]

3、migrate一下

python manage.py migrate

会多出两个表,记录定时任务及执行 4、写定时任务,可以直接写在已有app的views.py文件中,也可以自己新建一个app,放在新建app的views.py文件中,不过有一点需要注意,就是views.py文件得在api中引用才行,否则定时任务无法执行 1)若是写在已有app的views.py文件中,可以直接复制以下代码进行测试

from apscheduler.schedulers.background import BackgroundScheduler

from django_apscheduler.jobstores import DjangoJobStore, register_job

# 实例化调度器

scheduler = BackgroundScheduler(timezone='Asia/Shanghai')

# 调度器使用DjangoJobStore()

scheduler.add_jobstore(DjangoJobStore(), "default")

@register_job(scheduler, "interval", seconds=5, id="test_func", replace_existing=True, misfire_grace_time=120)

def test_func():

print('test')

scheduler.start()

2)新建app,这里我新建了一个timedtasks的app,以下是view.py文件的内容,分别有三个job,可以供测试

# Create your views here.

from django.http import HttpResponse

import time

from apscheduler.schedulers.background import BackgroundScheduler

from django_apscheduler.jobstores import DjangoJobStore, register_job, register_events

import logging

logger = logging.getLogger('apps')

# 实例化调度器

scheduler = BackgroundScheduler(timezone='Asia/Shanghai')

# 调度器使用DjangoJobStore()

scheduler.add_jobstore(DjangoJobStore(), "default")

logger.info('django-apscheduler')

# 1.通过 @register_job 类装饰器做法 每间隔5s执行一次

@register_job(scheduler, "interval", seconds=5, args=['job1name'], id='job1', replace_existing=True)

def job1(name):

# 具体要执行的代码

print('{} 任务运行成功!{}'.format(name, time.strftime("%Y-%m-%d %H:%M:%S")))

def job2(name):

# 具体要执行的代码

print('{} 任务运行成功!{}'.format(name, time.strftime("%Y-%m-%d %H:%M:%S")))

def job3(name):

print('{} 任务运行成功!{}'.format(name, time.strftime("%Y-%m-%d %H:%M:%S")))

# 2.add_job的方式 interval 每间隔10s执行一次

scheduler.add_job(job2, "interval", seconds=10, args=['job2name'], id="job2", replace_existing=True)

# 3.add_job的方式 crontab 16点 38分/40分 执行

scheduler.add_job(job3, 'cron', hour='16', minute='38,40', args=['job3name'], id='job3', replace_existing=True)

# 监控任务

register_events(scheduler)

# 调度器开始运行

scheduler.start()

# 注:必须得注册到API中去才可以,不然定时任务不能注册,不能执行

def testTimedTasks(request):

return HttpResponse('ok')

注意,urls.py文件中得引用:

from apps.timedtasks.views import testTimedTasks

urlpatterns = [

...,

path("test-timed-tasks/", testTimedTasks, name="test-timed-tasks"),

]

简单记录一下流程,如有错误欢迎指正,如有补充欢迎评论!

【参考】https://blog.csdn.net/weixin_42782150/article/details/123212604 【参考】https://blog.csdn.net/hans99812345/article/details/123926503 【参考】https://www.cnblogs.com/leisunny/p/16185169.html

推荐阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: