Airflow variables in a DAG

Variables are a generic way to store and retrieve arbitrary content or settings as a simple key-value store within Airflow.

While your pipeline definition and most of your constants should live in code and be kept in source control, it can be useful to have some variables or configuration items accessible and modifiable through the UI.

Variables can also be used to supply context for different environments, such as dev and prod.

There are three ways to create variables:

  • UI
  • CLI
  • code

UI

From the UI, we can navigate to Admin → Variables to manage them.

CLI

From the CLI, we can use the following commands:

airflow variables -g key        # get the value of a variable
airflow variables -s key value  # set a variable

Code

No matter where we set up a variable, in the end we want to read it in a DAG so that we can easily change the context of a DAG run.

There are two ways to read variables in a DAG:

  • Python Code

    from airflow.models import Variable

    Variable.set("foo", "value")                      # create or update a variable
    foo = Variable.get("foo")                         # read it back as a string
    bar = Variable.get("bar", deserialize_json=True)  # parse a JSON string into a Python object
    baz = Variable.get("baz", default_var=None)       # fall back to None if the key is missing
  • Jinja template
    You can reference a variable from a Jinja template, for example in a BashOperator command:

    echo {{ var.value.<variable_name> }}

    or, if you need to deserialize a JSON object from the variable:

    echo {{ var.json.<variable_name> }}
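Both `deserialize_json=True` and the `var.json` template boil down to running the stored string through Python's JSON machinery. A minimal sketch of that round trip, using only the standard `json` module (no Airflow required):

```python
import json

# A JSON variable is stored in the metadata DB as a plain string
stored = json.dumps({"env": "dev", "retries": 3})

# deserialize_json=True (and var.json) effectively apply json.loads,
# turning the stored string back into a Python object
config = json.loads(stored)
print(config["env"])      # dev
print(config["retries"])  # 3
```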

Best practice

You should avoid using Variables outside of an operator’s execute() method or Jinja templates where possible, because every Variable fetch opens a connection to Airflow’s metadata DB, which can slow down parsing and place extra load on the DB.

In other words, a top-level Variable.get() creates a DB connection every time the scheduler parses the DAG file.
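To see why this matters, here is a stand-in experiment in plain Python (no Airflow involved): the hypothetical FakeVariableStore below counts DB round trips, showing that a module-level fetch pays one hit on every scheduler parse, while a fetch deferred into the callable pays only when the task actually runs.

```python
# A stand-in for Airflow's metadata DB: every get() is one DB round trip.
class FakeVariableStore:
    def __init__(self, data):
        self.data = data
        self.db_hits = 0

    def get(self, key):
        self.db_hits += 1
        return self.data[key]

store = FakeVariableStore({"env": "dev"})

# Anti-pattern: fetch at module (parse) level.
def parse_dag_bad():
    env = store.get("env")  # DB hit on every parse

# Best practice: defer the fetch until the task executes.
def parse_dag_good():
    def task_callable():
        return store.get("env")  # DB hit only when the task runs
    return task_callable

# The scheduler re-parses DAG files continuously; simulate 10 parse cycles.
for _ in range(10):
    parse_dag_bad()
print(store.db_hits)  # 10 hits without a single task having run

store.db_hits = 0
for _ in range(10):
    run = parse_dag_good()
print(store.db_hits)  # still 0 at parse time

run()
print(store.db_hits)  # 1 hit, paid only at execution time
```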

An example to illustrate the best practice

  • Let’s set the variable env=dev from the CLI:

    $ airflow variables -s env dev
  • Create a DAG:

    from datetime import datetime

    from airflow import DAG
    from airflow.models import Variable
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator

    # top-level call: runs every time the scheduler parses this file
    env = Variable.get("env", default_var=None)
    print('' if env is None else env + 'parse time')

    def print_env():  # runs only at execution time
        print(env + 'execution time')

    with DAG("variable_example", start_date=datetime(2020, 4, 1)) as dag:
        os_operator = PythonOperator(task_id="os_operator", python_callable=print_env)
        jinja_operator = BashOperator(task_id="get_variable_value", bash_command='echo {{ var.value.env }} ')

  • Explanation of the run
    When the scheduler parses the DAG file, which happens every few seconds, we will find devparse time in the scheduler log.
    When the DAG actually runs, the PythonOperator prints devexecution time and the BashOperator echoes the dev variable:

    //os_operator
    [2020-04-08 14:56:50,752] {{logging_mixin.py:112}} INFO - devexecution time
    ...
    //get_variable_value
    [2020-04-08 14:56:59,133] {{bash_operator.py:115}} INFO - Running command: echo dev
    [2020-04-08 14:56:59,151] {{bash_operator.py:122}} INFO - Output:
    [2020-04-08 14:56:59,158] {{bash_operator.py:126}} INFO - dev