
I'm trying to access external files in an Airflow task to read some SQL, and I'm getting "file not found". Has anyone come across this?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dag',
    start_date=datetime(2017, 1, 1),
    catchup=False,
    schedule_interval=timedelta(days=1)
)

def run_query():
    # read the query from the external file
    query = open('sql/queryfile.sql').read()
    # run the query (execute() is a stand-in for the real runner)
    execute(query)

task = PythonOperator(
    task_id='run_query', dag=dag, python_callable=run_query)

The log states the following:

IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'

I understand that I could simply copy and paste each query into the Python file, but it's really not a neat solution. There are multiple queries and the text is really long; embedding it in the Python code would compromise readability.

Alessandro Mariani

3 Answers


Here is an example using an Airflow Variable to make this easy.

  • First, add a Variable via the Airflow UI -> Admin -> Variables, e.g. {key: 'sql_path', value: 'your_sql_script_folder'}

  • Then add the following code to your DAG so it picks up the Variable:

DAG code:

import airflow
from airflow.models import Variable

tmpl_search_path = Variable.get("sql_path")

dag = airflow.DAG(
    'tutorial',
    schedule_interval="@daily",
    template_searchpath=tmpl_search_path,  # tell Jinja where to look for template files
    default_args=default_args  # assumes default_args is defined elsewhere
)
  • Now you can reference the SQL script by file name (or by a path relative to that folder) in any templated field; a fuller sketch follows after this list

  • You can learn more about templating in the Airflow documentation
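
Putting it together, a minimal sketch of a complete DAG, assuming the `sql_path` Variable points at a folder containing `queryfile.sql`; `PostgresOperator` and the `my_postgres_conn` connection id are just illustrative, any operator with a templated `sql` field would do:

import airflow
from airflow.models import Variable
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime

tmpl_search_path = Variable.get("sql_path")

dag = airflow.DAG(
    'tutorial',
    start_date=datetime(2017, 1, 1),
    schedule_interval="@daily",
    template_searchpath=tmpl_search_path,
)

# Because template_searchpath is set, the templated `sql` field can
# reference the file by bare name; Airflow renders it with Jinja at run time.
run_query = PostgresOperator(
    task_id='run_query',
    postgres_conn_id='my_postgres_conn',  # illustrative connection id
    sql='queryfile.sql',
    dag=dag,
)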

zhongjiajie
  • Please, could you provide a full example? When `template_searchpath` is defined, how does it change the overall script's behaviour, and can I reference the file by its name now? For example, would this complete your example: ``` with open(query_file_name, 'r') as file: query_content = file.read() ``` ? – Ricardo M S Feb 12 '20 at 20:57
    I don't think this would work with the example DAG the OP uses with `PythonOperator` and Python's native `open()`. The PythonOperator runs in a pod which doesn't have access to the same set of locations as the process which parses the DAGs. – LondonRob Feb 17 '20 at 16:25
    @RicardoMS Hi, when you want to define your own `airflow.models.Variable`, the easiest way is through the Airflow UI: `homepage -> Admin -> Variables`, then create a new variable, e.g. `{'Key': 'RicardoMS_variable', 'Val': '/opt/specific/path'}`. Once that's done, you can load it with `tmpl_search_path = Variable.get("RicardoMS_variable")` instead of hard-coding `'/opt/specific/path'` – zhongjiajie Feb 18 '20 at 10:47
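
Note that, as LondonRob points out above, `template_searchpath` only affects templated operator fields; it does not change what a plain `open()` inside a `PythonOperator` sees. If you want to keep the Python callable, a sketch is to read the same Variable directly and build the path yourself (again assuming the `sql_path` Variable exists):

import os
from airflow.models import Variable

def run_query():
    # resolve the folder from the Airflow Variable at run time
    sql_dir = Variable.get("sql_path")
    with open(os.path.join(sql_dir, "queryfile.sql")) as f:
        query = f.read()
    execute(query)  # stand-in for however the query is actually run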

All relative paths are resolved against the AIRFLOW_HOME environment variable. Try:

  • Giving an absolute path
  • Placing the file relative to AIRFLOW_HOME
  • Logging the current working directory in the Python callable and then deciding what path to give (best option; a sketch follows below)
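
A short sketch of the last option (note the comment below: AIRFLOW_HOME is optional, so the fallback to the default `~/airflow` here is an assumption):

import os
import logging

def run_query():
    # log the directory the task actually runs in before deciding on a path
    logging.info("cwd: %s", os.getcwd())
    # AIRFLOW_HOME may be unset; fall back to Airflow's default home
    airflow_home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
    with open(os.path.join(airflow_home, "sql", "queryfile.sql")) as f:
        query = f.read()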
Priyank Mehta
    Good comment, but unfortunately AIRFLOW_HOME is an optional environment variable - Airflow works just fine without it - and you can't guarantee that it will be set. – Kirk Broadhurst Nov 15 '17 at 21:05

Assuming that the sql directory is relative to the current Python file, you can figure out the absolute path to the sql file like this:

import os

CUR_DIR = os.path.abspath(os.path.dirname(__file__))

def run_query():
    # read the query, anchored to this file's directory
    with open(f"{CUR_DIR}/sql/queryfile.sql") as f:
        query = f.read()
    # run the query (execute() is a stand-in for the real runner)
    execute(query)
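
On Python 3.4+, `pathlib` expresses the same idea a bit more directly; a minimal equivalent sketch:

from pathlib import Path

def run_query():
    # anchor the path to this file's directory, then read the file
    query = (Path(__file__).parent / "sql" / "queryfile.sql").read_text()
    execute(query)  # stand-in for the real runner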
Jake