Akshay Shah Akshay Shah - 3 years ago 210
Python Question

Celery send mail for every failed task

I am using celery in my django application and I have set-up celery flower to monitor the tasks of celery. I have setup tasks where emails are send to the user when they register/submit/FP etc events. Now Flower gives me a nice details of the task and it's status. Now for every failed task I wanted an email to send to my account so that I don't have check the flower everyday for the failed task. I did the following configuration in my settings.py file

CELERY_SEND_TASK_ERROR_EMAILS = True


and
ADMINS
.

EMAIL_USE_TLS = True
EMAIL_HOST_USER = 'noreply@xyz.com'
EMAIL_HOST_PASSWORD = 'xyz123@'
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
SERVER_EMAIL = EMAIL_HOST_USER
EMAIL_HOST = 'xyz.abc.com'
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'


These are settings of "From" email address.
Few days back,one of my team member accidentally changed the password of the above email_host and forgot to update settings file. It wasn't after it was too later that the tasks are failing because of SMTP Authentication error.

Is there any way around this that even if SMTP authentication error occurs, I immediately get the email from celery ? I am not so sure about this.

Are there any other tools that will monitor my tasks and for every failed task it will send me the mail.

Answer Source

I found out that there is a table created for maintaining the tasks in the database. So I just simply created a script which will check for failed task every hour for the past hour records in the table and if found any it will send an email.

script.py

#!venv/bin/python2

import os
from django.conf import settings

if __name__ == '__main__' and __package__ is None:
    os.sys.path.append(
        os.path.dirname(
            os.path.dirname(
                os.path.abspath(__file__))))

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "rest_apis.settings")

import django

django.setup()
from django.core.mail import EmailMessage
from djcelery.models import TaskMeta
from datetime import datetime, timedelta, time


USERS_TO_NOTIFY = ['ops@yopmail.com']
TIME_THRESHOLD_INTERVAL = 60

def send_email(email_subject_line, email_body):
    email = EmailMessage(email_subject_line,
                         email_body,
                         settings.EMAIL_HOST_USER,
                         USERS_TO_NOTIFY
                         )
    email.send()


def main():
    current_time = datetime.now()   # Get Current TimeStamp
    time_threshold = current_time - timedelta(minutes=TIME_THRESHOLD_INTERVAL) # Get 60 minutes past current time stamp
    celery_taskmeta_objects = TaskMeta.objects.filter(status="FAILURE", date_done__gte=time_threshold)

    email_body = "Below are the tasks which failed : "
    if celery_taskmeta_objects.exists():
        for celery_taskmeta in celery_taskmeta_objects:
            print celery_taskmeta.task_id
            email_body += "\n\ntask_id : %s" % celery_taskmeta.task_id
            email_body += "\nstatus : %s" % celery_taskmeta.status
            email_body += "\ndate : %s" % celery_taskmeta.date_done
            email_body += "\ntraceback :"
            email_body += "\n%s\n\n" % celery_taskmeta.traceback
        email_subject_line = '[URGENT] Celery task failure in last %s minutes' % (TIME_THRESHOLD_INTERVAL)
        send_email(email_subject_line, email_body)


main()

Now in an email I get a full stack trace too and the id of the task. Now my requirement was to check every hour so I just put the script in a crontab. Now you can change the time threshold as per your basic need and work accordingly.

Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download