初识 Celery

以前一直写的是后台的业务系统,数据不需要直接的实时返回,经常不会有什么队列方面的需求。最近有个关于交互的演示项目需要实时返回结果,但是中间几个过程都需要大量的交互(网络通讯),因此这个时候就需要引入消息队列。相对来说,最简单的 Python 队列实现就是 Celery。

Celery 的使用比较简单,但是需要后端的数据库做消息队列存储机制,这个一直没想明白,看文档的时候想当然的以为不需要后端也行的。假如没有后端的话,很有可能会不能实现功能。

比如我有一个上传文件的要求:

1
2

r = requests.post(url, files=fh)

这个请求可能需要大量的时间实现上传,应用又有体验的原因需要实时返回结果,因此实现的方式就可以选择队列实现。更加普遍的场景还有发送邮件、短信等等的场景。

首先是编写 Celery 的 Worker 进程。这个进程是作为队列的后端处理程序存在的,我在选择后端时使用的是 Redis,主要是比较方便,amq 安装相对麻烦。有人曾经反应说 Redis 的支持并不是十分完善,但是我看文档中并未提到,因此在实际生产环境的时候注意一下这个坑,如果发现 Redis 有问题,可以果断迁移至 amq,或者直接用它做后端。

1
2
3
4
5
6
7
8
9
10
11
12
13

# tasks.py

from celery import Celery

redis_url = 'redis://localhost:6379/0'
app = Celery('tasks', backend=redis_url, broker=redis_url) # Celery 第一个参数同文件名


# 新异步任务
@app.task
def upload_file(url, fh):
r = requests.post(url, files=fh)

编写完成后就可以运行 Celery 的 Worker 进程:

celery -A tasks worker

就可以执行 Worker 进程,记得同时启动数据库:)

然后写一下前段处理模块:

1
2
3
4
5
6

from task import upload_file

result = upload_file.delay(url, fh) # 延迟执行
print result.ready() # 获取运行状态
print result.get(timeout=10) # 获取运行结果