Dramatiq (using Redis)
**********************

.. highlight:: python

We are working on a Windows project and need a reliable queuing solution
that supports both Linux and Windows.  I think Dramatiq meets this
requirement, but we still have work to do before it can be used for
production projects.

- :doc:`dev-apscheduler`
- https://dramatiq.io/
- https://github.com/Bogdanp/django_dramatiq
- Also see :doc:`celery` for our current queuing solution for Linux based
  projects.

.. note:: We started using ``dramatiq`` for a *Windows project* because
          newer versions of Celery are not supported on Windows.

Provision
=========

Add the following to your ``pillar`` file (``dramatiq: True``,
``scheduler: True`` and ``celery: False``) e.g.

.. code-block:: yaml

  sites:
    hatherleigh_net:
      profile: django
      celery: False
      dramatiq: True
      scheduler: True
      env:
        redis_host: localhost
        redis_port: 6379

.. _dramatiq_configure:

Configure
=========

Add the following to ``requirements/base.txt``::

  # Linux etc...
  django-dramatiq==
  # or for Windows
  kb-django-dramatiq==

  dramatiq[redis, watch]==

.. tip:: For Windows deployments we use ``kb-django-dramatiq`` instead of
         ``django-dramatiq``.  For details, see
         https://github.com/Bogdanp/django_dramatiq/issues/109 and
         https://github.com/pkimber/django_dramatiq/blob/master/README.rst

.. tip:: See :doc:`dev-requirements` for the current version.

Add the following to ``.env.fish``:

.. code-block:: bash

  set -x DOMAIN "dev"
  set -x LOG_FOLDER ""
  set -x LOG_SUFFIX "dev"

Add the following to ``.env.fish`` (if you are using :doc:`dev-kubernetes`):

.. code-block:: bash

  set -x REDIS_HOST (kubectl get nodes --namespace default -o jsonpath="{.items[0].status.addresses[0].address}")
  set -x REDIS_PORT (kubectl get --namespace default -o jsonpath="{.spec.ports[0].nodePort}" services kb-redis-master)

Add the following to ``.gitlab-ci.yml``:

.. code-block:: yaml

  - export DOMAIN=dev
  - export LOG_FOLDER=
  - export LOG_SUFFIX=dev

Add the following to ``settings/base.py``::

  THIRD_PARTY_APPS = (
      # add "django_dramatiq" to the installed apps before any of your custom apps
      "django_dramatiq",
      # ...
  )

  LOGGING = {
      # ...
      "handlers": {
          "logfile": {
              "level": "WARNING",
              "class": "logging.handlers.RotatingFileHandler",
              "filename": os.path.join(
                  get_env_variable("LOG_FOLDER"),
                  "{}-{}-logger.log".format(
                      get_env_variable("DOMAIN").replace("_", "-"),
                      get_env_variable("LOG_SUFFIX"),
                  ),
              ),
              "maxBytes": 100000000,
              "backupCount": 10,
              "formatter": "standard",
          },
      },
      # ...
  }

.. tip:: See :doc:`dev-logging` for a sample ``LOGGING`` configuration.
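The settings in this document read their configuration from the environment
using a ``get_env_variable`` helper.  If your project doesn't already define
one, a minimal sketch along the usual lines (the helper name matches the
snippets here; where you put it is up to you) is::

  import os

  from django.core.exceptions import ImproperlyConfigured


  def get_env_variable(key):
      """Return the environment variable, or fail loudly if it is not set."""
      try:
          return os.environ[key]
      except KeyError:
          raise ImproperlyConfigured(
              "Set the '{}' environment variable".format(key)
          )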
"django_dramatiq.middleware.DbConnectionsMiddleware", ], } # KB Software queue name (to allow multiple sites on one server) DRAMATIQ_QUEUE_NAME = DATABASE_NAME DRAMATIQ_QUEUE_NAME_PIPELINE = DRAMATIQ_QUEUE_NAME In ``settings/dev_test.py``:: DRAMATIQ_BROKER = { "BROKER": "dramatiq.brokers.stub.StubBroker", "OPTIONS": {}, "MIDDLEWARE": [ "dramatiq.middleware.AgeLimit", "dramatiq.middleware.TimeLimit", "dramatiq.middleware.Callbacks", "dramatiq.middleware.Pipelines", "dramatiq.middleware.Retries", ], } # KB Software queue name (to allow multiple sites on one server) DRAMATIQ_QUEUE_NAME = DATABASE_NAME .. tip:: Make sure you use a ``DATABASE_NAME`` variable in the ``DATABASES`` settings. In ``settings/production.py``:: REDIS_HOST = get_env_variable("REDIS_HOST") REDIS_PORT = get_env_variable("REDIS_PORT") # https://dramatiq.io/reference.html#middleware DRAMATIQ_BROKER = { "BROKER": "dramatiq.brokers.redis.RedisBroker", "OPTIONS": {"url": "redis://{}:{}/0".format(REDIS_HOST, REDIS_PORT)}, "MIDDLEWARE": [ # drops messages that have been in the queue for too long "dramatiq.middleware.AgeLimit", # cancels actors that run for too long "dramatiq.middleware.TimeLimit", # lets you chain success and failure callbacks "dramatiq.middleware.Callbacks", # automatically retries failed tasks with exponential backoff "dramatiq.middleware.Retries", # # Cleans up db connections on worker shutdown. # # This middleware is vital in taking care of closing expired # connections after each message is processed. "django_dramatiq.middleware.DbConnectionsMiddleware", ], } # KB Software queue name (to allow multiple sites on one server) DRAMATIQ_QUEUE_NAME = DATABASE_NAME DRAMATIQ_QUEUE_NAME_PIPELINE = "{}_pipeline".format(DATABASE_NAME) .. tip:: Make sure you use a ``DATABASE_NAME`` variable in the ``DATABASES`` settings (the ``DATABASE_NAME`` is used by APScheduler) e.g:: DOMAIN = get_env_variable("DOMAIN") DATABASE_NAME = DOMAIN.replace(".", "_").replace("-", "_") Maintenance / Debug =================== Flush all / drop all messages from the queue:: import dramatiq broker = dramatiq.get_broker() broker.flush_all() .. tip:: There is a ``flush(queue_name)`` method if you don't want to clear all of the queues. Outstanding messages:: import dramatiq from django.conf import settings broker = dramatiq.get_broker() # list the queues broker.get_declared_queues() consumer = broker.consume(settings.DRAMATIQ_QUEUE_NAME) # how many outstanding messages (does this work) consumer.outstanding_message_count Tasks ===== :: import dramatiq # https://dramatiq.io/cookbook.html#binding-worker-groups-to-queues @dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME) def process_steps(): logger.info("pipeline.tasks.process_steps") from pipeline.models import PipelineProcessStep PipelineProcessStep.objects.process() logger.info("pipeline.tasks.process_steps - complete") .. tip:: To limit the retry count:: @dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME, max_retries=0) Here are some example tasks: https://gitlab.com/kb/pipeline/blob/master/pipeline/tasks.py To call the task:: transaction.on_commit( lambda: process_step.send_with_options( args=(pipeline_process_step.pk,) ) ) .. tip:: You can add a delay... e.g. ``count_words.send_with_options(args=("example",), delay=10000)`` or:: transaction.on_commit(lambda: process_mail.send()) or:: process_steps.send() .. note:: Copied from https://gitlab.com/kb/pipeline/blob/master/pipeline/models.py .. note:: It is important to set the ``DRAMATIQ_QUEUE_NAME`` when defining the ``actor`` e.g. 
Management Commands
===================

Create a ``start_dramatiq_workers`` management command in your project (see
`start_dramatiq_workers`_ for a working example) e.g.::

  # -*- encoding: utf-8 -*-
  from django.conf import settings

  from base.dramatiq_utils import DramatiqBaseCommandMixin


  class Command(DramatiqBaseCommandMixin):
      """Start Dramatiq workers - using the correct queue name."""

      PROCESSES = 3

      def get_queue_name(self):
          return settings.DRAMATIQ_QUEUE_NAME

.. tip:: To reload the Dramatiq workers when the code changes, add the
         ``--reload`` parameter.

.. _apscheduler: https://apscheduler.readthedocs.io/en/latest/
.. _start_dramatiq_workers: https://gitlab.com/kb/base/blob/2292-dramatiq/base/management/commands/start_dramatiq_workers.py
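``DramatiqBaseCommandMixin`` lives in our ``base`` app
(``base.dramatiq_utils``) and is not reproduced here.  Purely as an
illustration of the idea - and assuming the mixin simply wraps the
``rundramatiq`` command from ``django_dramatiq``, forwarding the process
count, queue name and ``--reload`` flag - a sketch might look like::

  # -*- encoding: utf-8 -*-
  from django.core.management import call_command
  from django.core.management.base import BaseCommand


  class DramatiqBaseCommandMixin(BaseCommand):
      """Sketch only - start workers for a single, site specific queue."""

      PROCESSES = 1

      def add_arguments(self, parser):
          # forward "--reload" so the workers restart on code changes
          parser.add_argument("--reload", action="store_true", default=False)

      def get_queue_name(self):
          raise NotImplementedError()

      def handle(self, *args, **options):
          command_args = [
              "--processes",
              str(self.PROCESSES),
              "--queues",
              self.get_queue_name(),
          ]
          if options["reload"]:
              command_args.append("--reload")
          # "rundramatiq" is the worker command shipped with
          # django_dramatiq / kb-django-dramatiq
          call_command("rundramatiq", *command_args)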