what is it?

how to automatically retry failed Celery tasks

examples

try/except

tasks.py
@shared_task(bind=True)
def task_process_notification(self):
    try:
        if not random.choice([0, 1]):
            # mimic random error
            raise Exception()
 
        # this would block the I/O
        requests.post('https://httpbin.org/delay/5')
    except Exception as e:
        logger.error('exception raised, it would be retry after 5 seconds')
        raise self.retry(exc=e, countdown=5)
  • setting bind=True means the first argument will be the current task instance (self)

decorator (from celery version 4.0)

tasks.py
@shared_task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 7, 'countdown': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # mimic random error
        raise Exception()
 
    requests.post('https://httpbin.org/delay/5')
  • autoretry_for is a tuple

  • can add exponential backoff: @shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 5})

  • exponential backoff with delay factor:
    @shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={'max_retries': 5})

  • by default exponential backoff has retry_jitter enabled to prevent thundering herd

Re-using retry arguments (from celery v4.4)

tasks.py
class BaseTaskWithRetry(celery.Task):
    autoretry_for = (Exception, KeyError)
    retry_kwargs = {'max_retries': 5}
    retry_backoff = True
 
 
@shared_task(bind=True, base=BaseTaskWithRetry)
def task_process_notification(self):
    raise Exception()