mateuszb mateuszb - 3 months ago 24
Python Question

Can't invoke celery task in Django tests synchronously

I'd like to invoke celery tasks synchronously during my Django tests without need to run celery worker. To achieve this I have specified

CELERY_ALWAYS_EAGER=True
in my settings.py but it doesn't seem to work. So I decided to apply override_settings decorator to specific test that looks like this

@override_settings(CELERY_ALWAYS_EAGER=True, BROKER_BACKEND='memory',
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True)
def test_foo(self):
...


Unfortunately, this test still invokes task in my celery worker. What I can be missing? To be specific, I'm using Django 1.10 with Celery 4.0.0.

Answer

In celery 4.0 configuration parameters has changed,

Try these instead in your tests,

@override_settings(
    task_eager_propagates=True,
    task_always_eager=True,
    broker_url='memory://',
    backend='memory'
)

I was facing same issue, solved using new lowercase names for tests as well as in default celery settings.

Here is new settings to original settings map,
http://docs.celeryproject.org/en/latest/userguide/configuration.html#new-lowercase-settings

Celery settings change info:
http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names