
I am unit testing Celery tasks. My chains also contain groups, so the result is a chord.

The test should look like:

  • run the Celery task (with delay)
  • wait for task and all subtasks
  • assert

I tried the following:

from celery.result import GroupResult


def wait_for_result(result):
    # Block until this task finishes, then recurse into its children.
    result.get()
    for child in result.children or []:
        if isinstance(child, GroupResult):
            # tried looping over the task results in the group
            # until the tasks are ready, but without success
            pass
        wait_for_result(child)

This creates a deadlock: chord_unlock is retried forever. I am not interested in the task results. How can I wait for all the subtasks to finish?

dynamicmindset

2 Answers


Although this is an old question, I just wanted to share how I got rid of the deadlock issue, just in case it helps somebody.

As the Celery logs say, never call get() inside a task. Doing so will indeed create a deadlock.
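To see why blocking on a subtask from inside a task can deadlock, here is a rough analogy using only the standard library's concurrent.futures, not Celery itself. It assumes a pool with a single worker; the names child, parent_blocking and demo are made up for illustration. The parent occupies the only worker while waiting for a child that needs that same worker, so the wait can never succeed (a timeout is used here so the sketch terminates).

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def child():
    return 42


def parent_blocking(pool):
    # Analogous to calling get() inside a task: the only worker waits
    # for a subtask that can only run on that same worker.
    future = pool.submit(child)
    try:
        return future.result(timeout=0.5)
    except TimeoutError:
        # Without the timeout this wait would never return.
        future.cancel()
        return "deadlock"


def demo():
    # A single worker stands in for an exhausted worker pool.
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(parent_blocking, pool).result()
```

With more workers the wait may happen to succeed, which is why this kind of deadlock tends to show up only when the pool is exhausted.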

I have a similar set of Celery tasks that includes a chain of group tasks, which makes it a chord. I call these tasks from Tornado by making an HTTP request. So what I did was something like this:

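from celery import group, shared_task


@shared_task
def someFunction(i):
    ...


@shared_task
def someTask(results):
    ...


@shared_task
def celeryTask():
    # Header: ten parallel someFunction calls.
    groupTask = group([someFunction.s(i) for i in range(10)])

    # A group chained into a task becomes a chord; someTask receives
    # the list of group results. apply_async() starts it without blocking.
    job = (groupTask | someTask.s()).apply_async()

    # Return only the ID of the someTask result; never call get() here.
    return job.id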

When Tornado calls celeryTask(), the chain starts executing, and the UUID of someTask() is held in job. It will look something like this:

AsyncResult: 765b29a8-7873-4b28-b05c-7e19c33e950c

This UUID is returned, and celeryTask() exits (ideally) before the chain even starts executing, which frees the worker process to run something else.

I then used the Tornado layer to check the status of the task. Details on the Tornado layer can be found in this Stack Overflow question.
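The status check can be sketched as a simple polling loop. This is only a minimal sketch, not the Tornado code from the linked question: it assumes the caller has an object with a ready() method, such as a celery.result.AsyncResult built from the returned UUID, and the helper name wait_until_ready with its timeout and interval parameters is hypothetical.

```python
import time


def wait_until_ready(result, timeout=10.0, interval=0.1):
    """Poll result.ready() until it is True or the timeout elapses.

    `result` is assumed to behave like celery.result.AsyncResult,
    i.e. ready() returns True once the task has finished.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if result.ready():
            return True
        time.sleep(interval)
    # One last check so a result that finished during the final sleep counts.
    return result.ready()
```

In a test, this would run in the test process (never inside a task), for example wait_until_ready(AsyncResult(task_id)) followed by the assertions.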

Nitheesh A S

Have you tried a chord with a callback?

http://docs.celeryproject.org/en/latest/userguide/canvas.html#chords

>>> callback = tsum.s()
>>> header = [add.s(i, i) for i in range(100)]
>>> result = chord(header)(callback)
>>> result.get()
9900
atv