mateuszb mateuszb - 12 days ago 6
Python Question

Can't invoke celery task in Django tests synchronously

I'd like to invoke celery tasks synchronously during my Django tests without need to run celery worker. To achieve this I have specified

CELERY_ALWAYS_EAGER=True
in my settings.py but it doesn't seem to work. So I decided to apply override_settings decorator to specific test that looks like this

@override_settings(CELERY_ALWAYS_EAGER=True, BROKER_BACKEND='memory',
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True)
def test_foo(self):
...


Unfortunately, this test still invokes task in my celery worker. What I can be missing? To be specific, I'm using Django 1.10 with Celery 4.0.0.

Answer

In celery 4.0 configuration parameters has changed,

Try these instead in your tests,

@override_settings(
    task_eager_propagates=True,
    task_always_eager=True,
    broker_url='memory://',
    backend='memory'
)

I was facing same issue, solved using new lowercase names for tests as well as in default celery settings.

Here is new settings to original settings map,
http://docs.celeryproject.org/en/latest/userguide/configuration.html#new-lowercase-settings

Celery settings change info:
http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names

Comments