what is it?
how to automatically retry failed Celery tasks
examples
try/except
@shared_task(bind=True)
def task_process_notification(self):
try:
if not random.choice([0, 1]):
# mimic random error
raise Exception()
# this would block the I/O
requests.post('https://httpbin.org/delay/5')
except Exception as e:
logger.error('exception raised, it would be retry after 5 seconds')
raise self.retry(exc=e, countdown=5)- setting
bind=Truemeans the first argument will be the current task instance (self)
decorator (from celery version 4.0)
@shared_task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 7, 'countdown': 5})
def task_process_notification(self):
if not random.choice([0, 1]):
# mimic random error
raise Exception()
requests.post('https://httpbin.org/delay/5')-
autoretry_foris a tuple -
can add exponential backoff:
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 5}) -
exponential backoff with delay factor:
@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={'max_retries': 5}) -
by default exponential backoff has
retry_jitterenabled to prevent thundering herd
Re-using retry arguments (from celery v4.4)
class BaseTaskWithRetry(celery.Task):
autoretry_for = (Exception, KeyError)
retry_kwargs = {'max_retries': 5}
retry_backoff = True
@shared_task(bind=True, base=BaseTaskWithRetry)
def task_process_notification(self):
raise Exception()