Celery (using Redis)

From Using Celery with Django

To use a Celery queue in your project…

Add the following to requirements/base.txt:

celery
django-redis
redis

Create a celery.py file in the project folder:

# -*- encoding: utf-8 -*-
from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
# PJK 15/11/2016, Why do we set this here?  I don't think we should!
# PJK 20/12/2016, I am pretty sure we don't need this.  Testing now!
# os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')

from django.conf import settings

app = Celery('project')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
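To check the worker is wired up correctly, the Celery documentation adds a simple debug task to celery.py; a minimal sketch (the task just prints its own request):

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Call debug_task.delay() from a Django shell and the request should appear in the worker log.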

If you are using Opbeat in your project (see Monitor, and Opbeat and Celery), then add the following to your celery.py file:

from opbeat.contrib.django.models import client, logger, register_handlers
from opbeat.contrib.celery import register_signal

try:
    register_signal(client)
except Exception as e:
    logger.exception('Failed installing celery hook: %s' % e)

if 'opbeat.contrib.django' in settings.INSTALLED_APPS:
    register_handlers()

Add the following to your project/__init__.py file:

from .celery import app as celery_app

In your settings/production.py file, you will have the following:

DOMAIN = get_env_variable('DOMAIN')
DATABASE = DOMAIN.replace('.', '_')

DATABASES = {
    ...

Under DATABASES, add the following:

# Celery
from kombu import Exchange, Queue
# transport
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# number of worker processes (will be 3 == controller, worker and beat)
CELERYD_CONCURRENCY = 1
# rate limits
CELERY_DISABLE_RATE_LIMITS = True
# serializer
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
# queue
CELERY_DEFAULT_QUEUE = DATABASE
CELERY_QUEUES = (
    Queue(DATABASE, Exchange(DATABASE), routing_key=DATABASE),
)

Note

If you are writing an example application, then just use CELERY_ALWAYS_EAGER (as shown below).

Tip

If you want to test this locally, then copy DOMAIN, DATABASE, and the Celery section into your dev settings file, e.g. dev_patrick.py. Check your environment settings to make sure DOMAIN is set to the value you expect.
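A minimal sketch of what the dev file might contain (dev_patrick.py and the hard-coded DOMAIN are only examples, and it assumes your dev settings extend base):

# settings/dev_patrick.py
from .base import *

from kombu import Exchange, Queue

DOMAIN = 'www.hatherleigh.info'  # or copy the get_env_variable line
DATABASE = DOMAIN.replace('.', '_')

# Celery
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_DEFAULT_QUEUE = DATABASE
CELERY_QUEUES = (
    Queue(DATABASE, Exchange(DATABASE), routing_key=DATABASE),
)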

Note

To use RabbitMQ instead, remove BROKER_URL and CELERY_RESULT_BACKEND; Celery then falls back to its default AMQP broker on localhost.

In your settings/dev_test.py file (below DATABASES), add the following:

# http://docs.celeryproject.org/en/2.5/django/unit-testing.html
CELERY_ALWAYS_EAGER = True
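With CELERY_ALWAYS_EAGER set, .delay() runs the task synchronously in the test process, so no worker or broker is needed. A minimal sketch of a test, assuming a task like my_task from the Task section below (where app is an application in your project):

from app.tasks import my_task

def test_my_task():
    # the task runs inline and returns an EagerResult immediately
    result = my_task.delay()
    assert result.successful()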

To start the task queue on your development system:

celery -A project worker --loglevel=info

To deploy, add celery and redis to your pillar e.g:

# sites/my.sls

sites:
  www.hatherleigh.info:
    package: hatherleigh_info
    profile: django
    celery: True

Create a redis sls:

# config/redis.sls
redis:
  True

And add it to the config for the server e.g:

# top.sls
'test-a':
  - config.redis
  - sites.my

Task

To create a task, add a function to your app/tasks.py file (where app is an application in your project) e.g:

from celery import task
from django.db import transaction

from .models import TestModel

@task()
def my_task():
    # some example code; 'select_for_update' locks the matching
    # rows when the queryset is evaluated inside the transaction
    with transaction.atomic():
        qs = TestModel.objects.select_for_update().filter(complete=True)

To add this task to the queue:

from .tasks import my_task
my_task.delay()

Warning

Remember to use the correct pattern for transactions when adding tasks to the queue. For details, see Next
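One common pattern (Django 1.9+) is to queue the task only when the surrounding transaction commits, so the worker never sees uncommitted data. A minimal sketch (complete_and_queue is a hypothetical helper):

from django.db import transaction

from .tasks import my_task

def complete_and_queue(obj):
    with transaction.atomic():
        obj.complete = True
        obj.save()
        # only send the task if/when this transaction commits
        transaction.on_commit(lambda: my_task.delay())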

To get the ID of the current task (from How do I get the task ID):

from django.core.cache import cache

from project.celery import app

@app.task(bind=True)
def mytask(self):
    # self.request.id is the ID of the current task
    cache.set(self.request.id, "Running")
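With the task ID stored, you can check the state of the task later using the result backend configured above. A minimal sketch (task_id is the ID you saved, e.g. from .delay()):

from celery.result import AsyncResult

result = AsyncResult(task_id)
print(result.state)  # e.g. 'PENDING', 'STARTED', 'SUCCESS'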

cron

To create a periodic (cron-like) task, start by creating a function in your app/tasks.py file (where app is an application in your project):

from celery import task

@task()
def process_periodic_task():
    """Nothing to do... just testing."""
    pass

In your settings/base.py file, set up the schedule e.g:

# periodic tasks (requires 'beat')
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
    'process-every-minute': {
        'task': 'app.tasks.process_periodic_task',
        'schedule': crontab(minute='*/1'),
    },
}
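crontab accepts the usual cron fields, so other schedules follow the same pattern; some illustrations (not part of the project settings):

from celery.schedules import crontab

crontab(minute=0, hour=0)                           # daily at midnight
crontab(minute=30, hour=7, day_of_week='mon-fri')   # 07:30 on weekdays
crontab(minute='*/15')                              # every 15 minutes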

Warning

If the tasks should only run on a production system, then add the schedule to settings/production.py instead.

To start the cron queue on your development system:

celery -A project beat --loglevel=info
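For development only, the worker can run an embedded beat scheduler in the same process, so you do not need two terminals:

celery -A project worker --beat --loglevel=info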

Development

To purge existing tasks:

celery -A project purge

Logging

Just append the logfile option e.g:

celery -A project worker --loglevel=info --logfile="celery.log"

Monitor

List the queues:

redis-cli keys \*

List the number of messages in a queue:

redis-cli llen www_hatherleigh_info
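You can also ask the workers themselves what they are doing, e.g:

celery -A project inspect active
celery -A project inspect scheduled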