欢迎来到我的博客!

分布式任务队列 Celery 之初探

Django Aaron 1 年,5 月 1906 17评论

Celery是基于python开发的分布式任务队列,对于一些耗时的任务,我们可以把它交给celery异步处理,同时celery还可以做定时任务。

Celery在执行任务时,需要一个中间人(broker)来发送和接收任务消息。充当这个中间人角色的是 RabbitMQ 或者 Redis,本文将使用Redis作为broker。关于Redis的介绍和使用,可以参考下上一篇文章 django博客开发:redis的安装以及用django-redis作缓存

安装

本文使用的是celery 4.1.0,当前最新版本

pip install -U Celery

因为要用redis做中间人,所以可以用下面这种来安装celery和一些支持redis的组件

pip install "celery[redis]"

在django中使用celery

我们的django项目结构通常是这样的

- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py

在proj/proj/ 下创建celery.py文件,用来定义一个celery实例。

proj/proj/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# 这里定义了namespace='CELERY',在settings里所有跟celery有关的设置都有加上'CELERY_'前缀
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# 自动搜索所有注册的app下的tasks.py,所有celery处理的任务,都写在app下的tasks.py里
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

proj/proj/__init__.py里引入该实例

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

接下来需要在proj/proj/settings.py里进行设置

# celery settings
# 使用redis作为中间人
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
# 任务的结果存储在redis
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'

关于RESULT_BACKEND,还可以使用django的ORM或者Cache来存储任务结果,需要用到一个扩展app, django-celery-results,使用也很简单

1. 安装 pip install django-celery-results

2. 加入到INSTALLED_APPS中

settings.py

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

3. 数据库迁移 python manage.py migrate django_celery_results

4. 在settings.py里设置CELERY_RESULT_BACKEND

任务结果存储到数据库的话:

CELERY_RESULT_BACKEND = 'django-db'

存储到缓存中:

CELERY_RESULT_BACKEND = 'django-cache'

处理异步任务

Celery的功能直接就是异步处理,比如在以下场景:我做评论邮件通知功能时,按照逻辑,回复一条评论,需要发送邮件通知,而发邮件算是一个比较耗时的任务,每次提交评论后都会卡1~2秒,非常影响用户体验。

假设原来的代码是这样的:

blog/views.py

import time

# 发表评论的视图函数
def submit_comment(request):
    ...
    ...
    # 发送邮件
    send_mail(email)
    return JsonResponse({'msg': '评论出错!'})

# 发送邮件的函数
def send_mail(email):
    ...
    ...
    print('sending email...')
    time.sleep(2) # 耗时两秒
    print('finished')
    return True

把send_mail这任务交给celery来处理, 在blog/下创建tasks.py

blog/tasks.py

from proj import celery_app # 这是我们之前定义的celery实例
import time


# 原来的发送邮件函数写在这里,并用task装饰器
@celery_app.task
def send_mail(*args, **kwargs):
    ...
    ...
    print('sending email...')
    time.sleep(2) # 耗时两秒
    print('finished')
    return True

blog/views.py

from .tasks import send_mail


# 发表评论的视图函数
def submit_comment(request):
    ...
    ...
    # 发送邮件
    send_mail.delay(email)
    return JsonResponse({'msg': '评论出错!'})

注意:这里send_mail的参数必须是可json序列化的对象,不然会报错

 

运行任务

任务已经写好了,还需要启动celery才能运行任务

启动celery也很简单,在项目根目录下(跟我们运行 python manage.py runserver 一样),运行如下命令

celery -A proj worker -l info

注意:这里的proj就是我们之前定义celery实例时,你可以自己更改,比如我定义的是 app = Celery('django_blog')

正常运行后,终端里会显示如下:

django里运行celery

运行celery之后,再打开一个终端,启动服务器

python manage.py runserver

尝试发送评论,不会再有之前的阻塞现象,终端的log如下:

每当celery执行一个任务,都会产生一个key,这个key对应的就是任务的结果,它存储在result_backend里,我们使用的是redis,可以去redis里查看一下

打开redis交互

/usr/local/bin/redis-cli

以上就是celery执行异步任务的介绍,下一篇文章将介绍celery定时炸弹任务。

注意:每当添加一个任务时,都需要重新启动celery,否则celery无法发现新添加的任务。

supervisor启动celery

在生产环境下,我们肯定不希望像上面介绍的那样手动开启celery,这里可以使用进程监控神器 supervisor,关于supervisor的使用,在上一篇文章 django博客开发:redis的安装以及用django-redis作缓存 中有介绍

/etc/supervisor/conf.d 下创建 celery.conf,作为监控celery的配置文件

vim /etc/supervisor/conf.d/celery.conf

 /etc/supervisor/conf.d/celery.conf

[program:celery]

; 这里是项目的路径
directory = /home/aaron/Desktop/sites/aaron-zhao.com/django_blog

; 启动celery的命令
command = /home/aaron/Desktop/sites/aaron-zhao.com/env/bin/celery -A django_blog worker -l info --logfile log/celery.log

; 进程数,默认是1
numprocs = 1
; 自动启动
autostart = true
; 自动重启
autorestart = true

NOTICE:这里启动celery的命令 /home/aaron/Desktop/sites/aaron-zhao.com/env/bin/ 是celery所在的路径,因为celery是安装在虚拟环境中的 env/ 就是虚拟环境。 --logfile log/celery.log 是配置celery的日志,日志的路径自行更改。

评论 8人参与 | 17条评论