Pipeline

https://gitlab.com/kb/pipeline

Icon:

<i class="fa fa-paw"></i>

Management Commands

Process outstanding steps (was called pipeline_process_steps):

django-admin.py pipeline_process_outstanding_steps

Warning

Only run one instance of this command at a time. It starts by getting a list of outstanding tasks and processing them all. We don’t want tasks being processed twice.

Queues

The pipeline app uses two Dramatiq queues:

  1. DRAMATIQ_QUEUE_NAME

  2. DRAMATIQ_QUEUE_NAME_PIPELINE

We use this for one of our projects where some of the tasks need to run on a Windows server. We make these tasks as follows:

# https://dramatiq.io/cookbook.html#binding-worker-groups-to-queues
@dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME_PIPELINE)
def pipeline_test_task():
    logger.info("pipeline.tasks.pipeline_test_task")

Default tasks are as follows:

# https://dramatiq.io/cookbook.html#binding-worker-groups-to-queues
@dramatiq.actor(queue_name=settings.DRAMATIQ_QUEUE_NAME)
def pipeline_test_task_default_queue():
    logger.info("pipeline.tasks.pipeline_test_task_default_queue")

The Dramatiq queues, for the example app, can be started using the management commands:

django-admin.py start_dramatiq_workers
django-admin.py start_dramatiq_workers_pipeline

To try out the example queues, create some test tasks:

django-admin.py pipeline_test_task
django-admin.py pipeline_test_task_default_queue

Testing

To start a test pipeline and run the background process:

django-admin.py create_test_pipeline_process

To view your test processes, browse to http://localhost:8000/pipeline/process/