what is it?

Celery lets you define additional queues so you can control which workers process which tasks

configuration

settings.py
from kombu import Queue
 
CELERY_TASK_DEFAULT_QUEUE = 'default'
 
# Force all queues to be explicitly listed in `CELERY_TASK_QUEUES` to help prevent typos
CELERY_TASK_CREATE_MISSING_QUEUES = False
 
CELERY_TASK_QUEUES = (
    # need to define default queue here or exception would be raised
    Queue('default'),
 
    Queue('high_priority'),
    Queue('low_priority'),
)
  • Celery's default queue name is celery. Here, CELERY_TASK_DEFAULT_QUEUE renames it to default
  • CELERY_TASK_CREATE_MISSING_QUEUES = False stops Celery from auto-creating queues that aren't defined in CELERY_TASK_QUEUES, so routing a task to an undeclared (e.g. misspelled) queue raises an error instead of silently creating a new queue
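
To make the effect of that setting concrete, here is a toy model of the lookup in plain Python. It is only a sketch of the idea, not Celery's implementation: `resolve_queue` and `DECLARED_QUEUES` are hypothetical names made up for this note, and the real error Celery raises is not a `KeyError`.

```python
# Toy model only: mimics the idea of "declared queues" without importing Celery.
DECLARED_QUEUES = frozenset({"default", "high_priority", "low_priority"})


def resolve_queue(name, declared=DECLARED_QUEUES, create_missing=False):
    """Return the queue name if it may be used, otherwise raise KeyError."""
    if name in declared:
        return name
    if create_missing:
        # Permissive mode: an unknown name is accepted as-is (typos included).
        return name
    # Strict mode: an unknown name is rejected up front.
    raise KeyError(f"queue {name!r} is not declared")


print(resolve_queue("high_priority"))
try:
    resolve_queue("high_priorty")  # typo: missing "i"
except KeyError as exc:
    print("rejected:", exc)
```

With `create_missing=True` the misspelled name would be accepted, and tasks would pile up in a queue that no worker is listening on.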

start the worker with -Q so it consumes only from the high_priority and default queues: celery -A django_celery_example worker --loglevel=info -Q high_priority,default

specify the destination for a particular task in the following places:

  • apply_async (the queue must be one declared in CELERY_TASK_QUEUES): divide.apply_async(args=[1, 2], queue='high_priority')
  • defined on the task itself:
    @shared_task(name='myapp.tasks.my_task', queue='low_priority')
    def my_task(arg1, arg2):
        # Task logic goes here
        print(f"Running task my_task with args: {arg1}, {arg2}")
  • defined in CELERY_TASK_ROUTES
    • manual

      settings.py
      CELERY_TASK_ROUTES = {
          'django_celery_example.celery.*': {
              'queue': 'high_priority',
          },
      }
    • dynamic

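      app/tasks.py
      from celery import shared_task
      from celery.utils.log import get_task_logger

      logger = get_task_logger(__name__)


      @shared_task(name='default:dynamic_example_one')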
      def dynamic_example_one():
          logger.info('Example One')
       
       
      @shared_task(name='low_priority:dynamic_example_two')
      def dynamic_example_two():
          logger.info('Example Two')
       
       
      @shared_task(name='high_priority:dynamic_example_three')
      def dynamic_example_three():
          logger.info('Example Three')
      settings.py
      def route_task(name, args, kwargs, options, task=None, **kw):
          if ':' in name:
              # split on the first colon only, so names with extra colons don't raise
              queue, _ = name.split(':', 1)
              return {'queue': queue}
          return {'queue': 'default'}
       
       
      CELERY_TASK_ROUTES = (route_task,)
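
The router is a plain function, so it can be checked without a broker or a running worker. The sketch below repeats the function so that it runs on its own, and splits on the first colon only so that a name with more than one colon does not raise. The task names 'myapp.tasks.add' and 'low_priority:reports:daily' are made up for illustration.

```python
def route_task(name, args, kwargs, options, task=None, **kw):
    """Pick the queue from the 'queue:task' prefix of the task name."""
    if ":" in name:
        # maxsplit=1 keeps everything after the first colon together
        queue, _ = name.split(":", 1)
        return {"queue": queue}
    # no prefix: fall back to the default queue
    return {"queue": "default"}


for task_name in (
    "default:dynamic_example_one",
    "low_priority:dynamic_example_two",
    "high_priority:dynamic_example_three",
    "myapp.tasks.add",             # hypothetical name without a prefix
    "low_priority:reports:daily",  # hypothetical name with two colons
):
    print(task_name, "->", route_task(task_name, (), {}, {}))
```

Note that the prefix is used as the queue name as-is, so with CELERY_TASK_CREATE_MISSING_QUEUES = False it has to match one of the queues listed in CELERY_TASK_QUEUES.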