Celery (using Redis)
********************

.. highlight:: python

- From `Using Celery with Django`_
- Also see :doc:`dev-dramatiq` for an alternative to Celery, which we are
  using for one of our Windows projects (it still needs scheduling and
  Salt states).

To use a Celery queue in your project...

Add the following to ``requirements/base.txt``::

    celery
    redis

.. tip:: See :doc:`requirements` for the current version.

Create a ``celery.py`` file in the ``project`` folder::

    # -*- encoding: utf-8 -*-
    from celery import Celery
    from django.conf import settings

    app = Celery('project')
    # Using a string here means the worker will not have to
    # pickle the object when using Windows.
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

Add the following to your ``project/__init__.py`` file::

    from .celery import app as celery_app

In your ``settings/production.py`` file, you will have the following::

    DOMAIN = get_env_variable('DOMAIN')
    DATABASE = DOMAIN.replace('.', '_').replace('-', '_')
    DATABASES = {
    ...

Under ``DATABASES``, add the following::

    # Celery
    from kombu import Exchange, Queue
    # transport
    BROKER_URL = 'redis://localhost:6379/0'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
    # number of worker processes
    # (three processes in total: the controller, one worker and 'beat')
    CELERYD_CONCURRENCY = 1
    # rate limits
    CELERY_DISABLE_RATE_LIMITS = True
    # serializer
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_ACCEPT_CONTENT = ['json']
    # queue
    CELERY_DEFAULT_QUEUE = DATABASE
    CELERY_QUEUES = (
        Queue(DATABASE, Exchange(DATABASE), routing_key=DATABASE),
    )

.. note:: If you are writing an example application, then just use
          ``CELERY_ALWAYS_EAGER`` (as shown below).

.. tip:: If you want to test this locally, then copy ``DOMAIN``,
         ``DATABASE``, and the *Celery* section into your dev file e.g.
         ``dev_patrick.py``.  Check your environment settings to make
         sure ``DOMAIN`` is set to the value you expect.

.. note:: To use RabbitMQ, just remove ``BROKER_URL`` and
          ``CELERY_RESULT_BACKEND``.

In your ``settings/dev_test.py`` file (below ``DATABASES``), add the
following::

    # http://docs.celeryproject.org/en/2.5/django/unit-testing.html
    CELERY_ALWAYS_EAGER = True

To start the task queue on your development system::

    celery -A project worker --loglevel=info

If you are using an example app, then replace ``project`` with the folder
of the example app e.g::

    celery -A example_xero worker --loglevel=info --logfile="logger-worker.log"

To deploy, add ``celery`` and ``redis`` to your pillar e.g:

.. code-block:: yaml

    # sites/my.sls
    sites:
      www.hatherleigh.info:
        package: hatherleigh_info
        profile: django
        celery: True

Create a ``redis`` ``sls`` file:

.. code-block:: yaml

    # config/redis.sls
    redis: True

And add it to the config for the server e.g:

.. code-block:: yaml

    # top.sls
    'test-a':
      - config.redis
      - sites.my

.. _celery_cron:

cron
====

To create a periodic (``cron``-like) task, start by creating a function in
your ``app/tasks.py`` file (where ``app`` is an application in your
project)::

    from celery import task

    @task()
    def process_periodic_task():
        """Nothing to do... just testing."""
        pass

In your ``settings/base.py`` file, set up the schedule e.g::

    # periodic tasks (requires 'beat')
    from celery.schedules import crontab
    CELERYBEAT_SCHEDULE = {
        'process-every-minute': {
            'task': 'app.tasks.process_periodic_task',
            'schedule': crontab(minute='*/1'),
        },
    }

.. warning:: If the tasks should only be run on a production system, then
             add the schedule to ``settings/production.py`` instead.

To start the cron queue on your development system::

    celery -A project beat --loglevel=info

Development
===========

.. _celery_tasks:

Tasks
-----

.. warning:: Remember to use the correct pattern for transactions when
             adding tasks to the queue.  For details, see below...

To create a task, create a function in your ``app/tasks.py`` file (where
``app`` is an application in your project) e.g::

    from celery import task
    from django.db import transaction

    from .models import TestModel

    @task()
    def sync_document(pk):
        # some example code... lock the row before working on it
        with transaction.atomic():
            qs = TestModel.objects.select_for_update().filter(
                pk=pk, complete=True
            )

To add this task to the queue (if you are not in a transaction)::

    from workflow.tasks import create_workflows
    create_workflows.apply_async(args=[workflow.pk], countdown=1)
    # or
    create_workflows.delay(workflow.pk)

Django provides the ``on_commit`` function to register callback functions
that should be executed after a transaction is successfully committed::

    from django.db import transaction
    from dash.tasks import sync_document

    with transaction.atomic():
        # if you are in a transaction
        transaction.on_commit(lambda: sync_document.apply_async(
            args=[self.object.pk],
            countdown=2,
        ))
        # or
        transaction.on_commit(lambda: sync_document.delay(self.object.pk))
        # or
        transaction.on_commit(lambda: process_mail.delay())

- ``countdown`` is a shortcut to set the estimated time of arrival a
  number of seconds into the future.
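For example, the pattern might look like this in a class-based view.  This
is just a sketch: ``DocumentCreateView`` and the ``Document`` model are
hypothetical names (not part of this project), and it assumes the
``sync_document`` task shown above::

    from django.db import transaction
    from django.views.generic import CreateView

    from dash.tasks import sync_document
    from .models import Document  # hypothetical model

    class DocumentCreateView(CreateView):
        """Hypothetical view... queue ``sync_document`` after the commit."""

        model = Document
        fields = ('name',)  # assumes the model has a 'name' field

        def form_valid(self, form):
            with transaction.atomic():
                # ``form_valid`` saves the object and sets ``self.object``
                response = super(DocumentCreateView, self).form_valid(form)
                # the task is only queued if (and when) the transaction
                # commits, so the worker cannot pick up the task before
                # the row exists in the database
                transaction.on_commit(
                    lambda: sync_document.delay(self.object.pk)
                )
            return response

If the transaction rolls back, the callback is discarded and the task is
never added to the queue.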
To get the ID of the current task (from `How do I get the task ID`_)::

    from django.core.cache import cache

    from project.celery import app

    @app.task(bind=True)
    def mytask(self):
        # ``self.request.id`` is the ID of the current task
        cache.set(self.request.id, "Running")

Logging
-------

Just append the ``logfile`` option e.g::

    celery -A project worker --loglevel=info --logfile="celery.log"

Monitor
-------

List the queues::

    redis-cli keys \*

List the number of messages in a queue::

    redis-cli llen www_hatherleigh_info

Purge
-----

To purge existing tasks::

    celery -A project purge

.. _`How do I get the task ID`: http://celery.readthedocs.org/en/latest/faq.html#how-can-i-get-the-task-id-of-the-current-task
.. _`Using Celery with Django`: http://celery.readthedocs.org/en/latest/django/first-steps-with-django.html#django-first-steps
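Testing
-------

With ``CELERY_ALWAYS_EAGER`` set (see ``settings/dev_test.py`` above),
``delay`` and ``apply_async`` run the task synchronously in the current
process, so tasks can be checked in an ordinary unit test.  A minimal
sketch, assuming the ``process_periodic_task`` task from the cron section::

    from django.test import TestCase
    from django.test.utils import override_settings

    from app.tasks import process_periodic_task

    class TaskTestCase(TestCase):

        @override_settings(CELERY_ALWAYS_EAGER=True)
        def test_process_periodic_task(self):
            # with ``CELERY_ALWAYS_EAGER``, ``delay`` runs the task
            # in-process and returns an ``EagerResult``
            result = process_periodic_task.delay()
            self.assertTrue(result.successful())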