Using Celery to Handle Asynchronous Processes
by Matthew Snider · September 2, 2011 @ 9:03 a.m.
This article continues the series on building a continuous deployment environment using Python and Django.
- Starting Your First Django Project
- Testing and Django
- Mock and Coverage
- Using Fabric for Painless Scripting
- Using Celery to handle Asynchronous Processes
- Deployment/Monitoring Strategies
Those of you following along now have the tools to set up a Python/Django project, fully test it, and deploy it. Today we will be discussing the Celery package, an open source asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.
Simply put, Celery lets you run work asynchronously, and optionally on other machines, instead of inside the main app. Complex calculations, heavy data processing, and calls to third-party services can all run without blocking the main Django/Python app, and, when they run on a remote server, without using the main app's resources. Celery is a Python project, but there is also an app, django-celery, which plugs it into Django.
Getting ready
To install Celery, activate your virtualenv and run:
pip install celery
Next, install a message broker to manage the Celery queue (RabbitMQ is recommended). Full RabbitMQ installation instructions are available at www.rabbitmq.com/install.html.
To install RabbitMQ on Debian/Ubuntu:
sudo apt-get install rabbitmq-server
RabbitMQ will start automatically upon installation. To start/stop RabbitMQ manually on Debian/Ubuntu:
invoke-rc.d rabbitmq-server start
invoke-rc.d rabbitmq-server stop
On most systems the log file for RabbitMQ can be found at /var/log/rabbitmq/rabbit.log.
To install RabbitMQ on OS X with Homebrew (this will take a long time; Homebrew should not be run with sudo):
brew install rabbitmq
RabbitMQ will be installed to /usr/local/sbin, so add this directory to your PATH if you haven't already done so. To start/stop RabbitMQ manually on OS X:
sudo rabbitmq-server
sudo rabbitmqctl stop
To install Celery for Django:
pip install django-celery
How to do it…
Set up RabbitMQ for use with Celery:
sudo rabbitmqctl add_user {CELERY_USER} {CELERY_USER_PASS}
sudo rabbitmqctl add_vhost {CELERY_VHOST}
sudo rabbitmqctl set_permissions -p {CELERY_VHOST} {CELERY_USER} ".*" ".*" ".*"
Don't use any periods in {CELERY_VHOST}, or Celery won't be able to connect to RabbitMQ. If the server cannot connect on OS X, see Broker Installation for additional setup information.
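If you set up several environments, a small script can generate these three commands and enforce the no-periods rule before anything touches RabbitMQ. The sketch below is illustrative only: the function name rabbitmq_setup_commands is hypothetical, and it merely builds the command strings (quoted with shlex.quote) rather than running them.

```python
import shlex


def rabbitmq_setup_commands(user, password, vhost):
    """Build the rabbitmqctl commands that create a Celery user and vhost.

    Hypothetical helper: it returns strings and does not execute anything.
    """
    if "." in vhost:
        # Per the article, Celery cannot connect to a vhost containing periods.
        raise ValueError("vhost must not contain periods: %r" % vhost)
    u, p, v = shlex.quote(user), shlex.quote(password), shlex.quote(vhost)
    everything = shlex.quote(".*")  # regex matching every resource name
    return [
        "sudo rabbitmqctl add_user %s %s" % (u, p),
        "sudo rabbitmqctl add_vhost %s" % v,
        # configure, write and read permissions on all resources in the vhost
        "sudo rabbitmqctl set_permissions -p %s %s %s %s %s"
        % (v, u, everything, everything, everything),
    ]


if __name__ == "__main__":
    for cmd in rabbitmq_setup_commands("celeryuser", "s3cret", "myvhost"):
        print(cmd)
```

Printing the commands instead of running them keeps the script safe to review before you paste the output into a root shell.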
Create your first task in tasks.py (the name must match CELERY_IMPORTS and the imports used below):
# tasks.py
from celery.task import task

@task
def add(x, y):
    return x + y
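Create a configuration file to run your Celery task, celeryconfig.py:
# Connection details for the RabbitMQ user and vhost created above
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "{CELERY_USER}"
BROKER_PASSWORD = "{CELERY_USER_PASS}"
BROKER_VHOST = "{CELERY_VHOST}"

# Send task results back over AMQP
CELERY_RESULT_BACKEND = "amqp"

# Modules the worker imports to register tasks
CELERY_IMPORTS = ("tasks", )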
If RabbitMQ is running remotely, change localhost to the name of the remote server.
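Later Celery releases also accept the broker settings as a single BROKER_URL of the form amqp://user:password@host:port/vhost. The hypothetical helper below builds that string from the same values used in celeryconfig.py and percent-encodes each component so passwords with characters such as @ or / survive. Treat it as a sketch and check the documentation for your Celery version before relying on BROKER_URL.

```python
from urllib.parse import quote


def amqp_broker_url(user, password, vhost, host="localhost", port=5672):
    """Combine BROKER_* style values into one amqp:// URL (hypothetical helper)."""
    # safe="" also encodes "/" and "@", which would otherwise break the URL.
    return "amqp://%s:%s@%s:%d/%s" % (
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),
    )


if __name__ == "__main__":
    # For a remote broker, pass host="your-rabbitmq-server" instead.
    print(amqp_broker_url("celeryuser", "p@ss/word", "myvhost"))
```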
Now, to test, start the Celery worker from the directory containing tasks.py and celeryconfig.py (it will log to the console):
celeryd --loglevel=INFO
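In another terminal, open the Python shell and run:
from tasks import add
from celery.execute import send_task

# Call the imported task; get() blocks until the worker returns 4
result = add.apply_async(args=[2, 2], kwargs={})
print(result.get())

# Send a task by its registered name, without importing its code
result = send_task('tasks.add', [3, 3])
print(result.get())

# delay() is a shortcut for apply_async()
result = add.delay(4, 4)
print(result.get())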
You should see something like the following in the terminal window running Celery:
[2011-08-31 23:43:44,242: INFO/MainProcess] Got task from broker: tasks.add[f5f5ee81-fef5-46d2-87de-0da005d588d0]
[2011-08-31 23:43:44,294: INFO/MainProcess] Task tasks.add[f5f5ee81-fef5-46d2-87de-0da005d588d0] succeeded in 0.0111658573151s: 4
[2011-08-31 23:43:44,301: INFO/MainProcess] Got task from broker: tasks.add[cd2de0d1-35ad-4d7a-8212-47e800cd85bc]
[2011-08-31 23:43:44,298: INFO/MainProcess] Task tasks.add[cd2de0d1-35ad-4d7a-8212-47e800cd85bc] succeeded in 0.0115258693695s: 6
[2011-08-31 23:43:44,301: INFO/MainProcess] Got task from broker: tasks.add[cd2de0d1-35ad-4d7a-8212-47e900cd85bc]
[2011-08-31 23:43:44,329: INFO/MainProcess] Task tasks.add[cd2de0d1-35ad-4d7a-8212-47e900cd85bc] succeeded in 0.0115258693695s: 8
To run Celery as a daemon on Debian/Ubuntu, install the generic init script from https://github.com/ask/celery/tree/master/contrib/generic-init.d/ and run:
/etc/init.d/celeryd {start|stop|restart|status}
To configure your process, edit /etc/default/celeryd:
# Name of nodes to start
# here we have a single node
CELERYD_NODES="w1"
# or we could have three nodes:
#CELERYD_NODES="w1 w2 w3"

# Where to chdir at start.
CELERYD_CHDIR="/opt/Myproject/"

# Extra arguments to celeryd
CELERYD_OPTS="--time-limit=300 --concurrency=8"

# Name of the celery config module.
CELERY_CONFIG_MODULE="celeryconfig"

# %n will be replaced with the nodename.
CELERYD_LOG_FILE="/var/log/celery/%n.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"

# Workers should run as an unprivileged user.
CELERYD_USER="celery"
CELERYD_GROUP="celery"
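The %n placeholder is what keeps several nodes from sharing one log or pid file. To make the substitution concrete, this sketch expands CELERYD_NODES and the two file templates, assuming %n becomes the short node name listed in CELERYD_NODES. It models the behaviour described in the file's comment and is not the init script's own code; the function name expand_node_files is hypothetical.

```python
def expand_node_files(nodes, log_template, pid_template):
    """Map each node name to its (log file, pid file), replacing %n with the name."""
    files = {}
    for node in nodes.split():  # CELERYD_NODES is a space-separated list
        files[node] = (
            log_template.replace("%n", node),
            pid_template.replace("%n", node),
        )
    return files


if __name__ == "__main__":
    paths = expand_node_files(
        "w1 w2 w3", "/var/log/celery/%n.log", "/var/run/celery/%n.pid"
    )
    for node, (log_file, pid_file) in paths.items():
        print(node, log_file, pid_file)
```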
For daemon scripts on other operating systems, and for more information on configuration, see Celery Daemonizing.
Django-Celery
If you use django-celery, you won't need the celeryconfig.py file, as the Celery configuration will live in the project's settings.py.
To start, add djcelery to INSTALLED_APPS, then add the following to settings.py:
import djcelery
djcelery.setup_loader()

CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
CELERY_RESULT_BACKEND = "amqp"
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "{CELERY_USER}"
BROKER_PASSWORD = "{CELERY_USER_PASS}"
BROKER_VHOST = "{CELERY_VHOST}"
To create the necessary database tables:
python manage.py syncdb
For those using mod_wsgi, add the following to your *.wsgi file:
import os
os.environ["CELERY_LOADER"] = "django"
Celery will automatically look for files named tasks.py in the other installed apps and process them accordingly. Another great use for Celery is periodic tasks:
from datetime import timedelta
from celery.decorators import periodic_task

@periodic_task(run_every=timedelta(days=1))
def update_users():
    aggregate_user_data()  # not defined here, just an example
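run_every=timedelta(days=1) describes a fixed interval: the scheduler remembers when the task last ran and sends it again once the interval has elapsed. The sketch below is a simplified model of that decision, not Celery's implementation; the name interval_is_due is hypothetical. It returns whether the task is due and how many seconds to wait before checking again.

```python
from datetime import datetime, timedelta


def interval_is_due(last_run, now, run_every):
    """Return (due, seconds_until_next_check) for a fixed-interval task.

    Simplified model of an interval schedule such as run_every=timedelta(days=1).
    """
    elapsed = now - last_run
    if elapsed >= run_every:
        # Due now; after it runs, the next check is one full interval away.
        return True, run_every.total_seconds()
    # Not due yet; wait for the remainder of the interval.
    return False, (run_every - elapsed).total_seconds()


if __name__ == "__main__":
    last = datetime(2011, 9, 1, 9, 0)
    print(interval_is_due(last, datetime(2011, 9, 1, 21, 0), timedelta(days=1)))  # (False, 43200.0)
    print(interval_is_due(last, datetime(2011, 9, 2, 9, 5), timedelta(days=1)))   # (True, 86400.0)
```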
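Periodic tasks are only sent while the celerybeat scheduler is running, for example via python manage.py celerybeat, or by starting the worker with the -B option. Additional information is available at ask.github.com/django-celery/.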
Lastly, start a process that snapshots the workers, so you can use Django to monitor Celery (snapshots are only recorded if the workers send events, so start them with the -E option):
nohup python manage.py celerycam &
More information on monitoring is available at Celery Monitoring.
How it works…
Celery uses tasks that are executed concurrently on one or more workers. Tasks are normally sent to a worker and executed asynchronously; to wait for one synchronously, block on its result with result.get() or result.wait(). A third-party service manages the task queue and stores task results. RabbitMQ is recommended for the queue, although many database technologies are also supported, and I prefer AMQP or Redis for the results backend.
To create a Celery task, first create a file named tasks.py and add task functions to it. Decorate each function that will be called explicitly with @task, and each function that should execute periodically with @periodic_task(run_every=timedelta({FREQUENCY})).
Tasks, including periodic tasks, can be called explicitly using mytask.delay(*args, **kwargs). This returns an AsyncResult object, which can be used to block the current thread until the task completes, using result.wait() or response = result.get().
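To make the moving parts concrete without a broker, here is a small in-process model using only the standard library: a queue.Queue stands in for RabbitMQ, worker threads stand in for celeryd, and a result object whose get() blocks stands in for the result backend. It is an analogy for how delay() and get() interact, not how Celery is implemented, and the names MiniQueue, MiniResult and delay are hypothetical.

```python
import queue
import threading


class MiniResult:
    """Stand-in for a task result: get() blocks until a worker stores a value."""

    def __init__(self):
        self._done = threading.Event()
        self._value = None
        self._error = None

    def _set(self, value=None, error=None):
        self._value, self._error = value, error
        self._done.set()

    def get(self, timeout=None):
        if not self._done.wait(timeout):
            raise TimeoutError("task did not finish in time")
        if self._error is not None:
            raise self._error  # re-raise the task's exception in the caller
        return self._value


class MiniQueue:
    """Stand-in for broker plus workers: delay() enqueues, worker threads execute."""

    def __init__(self, workers=2):
        self._jobs = queue.Queue()
        for _ in range(workers):
            threading.Thread(target=self._work, daemon=True).start()

    def _work(self):
        while True:
            func, args, kwargs, result = self._jobs.get()
            try:
                result._set(value=func(*args, **kwargs))
            except Exception as exc:  # hand errors back instead of losing them
                result._set(error=exc)
            finally:
                self._jobs.task_done()

    def delay(self, func, *args, **kwargs):
        result = MiniResult()
        self._jobs.put((func, args, kwargs, result))
        return result  # returns immediately; the caller is not blocked


def add(x, y):
    return x + y


if __name__ == "__main__":
    q = MiniQueue()
    pending = [q.delay(add, n, n) for n in (2, 3, 4)]
    print([r.get(timeout=5) for r in pending])  # [4, 6, 8]
```

The caller only blocks at get(), which mirrors the choice described above between firing off a task and waiting on its result.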
The Celery worker process logs all activity, so monitor its log for errors (with the daemon configuration above, /var/log/celery/%n.log, where %n is the node name). Make sure task functions handle errors gracefully so that no data is lost.
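One simple way to act on that advice is to scan the worker log. The sketch below parses lines in the format of the sample worker output shown earlier, collects the tasks that succeeded, and separately flags any line logged at ERROR or CRITICAL level. Only the success format comes from the sample; treating ERROR/CRITICAL lines as problems is an assumption about your log contents, and the name scan_log is hypothetical.

```python
import re

# "[<timestamp>: <LEVEL>/<process>] <message>", as in the sample worker output
LINE = re.compile(
    r"^\[(?P<time>.+?): (?P<level>[A-Z]+)/(?P<process>[^\]]+)\] (?P<message>.*)$"
)
# "Task tasks.add[<id>] succeeded in 0.0111658573151s: 4"
SUCCESS = re.compile(
    r"^Task (?P<name>[\w.]+)\[(?P<id>[0-9a-f-]+)\] "
    r"succeeded in (?P<runtime>[\d.]+)s: (?P<result>.*)$"
)


def scan_log(lines):
    """Return (succeeded, problems) from an iterable of worker log lines."""
    succeeded, problems = [], []
    for raw in lines:
        line = raw.strip()
        match = LINE.match(line)
        if not match:
            continue  # not a log record we recognise
        if match.group("level") in ("ERROR", "CRITICAL"):
            problems.append(line)
            continue
        ok = SUCCESS.match(match.group("message"))
        if ok:
            succeeded.append((ok.group("name"), ok.group("id"),
                              float(ok.group("runtime")), ok.group("result")))
    return succeeded, problems


if __name__ == "__main__":
    sample = [
        "[2011-08-31 23:43:44,242: INFO/MainProcess] Got task from broker: "
        "tasks.add[f5f5ee81-fef5-46d2-87de-0da005d588d0]",
        "[2011-08-31 23:43:44,294: INFO/MainProcess] Task "
        "tasks.add[f5f5ee81-fef5-46d2-87de-0da005d588d0] succeeded in 0.0111658573151s: 4",
    ]
    done, bad = scan_log(sample)
    print(done)
    print(bad)  # []
```

In practice you would pass an open log file, for example scan_log(open("/var/log/celery/w1.log")), and alert when the problems list is not empty.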
The django-celery package manages your Celery commands and configuration, adds an admin tool, and automatically discovers tasks.py in all your apps. To monitor the Celery workers via Django, start the celerycam process, which takes periodic snapshots of the workers and writes them to the djcelery_taskstate table.
This should provide enough information for you to start using Celery in your own projects. Feel free to leave any questions you may have.
There's more…
Much of this article was inspired by the Celery Documentation. I recommend starting there if you have any questions. For additional information:
- Celery Project Homepage
- Celery Project Wiki
- Django-Celery Wiki
- Slides for Django-Celery