Dramatiq (using Redis)

We are working on a Windows project, so we need a reliable queuing solution that supports both Linux and Windows. I think Dramatiq meets these criteria, but there is still work to do before it can be used for projects in production.

Note

We are using Dramatiq for a Windows project:

  • We still need to find a solution for scheduling (probably APScheduler).

  • For Linux, we need to build Salt states for uWSGI and Supervisor (supervisord) before it can be used in production.

Provision

Add the following to your pillar file (set scheduler: True and celery: False), e.g.:

sites:
  hatherleigh_net:
    profile: django
    celery: False
    scheduler: True
    env:
      redis_host: localhost
      redis_port: 6379

Configure

Add the following to requirements/base.txt:

django-dramatiq==0.8.0
dramatiq[redis, watch]==1.7.0

Add the following to .env.fish:

set -x DOMAIN "dev"
set -x LOG_FOLDER ""

Add the following to settings/base.py:

import os

THIRD_PARTY_APPS = (
    # add django_dramatiq to installed apps before any of your custom apps
    "django_dramatiq",
    # ... your other third party apps
)

LOG_FOLDER = get_env_variable("LOG_FOLDER")
LOGGING = {
    # ... your existing logging settings
    "handlers": {
        "logfile": {
            # ... your existing handler settings
            "filename": os.path.join(
                LOG_FOLDER, "{}-logger.log".format(get_env_variable("DOMAIN"))
            ),
        },
    },
}

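The settings above call get_env_variable, which is defined elsewhere in the project and not shown here. A minimal sketch of such a helper, assuming it simply reads os.environ and fails loudly when a variable is missing (the project's real helper may differ, e.g. by raising Django's ImproperlyConfigured instead of RuntimeError):

```python
import os


def get_env_variable(key):
    """Return the environment variable ``key`` or fail with a clear message.

    Hypothetical sketch: the project's real helper may behave differently.
    """
    try:
        return os.environ[key]
    except KeyError:
        raise RuntimeError("Set the {} environment variable".format(key)) from None
```

With this behaviour an empty value such as set -x LOG_FOLDER "" is still accepted, and os.path.join("", "dev-logger.log") is just "dev-logger.log", so the log file is written to the current working directory.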
In your development settings, e.g. settings/dev_patrick.py:

DATABASE_NAME = get_env_variable("DATABASE_NAME")

REDIS_HOST = get_env_variable("REDIS_HOST")
REDIS_PORT = get_env_variable("REDIS_PORT")
# https://dramatiq.io/reference.html#middleware
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.redis.RedisBroker",
    "OPTIONS": {"url": "redis://{}:{}/0".format(REDIS_HOST, REDIS_PORT)},
    "MIDDLEWARE": [
        # drops messages that have been in the queue for too long
        "dramatiq.middleware.AgeLimit",
        # cancels actors that run for too long
        "dramatiq.middleware.TimeLimit",
        # lets you chain success and failure callbacks
        "dramatiq.middleware.Callbacks",
        # automatically retries failed tasks with exponential backoff
        "dramatiq.middleware.Retries",
        # keeps track of task executions
        # "django_dramatiq.middleware.AdminMiddleware",
        # cleans up db connections on worker shutdown
        # "django_dramatiq.middleware.DbConnectionsMiddleware",
    ],
}

# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME
# Defines which database should be used to persist Task objects when the
# AdminMiddleware is enabled.  The default value is "default".
DRAMATIQ_TASKS_DATABASE = "default"

# Results and result backends are not enabled by default and you should avoid
# using them until you have a really good use case
# https://dramatiq.io/reference.html#results
# DRAMATIQ_RESULT_BACKEND = {
#     "BACKEND": "dramatiq.results.backends.redis.RedisBackend",
#     "BACKEND_OPTIONS": {"url": "redis://{}:{}".format(REDIS_HOST, REDIS_PORT),},
#     "MIDDLEWARE_OPTIONS": {"result_ttl": 60000},
# }
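REDIS_HOST and REDIS_PORT come from the environment, so they are presumably strings, and they are formatted straight into the broker URL. If you would rather have a bad port fail when the settings load than when a worker first connects, a small helper could validate the values before building the URL. build_redis_url is hypothetical; it is not part of Dramatiq or django-dramatiq:

```python
def build_redis_url(host, port, db=0):
    """Build a redis:// URL for DRAMATIQ_BROKER["OPTIONS"]["url"].

    Hypothetical helper: ``port`` may be a string read from the environment;
    int() raises ValueError if it is not a number.
    """
    port = int(port)
    if not 0 < port < 65536:
        raise ValueError("Invalid Redis port: {}".format(port))
    return "redis://{}:{}/{}".format(host, port, db)
```

With the pillar values above, "OPTIONS": {"url": build_redis_url(REDIS_HOST, REDIS_PORT)} produces the same "redis://localhost:6379/0" as the format() call.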

In settings/dev_test.py:

DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.stub.StubBroker",
    "OPTIONS": {},
    "MIDDLEWARE": [
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Pipelines",
        "dramatiq.middleware.Retries",
        # "django_dramatiq.middleware.AdminMiddleware",
        # "django_dramatiq.middleware.DbConnectionsMiddleware",
    ],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME

Tip

Make sure DATABASE_NAME is defined and used in the DATABASES setting (DRAMATIQ_QUEUE_NAME depends on it).

In settings/production.py:

REDIS_HOST = get_env_variable("REDIS_HOST")
REDIS_PORT = get_env_variable("REDIS_PORT")
# https://dramatiq.io/reference.html#middleware
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.redis.RedisBroker",
    "OPTIONS": {"url": "redis://{}:{}/0".format(REDIS_HOST, REDIS_PORT)},
    "MIDDLEWARE": [
        # drops messages that have been in the queue for too long
        "dramatiq.middleware.AgeLimit",
        # cancels actors that run for too long
        "dramatiq.middleware.TimeLimit",
        # lets you chain success and failure callbacks
        "dramatiq.middleware.Callbacks",
        # automatically retries failed tasks with exponential backoff
        "dramatiq.middleware.Retries",
        # keeps track of task executions
        # "django_dramatiq.middleware.AdminMiddleware",
        # cleans up db connections on worker shutdown
        # "django_dramatiq.middleware.DbConnectionsMiddleware",
    ],
}
# KB Software queue name (to allow multiple sites on one server)
DRAMATIQ_QUEUE_NAME = DATABASE_NAME

Tip

Make sure you use a DATABASE_NAME variable in the DATABASES setting (DATABASE_NAME is also used by APScheduler), e.g.:

DOMAIN = get_env_variable("DOMAIN")
DATABASE_NAME = DOMAIN.replace(".", "_").replace("-", "_")
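Because DATABASE_NAME doubles as the Dramatiq queue name, sites sharing a server get separate queues as long as their derived names differ. The transformation can be checked on its own; the domains below are made-up examples:

```python
def database_name(domain):
    """Same transformation as the settings above: dots and hyphens become underscores."""
    return domain.replace(".", "_").replace("-", "_")


# Hypothetical domains for two sites on one server
for domain in ("hatherleigh.net", "www.my-site.co.uk"):
    print(domain, "->", database_name(domain))
```

This prints hatherleigh_net and www_my_site_co_uk. Note that the mapping is not one-to-one: my-site.com and my.site.com both become my_site_com, so two such sites on the same server would share a database name and a queue.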

Tasks

Here is an example task: https://gitlab.com/kb/steps/blob/master/steps/tasks.py

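To call the task once the current database transaction has been committed:

from django.conf import settings
from django.db import transaction

# process_next_step is a Dramatiq actor (see the example task above)
transaction.on_commit(
    lambda: process_next_step.send_with_options(
        args=(next_process_step.pk,),
        queue_name=settings.DRAMATIQ_QUEUE_NAME,
    )
)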

(copied from https://gitlab.com/kb/steps/blob/master/steps/models.py)
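Inside an atomic block, transaction.on_commit defers the send until the transaction commits, so a worker is never handed a pk for a row it cannot see yet. One thing to watch if you queue several tasks in a loop: a lambda looks up its variables when it runs, not when it is created, so every deferred callback would send the last pk. The sketch below uses a hypothetical stand-in for on_commit (a list of callbacks run at "commit") to show the effect, and functools.partial as one way to bind each value immediately:

```python
from functools import partial

# Hypothetical stand-in for django.db.transaction.on_commit inside an atomic
# block: callbacks are stored and only run when the transaction commits.
pending = []


def on_commit(func):
    pending.append(func)


def commit():
    while pending:
        pending.pop(0)()


sent = []


def send(pk):
    # Stands in for process_next_step.send_with_options(args=(pk,), ...)
    sent.append(pk)


# Late binding: every lambda reads ``pk`` when it runs, after the loop ends.
for pk in (1, 2, 3):
    on_commit(lambda: send(pk))
commit()
print(sent)  # [3, 3, 3]

sent.clear()
# partial() captures the current value of ``pk`` when the callback is created.
for pk in (1, 2, 3):
    on_commit(partial(send, pk))
commit()
print(sent)  # [1, 2, 3]
```

For a single call, as in the snippet above, the lambda is fine; the difference only matters when the enclosing variable changes before the transaction commits.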

Note

It is important to use DRAMATIQ_QUEUE_NAME both when defining the actor, e.g. @dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME), and when calling send_with_options.

Management Commands

We have a start_dramatiq_workers management command which sets the queue names correctly for our projects. The source code can be found in our base app.
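The source of start_dramatiq_workers is not reproduced here. As a rough, hypothetical illustration of the idea, a command could build an argument list for the dramatiq CLI that restricts the workers to this site's queue with --queues. Using "django_dramatiq.setup" as the first module, the task module names and the process/thread counts are assumptions for the sketch, not what the base app actually does:

```python
import sys


def dramatiq_worker_args(queue_name, task_modules, processes=2, threads=4):
    """Build an argv list for the dramatiq CLI, limited to one queue.

    Hypothetical sketch: "django_dramatiq.setup" and the default
    process/thread counts are assumptions, not the base app's real values.
    """
    return [
        sys.executable, "-m", "dramatiq",
        "django_dramatiq.setup",
        *task_modules,
        "--processes", str(processes),
        "--threads", str(threads),
        # --queues takes one or more names, so keep it after the modules
        "--queues", queue_name,
    ]
```

The list could then be passed to subprocess.run(args, check=True). Building an argument list rather than a shell string avoids quoting differences between Windows and Linux shells.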