what is it?
Celery lets you define additional queues, which gives you more control over which workers process which tasks.
configuration
from kombu import Queue
CELERY_TASK_DEFAULT_QUEUE = 'default'
# Force all queues to be explicitly listed in `CELERY_TASK_QUEUES` to help prevent typos
CELERY_TASK_CREATE_MISSING_QUEUES = False
CELERY_TASK_QUEUES = (
    # need to define default queue here or exception would be raised
    Queue('default'),
    Queue('high_priority'),
    Queue('low_priority'),
)

- the Celery default queue name is `celery`. Here, we used `CELERY_TASK_DEFAULT_QUEUE` to change the name to `default`
- `CELERY_TASK_CREATE_MISSING_QUEUES = False` prevents Celery from auto-creating queues for us that we don’t have defined in `CELERY_TASK_QUEUES`
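Since undeclared queues are no longer created automatically, a typo in a queue name turns into an error instead of a silent new queue. A minimal sketch of a check that catches such typos early, for example in a unit test, might look like the following. The helper `undeclared_queues` and the plain-string lists are hypothetical and not part of Celery; they only mirror the queue names from the settings above.

```python
# Hypothetical helper, not part of Celery: report queue names that are
# referenced somewhere (routes, apply_async calls) but never declared.
def undeclared_queues(declared, used):
    """Return the sorted queue names in `used` that are missing from `declared`."""
    return sorted(set(used) - set(declared))


# Names mirroring the CELERY_TASK_QUEUES tuple from the settings.
declared = ['default', 'high_priority', 'low_priority']

# Queue names referenced by the project; 'high_priorty' is a deliberate typo.
used = ['default', 'high_priorty', 'low_priority']

missing = undeclared_queues(declared, used)
print(missing)  # ['high_priorty']
```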
update worker to use high_priority and default queues:
celery -A django_celery_example worker --loglevel=info -Q high_priority,default
specify the destination for a particular task in the following places:
- apply_async: divide.apply_async(args=[1, 2], queue='game_of_thrones')
- defined on the task itself:

      from celery import shared_task

      @shared_task(name='myapp.tasks.my_task', queue='my_custom_queue')
      def my_task(arg1, arg2):
          # Task logic goes here
          print(f"Running task my_task with args: {arg1}, {arg2}")

- defined in CELERY_TASK_ROUTES
  - manual

    settings.py

        CELERY_TASK_ROUTES = {
            'django_celery_example.celery.*': {
                'queue': 'high_priority',
            },
        }

  - dynamic

    app/tasks.py

        import logging

        from celery import shared_task

        logger = logging.getLogger(__name__)

        @shared_task(name='default:dynamic_example_one')
        def dynamic_example_one():
            logger.info('Example One')

        @shared_task(name='low_priority:dynamic_example_two')
        def dynamic_example_two():
            logger.info('Example Two')

        @shared_task(name='high_priority:dynamic_example_three')
        def dynamic_example_three():
            logger.info('Example Three')

    settings.py

        def route_task(name, args, kwargs, options, task=None, **kw):
            if ':' in name:
                # split on the first colon only, so names with extra colons still route
                queue, _ = name.split(':', 1)
                return {'queue': queue}
            return {'queue': 'default'}

        CELERY_TASK_ROUTES = (route_task,)
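To see what the dynamic router decides without starting a worker, the function can be called directly with a few task names. This is only a sketch: it copies `route_task` into a standalone script and passes empty placeholders for `args`, `kwargs`, and `options`. This copy splits on the first colon only, which is an assumption made here so that a name with more than one colon does not raise.

```python
def route_task(name, args, kwargs, options, task=None, **kw):
    # A 'queue:task' style name is routed to the queue named by its prefix.
    if ':' in name:
        # maxsplit=1 keeps everything after the first colon together.
        queue, _ = name.split(':', 1)
        return {'queue': queue}
    # Names without a prefix fall back to the default queue.
    return {'queue': 'default'}


task_names = (
    'default:dynamic_example_one',
    'low_priority:dynamic_example_two',
    'high_priority:dynamic_example_three',
    'myapp.tasks.my_task',  # no prefix
)

for task_name in task_names:
    # Empty args, kwargs, and options are placeholders for this demonstration.
    print(task_name, '->', route_task(task_name, (), {}, {})['queue'])
```

Keeping the queue in the task name means a new task needs no change to the routing settings, at the cost of tying task names to the queue layout.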