Advanced utils you would love in Airflow

Some objects in Airflow rarely appear in the demos on the official website, so sometimes we need to read the source code to find useful pieces.
In this post, I collect some usages that I have tested; hopefully someone who lands on this page can take them away and use them in their own project.

To get connection details in a DAG

In my case, I have some database details that are not used by an Airflow operator to fetch data. Instead, I need to pass those details, such as host, username and password,
to another external system as parameters. I don't want to keep them in my code, and the best place to store such information in Airflow is a connection.
So I create a connection in Airflow and then read those details in my DAG. Here is a working example.

  • Create a connection from the CLI
    $ airflow connections -a --conn_id test_connection_name \
    --conn_type http \
    --conn_host my_host_name.com \
    --conn_port 8999 \
    --conn_password 123456
  • Create a dag
    We can use the BaseHook.get_connection class method to get connection details by ID. A full DAG can be found here
    from airflow.hooks.base_hook import BaseHook
    ...
    connection = BaseHook.get_connection("test_connection_name")  # the conn_id created from the CLI
    password = connection.password  # This is a getter that returns the unencrypted password.
  • Backfill the DAG and check the log from the task in UI
    $ airflow backfill test-connection-hook -s 2020-08-01 -e 2020-08-01
    [2020-08-13 21:54:40,333] {standard_task_runner.py:78} INFO - Job 9150: Subtask getDetailsFromConnection
    ...
    [2020-08-13 21:54:40,478] {logging_mixin.py:112} INFO - 41, 123456
    ...
    [2020-08-13 21:54:45,256] {local_task_job.py:102} INFO - Task exited with return code 0
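
As a rough sketch of the use case described at the start of this section, the connection fields can be collected into a plain dict before handing them to the external system. The helper name connection_to_params and the stand-in object are hypothetical, used only so the snippet runs without an Airflow installation; in a DAG you would pass the object returned by BaseHook.get_connection instead.

```python
from types import SimpleNamespace


def connection_to_params(connection):
    """Collect the fields an external system needs into a plain dict.

    Hypothetical helper: `connection` is anything exposing host, port,
    login and password attributes, as Airflow's Connection object does.
    """
    return {
        "host": connection.host,
        "port": connection.port,
        "username": connection.login,
        "password": connection.password,
    }


# Stand-in with the values from the CLI example. In a real task, use:
#   connection = BaseHook.get_connection("test_connection_name")
connection = SimpleNamespace(
    host="my_host_name.com", port=8999, login=None, password="123456"
)
params = connection_to_params(connection)
print(params["host"], params["port"])
```

Keeping the mapping in one small function means the task code never spells out credentials, and the field names expected by the external system can change in a single place.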

To render a Jinja template by using your own context

We usually pass Jinja templates to operator arguments. If an argument is templated, you will find it listed in template_fields in the docs or the code, such as template_fields = ['bash_command', 'env'] in bash_operator.

However, if you have Python code in which you want to render your own variables, you can use the following method from the helpers module.

Airflow has a helper method built on top of Jinja, which you can import in your DAG file:

from airflow.utils.helpers import parse_template_string

Suppose you have a template string in your DAG definition, but you only know the context when the task is running.
For example, the execution date, which is available in the context as ds.
Then you can use parse_template_string to get a template and call render with your own context to get your filename, as follows.

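from airflow.operators.python_operator import PythonOperator

filename_template = 'abc-{{my_name}}.csv'

def my_sleeping_function(**context):
    # For a string containing "{{", the second returned item is the Jinja template.
    # A new local name avoids shadowing the module-level filename_template.
    _, filename_jinja_template = parse_template_string(filename_template)
    filename = filename_jinja_template.render(my_name='Kai')  # 'abc-Kai.csv'

task = PythonOperator(
    task_id='sleep',
    python_callable=my_sleeping_function,
    dag=dag,
)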
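
parse_template_string returns a pair, and which element is set depends on whether the string contains Jinja markers. The sketch below is a stand-in that mirrors what I understand the Airflow 1.10 helper to do, written with plain jinja2 so it runs without Airflow. The names parse_template_string_sketch and render_filename are hypothetical.

```python
from jinja2 import Template


def parse_template_string_sketch(template_string):
    """Stand-in for airflow.utils.helpers.parse_template_string (assumed behaviour)."""
    if "{{" in template_string:
        # Jinja mode: no plain string, only a compiled template.
        return None, Template(template_string)
    # Plain mode: the string is returned as-is, with no template.
    return template_string, None


def render_filename(template_string, **context):
    """Render with our own context, falling back to the plain string."""
    plain, jinja_template = parse_template_string_sketch(template_string)
    if jinja_template is not None:
        return jinja_template.render(**context)
    return plain


print(render_filename("abc-{{ ds }}.csv", ds="2020-08-01"))  # abc-2020-08-01.csv
print(render_filename("static.csv", ds="2020-08-01"))        # static.csv
```

Checking the second element for None before calling render avoids an AttributeError when the string has no Jinja markers in it.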

I have another post with more details.