I have two tasks. The first, a PythonOperator, computes a value. The second, a SimpleHttpOperator, should use that value in its request. Here is my code:

source_list = ['account', 'sales']

for source_type in source_list:
    t2 = PythonOperator(
                task_id='compute_next_gather_time_for_' + source_type,
                python_callable=compute_next_gather_time,
                provide_context=True,
                trigger_rule=TriggerRule.ALL_SUCCESS,
                op_args=[source_type],
                retries=3
            )

    t3 = SimpleHttpOperator(
                task_id='request_' + source_type + '_report',
                method='POST',
                http_conn_id='abc',
                endpoint=endpoint,
                data=json.dumps({
                    "query": {
                        "start": "{{ task_instance.xcom_pull(task_ids='prev_task_id') }}",
                        "stop": str(yesterday),
                        "fields": [
                            1
                        ]
                    }
                }),
                headers={"Content-Type": "application/json", "Authorization": 'abc'},
                response_check=lambda response: len(response.json()) == 0,
                log_response=True,
                retries=3
            )

Question: I want to pass the previous task's ID to t3 inside its data argument. I am not sure how to do that, since the task_id of t2 is not constant: it changes with source_type. When I tried the code above, the template did not render the value.

Gagan
  • Does this answer your question? [Apache Airflow - get all parent task\_ids](https://stackoverflow.com/questions/54728513/apache-airflow-get-all-parent-task-ids) – y2k-shubham Sep 24 '20 at 13:11

2 Answers

I haven't used Jinja templating in any of my DAGs, but I have faced a similar problem: retrieving XCom values from a task whose task_id is generated dynamically.

You could build the task_ids argument in t3 the same way you build the task_id for t2. For example:

source_list = ['account', 'sales']

for source_type in source_list:

    task_id='compute_next_gather_time_for_' + source_type

    t2 = PythonOperator(
                task_id=task_id,
                python_callable=compute_next_gather_time,
                provide_context=True,
                trigger_rule=TriggerRule.ALL_SUCCESS,
                op_args=[source_type],
                retries=3
            )

    t3 = SimpleHttpOperator(
                task_id='request_' + source_type + '_report',
                method='POST',
                http_conn_id='abc',
                endpoint=endpoint,
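                data=json.dumps({
                    "query": {
                        # Splice the Python variable into the template string; a bare
                        # task_id inside {{ }} would be looked up as a Jinja variable.
                        "start": "{{ task_instance.xcom_pull(task_ids='" + task_id + "') }}",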
                        "stop": str(yesterday),
                        "fields": [
                            1
                        ]
                    }
                }),
                headers={"Content-Type": "application/json", "Authorization": 'abc'},
                response_check=lambda response: len(response.json()) == 0,
                log_response=True,
                retries=3
            )
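
The string-building step can be checked on its own, without Airflow. The sketch below is only an illustration: the helper names `xcom_pull_template` and `build_request_body` and the sample stop date are made up for this example. It shows that the concatenation happens in plain Python when the DAG file is parsed, so the template that Airflow later renders already contains the literal task ID.

```python
import json


def xcom_pull_template(task_id):
    """Return a Jinja expression that pulls the XCom of `task_id` at run time."""
    # Concatenation runs when the DAG file is parsed, so the template text
    # holds the literal task ID rather than the name of a Python variable.
    return "{{ task_instance.xcom_pull(task_ids='" + task_id + "') }}"


def build_request_body(source_type, stop):
    """Build the JSON body for one source type (hypothetical helper)."""
    upstream_id = "compute_next_gather_time_for_" + source_type
    return json.dumps({
        "query": {
            "start": xcom_pull_template(upstream_id),
            "stop": stop,
            "fields": [1],
        }
    })


# Print the unrendered template for each source type.
for source_type in ["account", "sales"]:
    print(build_request_body(source_type, "2019-06-03"))
```

The returned string is what would be passed as `data`; Airflow would render the `{{ ... }}` part only when the task runs.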
Josh
  • Thanks buddy, it worked. Instead of creating a variable, I built the task ID directly in the Jinja string. I changed 'data' to this and it worked fine: data=json.dumps({ "query": { "start": "{{ task_instance.xcom_pull(task_ids='" + 'compute_next_gather_time_for_' + source_type + "') }}", "stop": str(yesterday), "fields": [ 1 ] } }), – Gagan Jun 04 '19 at 23:15
  • @Gagan, can you please add this as a new answer, since your answer shows how to implement the advice? – cdabel Apr 22 '20 at 19:05

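I was able to get the upstream task's ID from the task context by doing this (it returns one upstream task ID, so it is unambiguous only when the task has a single upstream task):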

next(iter(context['task'].upstream_task_ids))
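
As a rough sketch of how that lookup might be used inside a python_callable, the example below wraps it in a check for exactly one upstream task. The names `single_upstream_task_id`, `pull_from_previous_task`, and `FakeTaskInstance` are hypothetical, and the stand-in context only mimics the two context entries used here (`task` and `task_instance`) so that the code runs without Airflow installed.

```python
from types import SimpleNamespace


def single_upstream_task_id(context):
    """Return the ID of the only upstream task, or raise if it is ambiguous."""
    upstream_ids = sorted(context["task"].upstream_task_ids)
    if len(upstream_ids) != 1:
        raise ValueError(
            "expected exactly one upstream task, found %d" % len(upstream_ids)
        )
    return upstream_ids[0]


def pull_from_previous_task(**context):
    """Hypothetical python_callable: pull the XCom pushed by the upstream task."""
    previous_task_id = single_upstream_task_id(context)
    return context["task_instance"].xcom_pull(task_ids=previous_task_id)


class FakeTaskInstance:
    """Stand-in for Airflow's task instance; only echoes the requested ID."""

    def xcom_pull(self, task_ids):
        return "xcom of " + task_ids


# Stand-in context so the sketch runs without Airflow.
fake_context = {
    "task": SimpleNamespace(
        upstream_task_ids={"compute_next_gather_time_for_sales"}
    ),
    "task_instance": FakeTaskInstance(),
}
print(pull_from_previous_task(**fake_context))
```

Raising on zero or several upstream tasks avoids silently pulling from an arbitrary parent.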
cosbor11