Dramatiq (using Redis)

We are working on a Windows project and need a reliable queuing solution that supports both Linux and Windows. I think Dramatiq meets these criteria, but we still have work to do before it can be used for projects in production.

Note

We started using Dramatiq for a Windows project because the new version of Celery isn’t supported on Windows.

Provision

Add the following to your pillar file (set scheduler: True and celery: False), e.g.:

sites:
  hatherleigh_net:
    profile: django
    celery: False
    scheduler: True
    env:
      redis_host: localhost
      redis_port: 6379

Configure

Add the following to requirements/base.txt:

django-dramatiq==
dramatiq[redis, watch]==

Add the following to .env.fish:

Add the following to .env.fish (if you are developing with Kubernetes):

Add the following to .gitlab-ci.yml:

Add the following to settings/base.py:

THIRD_PARTY_APPS = (
    # add django_dramatiq to installed apps before any of your custom apps
    "django_dramatiq",
    # ...
)

LOGGING = {
    "handlers": {
        "logfile": {
            # ...
            "filename": os.path.join(
                get_env_variable("LOG_FOLDER"),
                "{}-{}-logger.log".format(
                    get_env_variable("DOMAIN").replace("_", "-"),
                    get_env_variable("LOG_SUFFIX"),
                ),
            ),
            # ...
        },
    },
    # ...
}
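
As a rough illustration of the file name this handler produces, the sketch below repeats the same expression with plain arguments instead of environment variables. The folder, domain and suffix values are assumptions chosen for the example, not values from a real deployment.

```python
import os


def log_file_name(log_folder, domain, log_suffix):
    """Build the log file path the same way as the LOGGING setting above."""
    return os.path.join(
        log_folder,
        "{}-{}-logger.log".format(domain.replace("_", "-"), log_suffix),
    )


# Example values are assumptions, not real settings
path = log_file_name("logs", "hatherleigh_net", "dramatiq")
print(os.path.basename(path))
```

Underscores in the domain become hyphens, and the suffix keeps the log files for different processes on the same site apart.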

In your user settings e.g. settings/dev_patrick.py:

DATABASE_NAME = get_env_variable("DATABASE_NAME")

REDIS_HOST = get_env_variable("REDIS_HOST")
REDIS_PORT = get_env_variable("REDIS_PORT")
# https://dramatiq.io/reference.html#middleware
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.redis.RedisBroker",
    "OPTIONS": {"url": "redis://{}:{}/0".format(REDIS_HOST, REDIS_PORT)},
    "MIDDLEWARE": [
        # drops messages that have been in the queue for too long
        "dramatiq.middleware.AgeLimit",
        # cancels actors that run for too long
        "dramatiq.middleware.TimeLimit",
        # lets you chain success and failure callbacks
        "dramatiq.middleware.Callbacks",
        # automatically retries failed tasks with exponential backoff
        "dramatiq.middleware.Retries",
    ],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME
DRAMATIQ_QUEUE_NAME_PIPELINE = DRAMATIQ_QUEUE_NAME
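
The broker URL is built from the REDIS_HOST and REDIS_PORT environment variables. The sketch below shows the idea in isolation. The `get_env_variable` function here is a hypothetical stand-in (the project's real helper is not shown in this document), and `redis_url` is a name invented for the example.

```python
import os


def get_env_variable(key):
    """Hypothetical stand-in for the project helper: fail loudly if unset."""
    try:
        return os.environ[key]
    except KeyError:
        raise RuntimeError("Set the {} environment variable".format(key))


def redis_url(host, port, db=0):
    """Format the broker URL in the same shape as the settings above."""
    return "redis://{}:{}/{}".format(host, port, db)


# Fall back to the values from the pillar example (localhost, 6379)
os.environ.setdefault("REDIS_HOST", "localhost")
os.environ.setdefault("REDIS_PORT", "6379")
print(redis_url(get_env_variable("REDIS_HOST"), get_env_variable("REDIS_PORT")))
```

Failing at start-up when a variable is missing is usually easier to debug than a worker that silently connects to the wrong Redis server.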

In settings/dev_test.py:

DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.stub.StubBroker",
    "OPTIONS": {},
    "MIDDLEWARE": [
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Pipelines",
        "dramatiq.middleware.Retries",
    ],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME

Tip

Make sure you use a DATABASE_NAME variable in the DATABASES settings.

In settings/production.py:

REDIS_HOST = get_env_variable("REDIS_HOST")
REDIS_PORT = get_env_variable("REDIS_PORT")
# https://dramatiq.io/reference.html#middleware
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.redis.RedisBroker",
    "OPTIONS": {"url": "redis://{}:{}/0".format(REDIS_HOST, REDIS_PORT)},
    "MIDDLEWARE": [
        # drops messages that have been in the queue for too long
        "dramatiq.middleware.AgeLimit",
        # cancels actors that run for too long
        "dramatiq.middleware.TimeLimit",
        # lets you chain success and failure callbacks
        "dramatiq.middleware.Callbacks",
        # automatically retries failed tasks with exponential backoff
        "dramatiq.middleware.Retries",
    ],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME
DRAMATIQ_QUEUE_NAME_PIPELINE = "{}_pipeline".format(DATABASE_NAME)

Tip

Make sure you use a DATABASE_NAME variable in the DATABASES settings (the DATABASE_NAME is used by APScheduler) e.g:

DOMAIN = get_env_variable("DOMAIN")
DATABASE_NAME = DOMAIN.replace(".", "_").replace("-", "_")
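
To make the naming concrete, here is a small sketch of how a domain maps to the database name and the two queue names used in settings/production.py. The function names and the example domains are assumptions for illustration only.

```python
def database_name(domain):
    """Turn a domain into a name that is safe for a database and a queue."""
    return domain.replace(".", "_").replace("-", "_")


def queue_names(domain):
    """Return the default and pipeline queue names for one site."""
    name = database_name(domain)
    return name, "{}_pipeline".format(name)


# Example domains are assumptions
for domain in ("hatherleigh.net", "www.my-site.co.uk"):
    print(domain, queue_names(domain))
```

Because each site derives its queue names from its own domain, several sites can share one Redis server without consuming each other's messages.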

Maintenance / Debug

Flush (drop) all messages from every queue:

import dramatiq
broker = dramatiq.get_broker()
broker.flush_all()

Tip

There is a flush(queue_name) method if you don’t want to clear all of the queues.

Outstanding messages:

import dramatiq
from django.conf import settings

broker = dramatiq.get_broker()
# list the queues
broker.get_declared_queues()
consumer = broker.consume(settings.DRAMATIQ_QUEUE_NAME)
# number of outstanding messages (not yet confirmed that this works)
consumer.outstanding_message_count

Tasks

import logging

import dramatiq
from django.conf import settings

logger = logging.getLogger(__name__)

# https://dramatiq.io/cookbook.html#binding-worker-groups-to-queues
@dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME)
def process_steps():
    logger.info("pipeline.tasks.process_steps")
    from pipeline.models import PipelineProcessStep
    PipelineProcessStep.objects.process()
    logger.info("pipeline.tasks.process_steps - complete")

Tip

To limit the retry count:

@dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME, max_retries=0)
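
The Retries middleware retries failed tasks with exponential backoff, so `max_retries` caps how many further attempts are made. The sketch below only illustrates the general idea of capped exponential backoff. It is not Dramatiq's implementation, and the `base_ms` and `max_ms` values are assumptions chosen for the example. Dramatiq's own delays and defaults may differ.

```python
def backoff_delays(max_retries, base_ms=1000, max_ms=60000):
    """Return the delay before each retry, doubling each time up to max_ms."""
    return [min(base_ms * 2 ** attempt, max_ms) for attempt in range(max_retries)]


# max_retries=0 means the task is never retried
print(backoff_delays(0))
# Delays double until they reach the cap
print(backoff_delays(8))
```

With `max_retries=0` the list of delays is empty, which matches the intent of the tip: a failed task is not attempted again.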

Here are some example tasks: https://gitlab.com/kb/pipeline/blob/master/pipeline/tasks.py

To call the task:

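from django.db import transaction

# send the message only after the current transaction has been committed
transaction.on_commit(
    lambda: process_next_step.send_with_options(
        args=(next_process_step.pk,)
    )
)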

Note

It is important to set the queue name to DRAMATIQ_QUEUE_NAME when defining the actor e.g. @dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME).

Note

It doesn’t seem to be possible to select the queue name when calling send_with_options (I tried and failed), so the queue has to be chosen when the actor is defined.

Management Commands

Create a start_dramatiq_workers management command in your project e.g:

# -*- encoding: utf-8 -*-
from django.conf import settings
from base.dramatiq_utils import DramatiqBaseCommandMixin

class Command(DramatiqBaseCommandMixin):
    """Start Dramatiq workers - using the correct queue name."""

    PROCESSES = 3

    def get_queue_name(self):
        return settings.DRAMATIQ_QUEUE_NAME

Tip

To reload the Dramatiq workers when the code changes, add the --reload parameter.