I am using Celery in my Django application and have set up Celery Flower to monitor the Celery tasks. The tasks send emails to users on events such as register, submit, FP, etc. Flower gives me nice details about each task and its status, but I wanted an email sent to my account for every failed task so that I don't have to check Flower every day. I added the following configuration to my settings.py file:
CELERY_SEND_TASK_ERROR_EMAILS = True
EMAIL_USE_TLS = True
EMAIL_HOST_USER = 'email@example.com'
EMAIL_HOST_PASSWORD = 'xyz123@'
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
SERVER_EMAIL = EMAIL_HOST_USER
EMAIL_HOST = 'xyz.abc.com'
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
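One thing worth checking with the configuration above: as far as I know, Celery 3.x sends task error emails to the addresses listed in ADMINS, using SERVER_EMAIL as the sender, so CELERY_SEND_TASK_ERROR_EMAILS has no visible effect when ADMINS is empty. A minimal sketch of that extra setting follows; the name and address are placeholders, not values from the original configuration.

```python
# settings.py (sketch): recipients for error emails.
# Each entry is a (name, email address) pair; these values are placeholders.
ADMINS = (
    ('Admin Name', 'admin@example.com'),
)
```

The task error email feature was removed in Celery 4.0, so this only applies to older versions.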
I found out that a table is created in the database to keep track of the tasks. So I wrote a simple script that runs every hour, checks that table for tasks that failed during the past hour, and sends an email if it finds any.
#!venv/bin/python2
import os
import sys

from django.conf import settings

if __name__ == '__main__' and __package__ is None:
    # Make the project root importable when the script is run directly.
    sys.path.append(
        os.path.dirname(
            os.path.dirname(
                os.path.abspath(__file__))))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "rest_apis.settings")

import django
django.setup()

from django.core.mail import EmailMessage
from djcelery.models import TaskMeta
from datetime import datetime, timedelta

USERS_TO_NOTIFY = ['firstname.lastname@example.org']
TIME_THRESHOLD_INTERVAL = 60  # minutes


def send_email(email_subject_line, email_body):
    email = EmailMessage(email_subject_line, email_body,
                         settings.EMAIL_HOST_USER, USERS_TO_NOTIFY)
    email.send()


def main():
    current_time = datetime.now()  # Get current timestamp
    # Get the timestamp TIME_THRESHOLD_INTERVAL minutes before the current one
    time_threshold = current_time - timedelta(minutes=TIME_THRESHOLD_INTERVAL)
    celery_taskmeta_objects = TaskMeta.objects.filter(
        status="FAILURE", date_done__gte=time_threshold)
    email_body = "Below are the tasks which failed : "
    # Only send an email when at least one task failed in the window.
    if celery_taskmeta_objects.exists():
        for celery_taskmeta in celery_taskmeta_objects:
            print(celery_taskmeta.task_id)
            email_body += "\n\ntask_id : %s" % celery_taskmeta.task_id
            email_body += "\nstatus : %s" % celery_taskmeta.status
            email_body += "\ndate : %s" % celery_taskmeta.date_done
            email_body += "\ntraceback :"
            email_body += "\n%s\n\n" % celery_taskmeta.traceback
        email_subject_line = ('[URGENT] Celery task failure in last %s minutes'
                              % TIME_THRESHOLD_INTERVAL)
        send_email(email_subject_line, email_body)


main()
The email now contains the id of each failed task along with its full stack trace. Since my requirement was to check every hour, I simply put the script in a crontab. You can change the time threshold to suit your needs; just keep it in line with how often cron runs the script.
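If the threshold and the cron schedule do not match, failures are either reported twice (threshold longer than the interval) or missed (threshold shorter). Here is a minimal sketch of the window calculation, pulled out into a helper so it can be checked on its own. The name `window_start` is hypothetical and does not appear in the script.

```python
from datetime import datetime, timedelta


def window_start(now, minutes):
    """Return the earliest date_done value the script should report on."""
    return now - timedelta(minutes=minutes)


# Example: a run at 10:00 with a 60-minute threshold looks back to 09:00.
run_time = datetime(2024, 1, 1, 10, 0)
threshold = window_start(run_time, 60)
print(threshold)  # 2024-01-01 09:00:00
```

With an hourly crontab entry, a threshold of 60 covers each hour once. If the script were scheduled every 15 minutes instead, TIME_THRESHOLD_INTERVAL would need to be 15.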