Celery (using Redis)

To use a Celery queue in your project…

Add the following to requirements/base.txt:

celery
redis

Create a celery.py file in the project folder:

# -*- encoding: utf-8 -*-
import os

from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program
# (adjust the module path to match your project settings)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.production')

app = Celery('project')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

Add the following to your project/__init__.py file:

from .celery import app as celery_app

In your settings/production.py file, you will have the following:

DOMAIN = get_env_variable('DOMAIN')
DATABASE = DOMAIN.replace('.', '_').replace('-', '_')

DATABASES = {
    ...

Under DATABASES, add the following:

# Celery
from kombu import Exchange, Queue
# transport
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# number of worker processes (expect three processes in total: the controller, one worker and beat)
CELERYD_CONCURRENCY = 1
# rate limits
CELERY_DISABLE_RATE_LIMITS = True
# serializer
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
# queue
CELERY_DEFAULT_QUEUE = DATABASE
CELERY_QUEUES = (
    Queue(DATABASE, Exchange(DATABASE), routing_key=DATABASE),
)
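
For example, with DOMAIN set to www.hatherleigh.info, the queue name used above works out like this (a worked example, not extra configuration):

DOMAIN = 'www.hatherleigh.info'
DATABASE = DOMAIN.replace('.', '_').replace('-', '_')
# DATABASE == 'www_hatherleigh_info'
# ...which is also the queue name used by CELERY_DEFAULT_QUEUE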

Note

If you are writing an example application, then just use CELERY_ALWAYS_EAGER (as shown below).

Tip

If you want to test this locally, then copy DOMAIN, DATABASE, and the Celery section into your dev file e.g. dev_patrick.py. Check your environment settings to make sure DOMAIN is set to the value you expect.

Note

To use RabbitMQ instead, just remove BROKER_URL and CELERY_RESULT_BACKEND (Celery then falls back to its default amqp broker).
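
If you prefer to be explicit, point the broker at RabbitMQ directly; a minimal sketch, assuming RabbitMQ's default guest account on localhost:

# explicit RabbitMQ transport ('amqp://' is the Celery default)
BROKER_URL = 'amqp://guest:guest@localhost:5672//'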

In your settings/dev_test.py file (below DATABASES), add the following:

# http://docs.celeryproject.org/en/2.5/django/unit-testing.html
CELERY_ALWAYS_EAGER = True
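
It can also help for eagerly executed tasks to re-raise their exceptions, so failures show up in test output; this uses the companion setting from the same Celery version:

# re-raise exceptions from eagerly executed tasks
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True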

To start the task queue on your development system:

celery -A project worker --loglevel=info

If you are using an example app, then replace project with the folder of the example app e.g:

celery -A example_xero worker --loglevel=info --logfile="logger-worker.log"

To deploy, add celery and redis to your pillar e.g:

# sites/my.sls

sites:
  www.hatherleigh.info:
    package: hatherleigh_info
    profile: django
    celery: True

Create a redis sls:

# config/redis.sls
redis:
  True

And add it to the config for the server e.g:

# top.sls
'test-a':
  - config.redis
  - sites.my

cron

To create a periodic (cron-like) task, start by creating a function in your app/tasks.py file (where app is an application in your project):

from celery import task

@task()
def process_periodic_task():
    """Nothing to do... just testing."""
    pass

In your settings/base.py file, set-up the schedule e.g:

# periodic tasks (requires 'beat')
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
    'process-every-minute': {
        'task': 'app.tasks.process_periodic_task',
        'schedule': crontab(minute='*/1'),
    },
}
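
crontab accepts the usual cron fields, so other schedules follow the same pattern; a couple of sketches for the 'schedule' value (the times are just examples):

from celery.schedules import crontab

# every day at 07:30
crontab(hour=7, minute=30)
# every Monday at 07:30
crontab(hour=7, minute=30, day_of_week='mon')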

Warning

If the tasks should only run on a production system, then add the schedule to settings/production.py instead of settings/base.py.

To start the cron queue on your development system:

celery -A project beat --loglevel=info

Development

Tasks

Warning

Remember to use the correct pattern for transactions when adding tasks to the queue. For details, see below…

To create a task, create a function in your app/tasks.py file (where app is an application in your project) e.g:

from celery import task
from django.db import transaction

from .models import TestModel  # example model, assumed to live in app/models.py


@task()
def sync_document():
    # some example code: lock the rows while they are processed
    with transaction.atomic():
        qs = TestModel.objects.select_for_update().filter(complete=True)

To add this task to the queue (if you are not in a transaction):

from workflow.tasks import create_workflows

create_workflows.apply_async(args=[workflow.pk], countdown=1)
# or
create_workflows.delay(workflow.pk)

Django provides the on_commit function to register callback functions that should be executed after a transaction is successfully committed:

from django.db import transaction
from dash.tasks import sync_document

with transaction.atomic():
    # if you are in a transaction
    transaction.on_commit(lambda: sync_document.apply_async(
        args=[self.object.pk],
        countdown=2,
    ))
    # or
    transaction.on_commit(lambda: sync_document.delay(self.object.pk))
    # or (process_mail is another example task which takes no arguments)
    transaction.on_commit(lambda: process_mail.delay())

countdown is a shortcut to set the estimated time of arrival (ETA) by seconds into the future.
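
The same call can be written with an explicit eta; a minimal sketch, assuming the sync_document task above:

from datetime import datetime, timedelta

# equivalent to countdown=2: an explicit ETA two seconds from now (UTC)
sync_document.apply_async(
    args=[self.object.pk],
    eta=datetime.utcnow() + timedelta(seconds=2),
)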

To get the ID of the current task (from the Celery FAQ, "How do I get the task ID"):

from django.core.cache import cache
from project.celery import app  # the app created in project/celery.py above

@app.task(bind=True)
def mytask(self):
    # self.request.id is the ID of the current task
    cache.set(self.request.id, "Running")
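
The task ID is also how you check on a task from outside the worker; a short sketch using Celery's AsyncResult:

from celery.result import AsyncResult

result = mytask.delay()
print(result.id)                      # the task ID
print(AsyncResult(result.id).state)   # e.g. PENDING, STARTED, SUCCESS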

Logging

Just append the logfile option e.g:

celery -A project worker --loglevel=info --logfile="celery.log"
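
Inside a task, prefer Celery's task logger over print, so messages end up in the worker log; a minimal sketch:

from celery import task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@task()
def sync_document():
    # log lines are written to the worker log (or --logfile if given)
    logger.info('starting document sync')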

Monitor

List the queues:

redis-cli keys \*

List the number of messages in a queue:

redis-cli llen www_hatherleigh_info
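
The same check can be made from Python with the redis client (installed above); a sketch, assuming the default localhost broker settings:

import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)
# number of messages waiting in the queue
print(r.llen('www_hatherleigh_info'))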

Purge

To purge existing tasks:

celery -A project purge