一、概述

1、celery框架和flask框架在运行时,是在不同的进程中,资源是独占的。

2、celery异步任务如果想使用flask中的功能,如orm,是需要在flask应用上下文管理器中执行orm操作的

3、使用celery是需要使用到中间件的,简单点就使用redis做中间件

注意:

在flask工厂模式集成celery异步框架,在celery的异步任务中能够获取到flask的应用上下文管理器,也就是说在celery异步任务中你可以去调用flask项目中功能,如orm操作等。

使用本文配置,可以无需修改flask创建app应用的程序,直接将celery相关包创建,运行就可以使用,且能够在异步任务使用flask的功能。

二、项目结构

依赖环境:

celery==4.4.7

eventlet==0.33.3

Flask==2.1.3

Flask-Caching==1.10.1

Flask-Cors==3.0.10

Flask-Migrate==2.7.0

Flask-RESTful==0.3.9

Flask-SocketIO==5.1.1

Flask-SQLAlchemy==2.5.1

PyMySQL==1.0.2

redis==3.5.3

SQLAlchemy==1.4.0

Werkzeug==2.0.2

目录结构:

flask-project

        |--apps

                |-- user

                        |-- models

                        |--views.py

                        |--urls.py

                |--__init__.py

        |--ext

                |--__init__.py

                |--config.py

        |--celery_task

                |--__init__.py

                |--async_task.py

                |--celery.py

                |--celeryconfig.py

                |--check_task.py

                |--scheduler_task.py

        app.py

三、flask工厂模式下各模块功能

1、apps/user/models.py : 写了一个user表

2、apps/user/views.py:写了测试调用celery异步任务的接口

3、apps/user/urls.py: 注册路由的

4、ext/__init__.py:cache、db、cors的拓展

5、ext/config.py : cache和cors使用到的配置

6、apps/__init__.py: 一个函数create_app,生成flask应用对象

7、app.py: 启动flask应用对象的模块

本文重点不在flask工厂模式,默认看官都懂如何创建flaks工厂模式的项目了。

在视图中在执行异步任务,并获取异步任务的id:

from celery_task.async_task import send_email_task,cache_user_task

#用户资源:get\put\delete, 对单个进行操作

class UserOneResource(ResourceBase):

def put(self,id):

#测试异步发邮件

email = request.args.get('email')

code = request.args.get('code')

res = send_email_task.delay(email,code)

print(res.id)

return NewResponse(msg='put',data={'task_id':res.id})

def patch(self,id):

#测试异步操作flask的orm和cache

p = request.args.get('p')

if p=='set':

res = cache_user_task.delay()

print(res,type(res))

return NewResponse(msg='patch',data={'task_id':res.id})

else:

from ext import cache

data = cache.get('all-user-data')

return NewResponse(msg='patch',data=data)

res = 异步函数.delay(函数需要的参数)

task_id = res.id

注意:task_id 可以知道对应的任务的完成情况,获取任务的返回值等。

四、celery项目的配置

1、celery的配置

将celery的配置都放到一个py文件中,方便后期的维护和使用

celeryconfig.py

from celery.schedules import crontab

from datetime import timedelta

'''

参数解析:

accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;

task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;

result_serializer:结果序列化格式,默认值为json;

timezone:配置Celery以使用自定义时区;

enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。

result_expires: 异步任务结果存活时长

beat_schedule:设置定时任务

'''

#手动注册celery的异步任务:将所有celery异步任务所在的模块找到,写成字符串

task_module = [

'celery_task.async_task', # 写任务模块导入路径,该模块主要写异步任务的方法

'celery_task.scheduler_task', # 写任务模块导入路径,该模块主要写定时任务的方法

]

#celery的配置

config = {

"broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码

"result_backend" : 'redis://127.0.0.1:6379/1',

"task_serializer" : 'json',

"result_serializer" : 'json',

"accept_content" : ['json'],

"timezone" : 'Asia/Shanghai',

"enable_utc" : False,

"result_expires" : 1*60*60,

"beat_schedule" : { #定时任务配置

# 名字随意命名

'add-func-30-seconds': {

# 执行add_task下的addy函数

'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func

# 每10秒执行一次

'schedule': timedelta(seconds=30),

# add函数传递的参数

'args': (10, 21)

},

# 名字随意起

'add-func-5-minutes': {

'task': 'celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func

# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务

'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行

'args': (19, 22) # 定时任务需要的参数

},

# 缓存用户数据到cache中

'cache-user-func': {

'task': 'celery_task.scheduler_task.cache_user_func',

# 导入任务函数:from celery_task.scheduler_task import cache_user_func

'schedule': timedelta(minutes=1), # 每1分钟执行一次,将用户消息缓存到cache中

}

}

}

2、创建celery对象

celery.py

from celery import Celery,Task

from .celeryconfig import config,task_module

import sys

import os

'1、把flask项目路径添加到系统环境变量中'

project_path = os.path.dirname(os.path.dirname(__file__))

sys.path.append(project_path)

'''

2、创建celery应用对象

'task'可以任务是该celery对象名字,用于区分celery对象

broker是指定消息中间件

backend是指定任务结果存储位置

include是手动指定异步任务所在的模块的位置

'''

#创建celery异步对象

celery = Celery('task', broker=config.get('broker_url'), backend=config.get('result_backend'), include=task_module)

#导入一些基本配置

celery.conf.update(**config)

'3、给celery所有任务添加flask的应用上下文,在celery异步任务中就可以调用flask中的对象了'

class ContextTask(celery.Task):

def __call__(self, *args, **kwargs):

from apps import create_app

app = create_app()

with app.app_context():

return self.run(*args, **kwargs)

celery.Task = ContextTask

注意:

1、第一步很关键,设置到python项目运行时,加载环境变量的问题。这一步是将flask项目的根目录加载环境变量中,这样第3步才能从apps中导入create_app函数。

2、第二步是创建celery通用的方法了,没什么好说的。

3、第三步很关键,涉及到celery异步任务能否在flask应用上下文管理器运行,从而可以调用flask中的功能,例如orm操作,cache操作.。(在执行任务时,先套上flask的应用上下文管理器)

3、异步任务模块

将所有异步任务相关的函数都集中到一个模块中,方便维护和使用。

async_task.py

# 导入celery对象app

from celery_task.celery import celery

from ext import cache

import time

'''

1、没有返回值的,@app.task(ignore_result=True)

2、有返回值的任务,@app.task 默认就是(ignore_result=False)

'''

# 没有返回值,禁用掉结果后端

@celery.task

def send_email_task(receiver_email,code): # 此时可以直接传邮箱,还能减少一次数据库的IO操作

'''

:param email: 接收消息的邮箱,用户的邮箱

:return:

'''

# 模拟邮件发送验证码

time.sleep(5)

return {'result':'邮件已经发送',receiver_email:'2356'}

@celery.task

def cache_user_task():

#orm查询数据,放到cache中

from apps.user.models import UserModel

user = UserModel.query.all()

lis = []

for u in user:

id = u.id

name = u.name

dic = {'id':id,'name':name}

lis.append(dic)

print(dic)

cache.set('all-user-data',lis)

return {'code':200,'msg':'查询数据成功'}

4、定时任务模块

将所有定时任务相关的函数都集中到一个模块中,方便维护和使用。

schedulser_task.py

from celery_task.celery import celery

import time

# 有返回值,返回值可以从结果后端中获取

@celery.task

def add_func(a, b):

print('执行了加法函数',a+b)

return a + b

# 不需要返回值,禁用掉结果后端

@celery.task(ignore_result=True)

def cache_user_func():

print('all')

5、检测任务id获取任务状态和返回值

check_task.py:

from celery.result import AsyncResult

from celery_task.celery import celery

'''验证任务的执行状态的'''

def check_task_status(task_id):

'''

任务的执行状态:

PENDING :等待执行

STARTED :开始执行

RETRY :重新尝试执行

SUCCESS :执行成功

FAILURE :执行失败

:param task_id:

:return:

'''

result = AsyncResult(id=task_id, app=celery)

dic = {

'type': result.status,

'msg': '',

'data': None,

'code': 400

}

if result.status == 'PENDING':

dic['msg'] = '任务等待中'

elif result.status == 'STARTED':

dic['msg'] = '任务开始执行'

elif result.status == 'RETRY':

dic['msg'] = '任务重新尝试执行'

elif result.status == 'FAILURE':

dic['msg'] = '任务执行失败了'

elif result.status == 'SUCCESS':

result = result.get()

dic['msg'] = '任务执行成功'

dic['data'] = result

dic['code'] = 200

# result.forget() # 将结果删除

# async.revoke(terminate=True) # 无论现在是什么时候,都要终止

# async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。

return dic

在视图函数中调用该方法,通过task_id ,返回任务的运行结果。

五、测试

1、运行项目

flask项目(在项目根目录下执行):

        flask run --host 0.0.0.0 --port 5000

celery项目(在项目根目录下执行):

启动celery进程:

windows系统:

        celery -A celery_task.celery worker -l info  -P  eventlet

linux系统:

        celery -A celery_task.celery worker -l info 

启动定时任务(先启动celery进程在启动定时任务):

celery -A celery_task.celery beat -l info

2、运行结果

1、执行异步任务中,将orm数据存到cache中

2、执行定时任务了

六、注意事项

1、在系统中要先安装好redis和mysql,并都启动了

2、在测试异步操作orm时,会使用到flask的cache存数据,注意flask的cache不能配置内存模式,不然celery进程存到cache中的数据,flask进程中取不到的。

3、当前的配置下,celery的目录必须是在flask根目录下

七、拓展-改变celery_task的位置

如果你想将celery_task包移动到apps包下,此时你需要修改什么?

1、apps/celery_task/celery.py:将flask项目根目录加载到系统环境变量中的路径有变

'1、把flask项目路径添加到系统环境变量中'

project_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))

2、apps/celery_task/celeryconfig.py: 注册异步任务的模块,定时任务的模块的位置变化

'1、加上apps.'

task_module = [

'apps.celery_task.async_task', # 写任务模块导入路径,该模块主要写异步任务的方法

'apps.celery_task.scheduler_task', # 写任务模块导入路径,该模块主要写定时任务的方法

]

'2、task参数对应的字符串,加上apps.'

config = {

"broker_url" :'redis://127.0.0.1:6379/0', #'redis://:123456@127.0.0.1:6379/1' 有密码时,123456是密码

"result_backend" : 'redis://127.0.0.1:6379/1',

"task_serializer" : 'json',

"result_serializer" : 'json',

"accept_content" : ['json'],

"timezone" : 'Asia/Shanghai',

"enable_utc" : False,

"result_expires" : 1*60*60,

"beat_schedule" : { #定时任务配置

# 名字随意命名

'add-func-30-seconds': {

# 执行add_task下的addy函数

'task': 'apps.celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func

# 每10秒执行一次

'schedule': timedelta(seconds=30),

# add函数传递的参数

'args': (10, 21)

},

# 名字随意起

'add-func-5-minutes': {

'task': 'apps.celery_task.scheduler_task.add_func', # 任务函数的导入路径,from celery_task.scheduler_task import add_func

# crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务

'schedule': crontab(minute='5'), # 之前时间点执行,每小时的第5分钟执行任务, 改成小时,分钟,秒 就是每天的哪个小时哪分钟哪秒钟执行

'args': (19, 22) # 定时任务需要的参数

},

# 缓存用户数据到cache中

'cache-user-func': {

'task': 'apps.celery_task.scheduler_task.cache_user_func',

# 导入任务函数:from celery_task.scheduler_task import cache_user_func

'schedule': timedelta(minutes=1), # 每1分钟执行一次,将用户消息缓存到cache中

}

}

}

3、在视图函数导入异步任务的路径也变了

#异步任务

from apps.celery_task.async_task import send_email_task,cache_user_task

4、启动celery和定时任务的命令变量【在项目根目录下执行命令】

启动celery:

windows启动命令: celery  -A  apps.celery_task.celery worker -l info  -P  eventlet

linux启动命令: celery  -A  apps.celery_task.celery worker -l info 

启动定时任务:

celery -A apps.celery_task beat -l info

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: