Dramatiq (using Redis)

We are working on a Windows project and need a reliable queuing solution that supports both Linux and Windows. I think Dramatiq meets these criteria, but we still have work to do before it can be used for projects in production.

Note

We started using Dramatiq for a Windows project because the new version of Celery isn’t supported on Windows.

Provision

Add the following to your pillar file (with scheduler: True and celery: False), e.g.

sites:
  hatherleigh_net:
    profile: django
    celery: False
    scheduler: True
    env:
      redis_host: localhost
      redis_port: 6379

Configure

Add the following to requirements/base.txt:

django-dramatiq==
dramatiq[redis, watch]==

Add the following to .env.fish:

set -x DOMAIN "dev"
set -x LOG_FOLDER ""
set -x LOG_SUFFIX "dev"

Add the following to settings/base.py:

THIRD_PARTY_APPS = (
    # add django_dramatiq to installed apps before any of your custom apps
    "django_dramatiq",
    # ...
)

LOGGING = {
    "handlers": {
        "logfile": {
            # ...
            "filename": os.path.join(
                get_env_variable("LOG_FOLDER"),
                "{}-{}-logger.log".format(
                    get_env_variable("DOMAIN").replace("_", "-"),
                    get_env_variable("LOG_SUFFIX"),
                ),
            ),
            # ...
        },
    },
    # ...
}
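
As a quick check of the file name this setting produces, the sketch below reproduces the "filename" expression with plain arguments instead of get_env_variable. The function name build_log_filename is hypothetical and used for illustration only. With the .env.fish values above (DOMAIN "dev", an empty LOG_FOLDER and LOG_SUFFIX "dev"), the log file is written to the current working directory.

```python
import os


def build_log_filename(log_folder, domain, log_suffix):
    """Mirror the "filename" expression from the LOGGING setting."""
    # underscores in the domain are replaced with hyphens
    name = "{}-{}-logger.log".format(domain.replace("_", "-"), log_suffix)
    # os.path.join with an empty folder returns just the file name
    return os.path.join(log_folder, name)


# values from the .env.fish example
print(build_log_filename("", "dev", "dev"))  # dev-dev-logger.log
```

Setting LOG_FOLDER to a real directory in production keeps the log files out of the project folder.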

In your user settings e.g. settings/dev_patrick.py:

DATABASE_NAME = get_env_variable("DATABASE_NAME")

REDIS_HOST = get_env_variable("REDIS_HOST")
REDIS_PORT = get_env_variable("REDIS_PORT")
# https://dramatiq.io/reference.html#middleware
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.redis.RedisBroker",
    "OPTIONS": {"url": "redis://{}:{}/0".format(REDIS_HOST, REDIS_PORT)},
    "MIDDLEWARE": [
        # drops messages that have been in the queue for too long
        "dramatiq.middleware.AgeLimit",
        # cancels actors that run for too long
        "dramatiq.middleware.TimeLimit",
        # lets you chain success and failure callbacks
        "dramatiq.middleware.Callbacks",
        # automatically retries failed tasks with exponential backoff
        "dramatiq.middleware.Retries",
    ],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME
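
The get_env_variable helper is not shown in this document. The sketch below is one way such a helper might look, together with the URL that ends up in OPTIONS["url"]. The helper body, the RuntimeError and the build_redis_url name are assumptions for illustration, not the project's actual code.

```python
import os


def get_env_variable(key):
    """Return an environment variable, failing loudly if it is missing.

    Hypothetical stand-in for the project's own helper.
    """
    try:
        return os.environ[key]
    except KeyError:
        raise RuntimeError("Set the {} environment variable".format(key)) from None


def build_redis_url(host, port, db=0):
    """Build the URL passed to the Redis broker via OPTIONS["url"]."""
    return "redis://{}:{}/{}".format(host, port, db)


# demo values matching the pillar file (only set if not already defined)
os.environ.setdefault("REDIS_HOST", "localhost")
os.environ.setdefault("REDIS_PORT", "6379")

print(build_redis_url(get_env_variable("REDIS_HOST"), get_env_variable("REDIS_PORT")))
```

Failing at start-up when a variable is missing is usually easier to diagnose than a worker that silently connects to the wrong Redis instance.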

In settings/dev_test.py:

DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.stub.StubBroker",
    "OPTIONS": {},
    "MIDDLEWARE": [
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Pipelines",
        "dramatiq.middleware.Retries",
    ],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME

Tip

Make sure you define a DATABASE_NAME variable and use it in the DATABASES setting.

In settings/production.py:

REDIS_HOST = get_env_variable("REDIS_HOST")
REDIS_PORT = get_env_variable("REDIS_PORT")
# https://dramatiq.io/reference.html#middleware
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.redis.RedisBroker",
    "OPTIONS": {"url": "redis://{}:{}/0".format(REDIS_HOST, REDIS_PORT)},
    "MIDDLEWARE": [
        # drops messages that have been in the queue for too long
        "dramatiq.middleware.AgeLimit",
        # cancels actors that run for too long
        "dramatiq.middleware.TimeLimit",
        # lets you chain success and failure callbacks
        "dramatiq.middleware.Callbacks",
        # automatically retries failed tasks with exponential backoff
        "dramatiq.middleware.Retries",
    ],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME

Tip

Make sure you define a DATABASE_NAME variable and use it in the DATABASES setting (DATABASE_NAME is also used by APScheduler), e.g.

DOMAIN = get_env_variable("DOMAIN")
DATABASE_NAME = DOMAIN.replace(".", "_").replace("-", "_")
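
To see what queue name this gives, here is a small sketch that applies the same replacements to a couple of sample domains. The function name database_name_from_domain and the domains are hypothetical and chosen only for illustration.

```python
def database_name_from_domain(domain):
    """Apply the same replacements as the settings snippet."""
    return domain.replace(".", "_").replace("-", "_")


print(database_name_from_domain("hatherleigh.net"))  # hatherleigh_net
print(database_name_from_domain("my-site.co.uk"))  # my_site_co_uk
```

The mapping is not one-to-one (for example "my-site.com" and "my.site.com" give the same name), so two such domains on one server would share a queue.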

Tasks

Here is an example task: https://gitlab.com/kb/steps/blob/master/steps/tasks.py

To call the task:

transaction.on_commit(
    lambda: process_next_step.send_with_options(
        args=(next_process_step.pk,),
        queue_name=settings.DRAMATIQ_QUEUE_NAME,
    )
)

(copied from https://gitlab.com/kb/steps/blob/master/steps/models.py)

Note

It is important to set the queue name to DRAMATIQ_QUEUE_NAME both when defining the actor, e.g. @dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME), and when calling send_with_options.

Management Commands

start_dramatiq_workers

Sets the queue names correctly for our projects. The source code can be found in our base app (start_dramatiq_workers).

Tip

To reload the Dramatiq workers when the code changes, add the --reload parameter.