Aldarund Aldarund - 1 month ago 17
Python Question

Celery uses the wrong broker

I have the setup from the docs.

celery.py file:

from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cars.settings')

app = Celery('cars')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


supervisord config

[program:bbay-celery]
command = /opt/webapps/bbay/env/bin/celery worker -A cars.celery:app ; Command to start app
directory = /opt/webapps/bbay/
user = bbay ; User to run as
numprocs = 1
stdout_logfile = /opt/webapps/bbay/logs/celery.log ; Where to write log messages
redirect_stderr = true ; Save stderr in the same log
environment=LANG='en_US.UTF-8',LC_ALL='en_US.UTF-8',DJANGO_SETTINGS_MODULE='cars.settings',CELERYD_CHDIR='/opt/webapps/bbay/'
autostart = true
autorestart = true
startsecs = 10


Django settings:

BROKER_URL = 'redis://localhost:6379/0'


When I start Celery (beat, in this example), everything seems fine and the correct broker URL is used:

(env)bbay@djproj:/opt/webapps/bbay$ celery -A cars.celery:app beat
celery beat v3.1.17 (Cipater) is starting.
__ - ... __ - _
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> now (0s)


Here is my task:

from celery import shared_task

@shared_task
def send_mail_task(template, context, send_to):
    ...  # body elided in the original post


Here is how I call it:

send_mail_task.delay('email/confirmation_message.html', context, [user.email, ])


But when the task is called, it tries to connect to the default broker (host '127.0.0.1:5672'). Here is the stack trace:

Stacktrace (most recent call last):

  File "django/core/handlers/base.py", line 111, in get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "django/views/decorators/csrf.py", line 57, in wrapped_view
    return view_func(*args, **kwargs)
  File "django/views/generic/base.py", line 69, in view
    return self.dispatch(request, *args, **kwargs)
  File "rest_framework/views.py", line 452, in dispatch
    response = self.handle_exception(exc)
  File "rest_framework/views.py", line 449, in dispatch
    response = handler(request, *args, **kwargs)
  File "accounts/api/views.py", line 132, in post
    send_mail_task.delay('email/contact_seller.html', context, [profile.user.email, ])
  File "celery/app/task.py", line 453, in delay
    return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 555, in apply_async
    **dict(self._get_exec_options(), **options)
  File "celery/app/base.py", line 355, in send_task
    reply_to=reply_to or self.oid, **options
  File "celery/app/amqp.py", line 305, in publish_task
    **kwargs
  File "kombu/messaging.py", line 168, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "kombu/connection.py", line 457, in _ensured
    interval_max)
  File "kombu/connection.py", line 369, in ensure_connection
    interval_start, interval_step, interval_max, callback)
  File "kombu/utils/__init__.py", line 243, in retry_over_time
    return fun(*args, **kwargs)
  File "kombu/connection.py", line 237, in connect
    return self.connection
  File "kombu/connection.py", line 741, in connection
    self._connection = self._establish_connection()
  File "kombu/connection.py", line 696, in _establish_connection
    conn = self.transport.establish_connection()
  File "kombu/transport/pyamqp.py", line 112, in establish_connection
    conn = self.Connection(**opts)
  File "amqp/connection.py", line 165, in __init__
    self.transport = self.Transport(host, connect_timeout, ssl)
  File "amqp/connection.py", line 186, in Transport
    return create_transport(host, connect_timeout, ssl)
  File "amqp/transport.py", line 299, in create_transport
    return TCPTransport(host, connect_timeout)
  File "amqp/transport.py", line 95, in __init__
    raise socket.error(last_err)
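
One clue in the trace is the transport: kombu/transport/pyamqp.py and port 5672 both point at AMQP, whereas the configured URL is Redis on port 6379. A minimal sketch using only the standard library makes the mismatch explicit. `describe_broker` is a hypothetical helper, and the AMQP URL is an assumption assembled from the host in the error message, not a value read from Celery:

```python
from urllib.parse import urlsplit


def describe_broker(url):
    """Return (scheme, host, port) for a broker URL (hypothetical helper)."""
    parts = urlsplit(url)
    return parts.scheme, parts.hostname, parts.port


# The URL from the Django settings shown above.
configured = describe_broker('redis://localhost:6379/0')

# Assumed URL, built from the host and port reported in the error.
attempted = describe_broker('amqp://127.0.0.1:5672//')

print('configured:', configured)
print('attempted: ', attempted)
print('same transport?', configured[0] == attempted[0])
```

If the two tuples differ, the process that called `.delay()` did not load the same Celery configuration as the process that printed the startup banner.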


So what is wrong, how do I make Celery connect to the specified broker, and where is this covered in the Celery docs?

Answer

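The problem was that I had left the Celery import out of __init__.py (the one in the project package, next to celery.py). Without it, the Django web process never imports celery.py, so shared_task falls back to Celery's default app, which uses the default AMQP broker on port 5672. __init__.py should contain the following (from the docs):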

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
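
Why the import matters can be sketched with a toy model. This is not Celery's source code: `ToyApp`, `get_current_app`, and `toy_shared_task` are hypothetical names, and the AMQP URL is an assumed stand-in for the built-in default. The idea it illustrates is that a shared task looks up the current app at call time, and uses a default-configured app if no project app has been created in that process yet:

```python
# Toy model of lazy app lookup (hypothetical names, not Celery's real code).

DEFAULT_BROKER = 'amqp://127.0.0.1:5672//'  # assumed stand-in for the AMQP default

_current = {'app': None}  # the app most recently created in this process


class ToyApp:
    def __init__(self, name, broker=DEFAULT_BROKER, set_as_current=True):
        self.name = name
        self.broker = broker
        if set_as_current:
            _current['app'] = self


# Fallback app with default settings, used when no project app was imported.
DEFAULT_APP = ToyApp('default', set_as_current=False)


def get_current_app():
    return _current['app'] or DEFAULT_APP


def toy_shared_task(fun):
    def delay(*args, **kwargs):
        app = get_current_app()  # resolved when the task is called
        return 'publish {0} via {1}'.format(fun.__name__, app.broker)
    fun.delay = delay
    return fun


@toy_shared_task
def send_mail_task(template, context, send_to):
    pass


# Web process without the import in __init__.py: no project app exists yet.
before = send_mail_task.delay('email/confirmation_message.html', {}, [])

# Equivalent of "from .celery import app as celery_app" having run.
ToyApp('cars', broker='redis://localhost:6379/0')
after = send_mail_task.delay('email/confirmation_message.html', {}, [])

print(before)
print(after)
```

In this model the first call publishes through the default broker and the second through Redis, which matches the behavior described in the question: the worker imported celery.py via the -A option, while the web process did not import it at all.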