Django Celery取消预取机制

基本情况问题解决

基本情况

Django + celery,消息队列用的是redis,开启Django服务后,接着开启celery,开启celery的命令如下:

# Linux

celery -A zy_ds worker -l info # 测试使用

nohup celery -A zy_ds worker -l info > celery.log 2>&1 & # 正常启动

nohup celery -A zy_ds worker --prefetch-multiplier=1 -l info > celery.log 2>&1 & # 设置并发量为1

# Windows

celery -A zy_ds worker -l info -P eventlet # Windows中需要加-P eventlet

# Windows中需要提前pip install eventlet

settings.py中celery的配置如下:

# ../zy_ds/zy_ds/settings.py

# celery配置

CELERY_BROKER_URL = 'redis://:123456@127.0.0.1:6379/0'

CELERY_RESULT_BACKEND = 'redis://:123456@127.0.0.1:6379/0'

# celery内容等消息的格式设置

CELERY_TASK_SERIALIZER = 'pickle'

CELERY_RESULT_SERIALIZER = 'pickle'

CELERY_ACCEPT_CONTENT = ['pickle', 'application/json']

# Worker并发数量,一般默认CPU核数,可以不设置

CELERY_WORKER_CONCURRENCY = 1

# 预取机制(每次去消息队列读取任务的数量,默认值是4)

CELERY_PREFETCH_MULTIPLIER = 1

项目主目录中的celery.py设置如下:

# ../zy_ds/zy_ds/celery.py

import os

from celery import Celery

from django.conf import settings

# 设置环境变量

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'zy_ds.settings')

# 实例化

app = Celery('zy_ds')

# namespace='CELERY'作用是允许你在Django配置文件中对Celery进行配置

# 但所有Celery配置项必须以CELERY开头,防止冲突

app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动从Django的已注册app中寻找加载worker函数(例如每个app下的tasks.py)

app.autodiscover_tasks(settings.INSTALLED_APPS)

问题

本人在settings.py中设置的worker并发数量是1,预取的数量也是1,就是想实现worker每次只从redis消息队列中拿一个任务,每次只执行一个任务,并且禁用预取机制。(因为预取机制很有可能导致这样一个问题:一个worker在执行某个耗时任务A时,它预取的任务B长时间被它占用并得不到释放,会使其他worker想要取任务时取不到,导致执行效率低下)

尝试1:(在启动命令中添加参数:-O fair) celery -A zy_ds worker --loglevel=info -O fair

尝试2:(在settings.py中添加如下参数) CELERY_ACKS_LATE = True CELERY_CONCURRENCY = 1 CELERY_PREFETCH_MULTIPLIER = 1

除此之外也试过其他方法,均无效

解决

重点是在celery.py中添加参数,而不是在settings.py中添加,celery.py参数添加如下:

# ../zy_ds/zy_ds/celery.py

import os

from celery import Celery

from django.conf import settings

# 设置环境变量

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'zy_ds.settings')

# 实例化

app = Celery('zy_ds')

# 添加配置参数,取消预取机制

app.conf.CELERY_ACKS_LATE = True

app.conf.CELERYD_CONCURRENCY = 1 # 并发的worker数量 可选

app.conf.CELERYD_PREFETCH_MULTIPLIER = 1 # 预取的任务数量

app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动从Django的已注册app中寻找加载worker函数(例如每个app下的tasks.py)

app.autodiscover_tasks(settings.INSTALLED_APPS)

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: