0x01 Introduction

Celery is a simple, flexible and reliable distributed system to process vast amounts of messages. It’s a task queue with focus on real-time processing, while also supporting task scheduling.

0x02 Model

Celery 对外开放的接口提供了方便的处理任务、获取任务状态和任务调度的功能,通过这些可以很好的分解预定任务提高处理能力。

0x03 Quickstart

  1. Install

Use pip for install:

1
pip install Celery[Redis]

Before use celery, should install Redis.

  1. Code

Let’s create file tasks.py

1
2
3
4
5
6
7
8
# tasks.py
from celery import Celery

app = Celery("tasks",backend="redis",broker="redis://localhost:6379/0")

@app.task
def add(x,y):
return x+y

Running the celery worker server

1
celery -A tasks worker --loglevel=info
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-------------- celery@dev v3.1.20 (Cipater)
---- **** -----
--- * *** * -- Linux-4.0.0-kali1-amd64-x86_64-with-Kali-2.0-sana
-- * - **** ---
- ** ---------- [config]
- - ** ---------- .> app: tasks:0x7fa8fe97ae50
- - ** ---------- .> transport: redis://localhost:6379/0
- - ** ---------- .> results: redis
- - *** --- * --- .> concurrency: 4 (prefork)
- -- ******* ----
- --- ***** ----- [queues]
- -------------- .> celery exchange=celery(direct) key=celery
-
-
- [tasks]
- . tasks.add

Calling the task

1
2
3
4
5
6
7
>> from tasks import add
>>> add.delay(4,4)
<AsyncResult: 38450296-81f6-4774-951c-b51a61caf852>

# worker
[2016-02-18 21:48:02,578: INFO/MainProcess] Received task: tasks.add[38450296-81f6-4774-951c-b51a61caf852]
[2016-02-18 21:48:02,593: INFO/MainProcess] Task tasks.add[38450296-81f6-4774-951c-b51a61caf852] succeeded in 0.0118950859978s: 8

Geting the status

1
2
3
4
5
6
7
>>> result = add.delay(2,3)
>>> result.status
'SUCCESS'
>>> result.get()
5
>>> result.ready()
True

0x04 Summary

Celery分布式调度任务和Redis的高并发原子操作结合在一块可以有效的整合机器的运算资源。