Understanding some details about Airflow DAGs

A DAG (directed acyclic graph) is a collection of tasks with directional dependencies. A DAG also has a schedule interval, a start date and an optional end date.
For each schedule (say daily or hourly), the DAG runs its individual tasks as their dependencies are met.
Certain tasks have the property of depending on their own past, meaning that they can't run until their previous scheduled run (and its upstream tasks) has completed.
A task is a unit of work in Airflow; it runs some Python code that does some work for an execution_date.
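As a rough sketch of the own-past condition: the depends_on_past flag can be set on an operator, in which case a task instance will not start until its own previous scheduled instance has succeeded. The task name and command below are made up for illustration, and a DAG object named dag is assumed to exist.

from airflow.operators.bash_operator import BashOperator

# With depends_on_past=True, today's instance of this task will not start
# until its previous scheduled instance has completed successfully.
op_load = BashOperator(
    task_id='load_partition',
    bash_command='date',
    depends_on_past=True,
    dag=dag,
)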

execution_date: in Airflow, each DAG run is for a specific date, typically handling data for that date.
The execution_date is the logical date and time which the DAG run, and its task instances, are running for.
While a task instance or DAG run might have a physical start date of now, its logical date might be 3 months ago because we are busy reloading (backfilling) something.

A DAG run and all task instances created within it are instantiated with the same execution_date.
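One way to make this concrete is to reference the execution_date inside a task through templating. This is a minimal sketch (the task name is made up, and a dag object is assumed); BashOperator templates its bash_command, and {{ ds }} renders as the execution date in YYYY-MM-DD form:

from airflow.operators.bash_operator import BashOperator

# {{ ds }} is replaced with the logical execution date (YYYY-MM-DD)
# of the task instance when the command actually runs.
op_print_exec_date = BashOperator(
    task_id='print_execution_date',
    bash_command='echo "running for {{ ds }}"',
    dag=dag,
)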

Basically, a DAG consists of the following 3 things, and as an Airflow DAG developer you need to define them in a DAG file.

  • schedule, how often we run this DAG
  • tasks or operators, what to do in this DAG
  • dependencies, how tasks depend on each other, including the own-past condition (a task depending on its own previous scheduled run)

Note: a pipeline definition and a DAG definition are the same thing in the context of Airflow.

DAG definition file or pipeline definition

A DAG definition file is a Python file where we define all 3 elements above; it is picked up by the Airflow scheduler, parsed and persisted into a database.
When Airflow parses the DAG definition, it extracts the meta information from the Python file; it does not execute the tasks defined in the file.
The actual tasks will be run in a different context, when their scheduled time is met, from the context in which the script is parsed.
Different tasks run on different workers at different points in time, which means that this script cannot be used to cross-communicate between tasks.
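A minimal sketch of that parse-time versus run-time distinction (the function and task names are made up, and the dag object is assumed to be defined in the same file): top-level code runs every time the scheduler parses the file, while the operator's callable only runs when a task instance is executed on a worker.

from airflow.operators.python_operator import PythonOperator

print("parse time: this runs every time Airflow parses the DAG file")

def do_work():
    # run time: this only runs when the task instance executes on a worker
    print("doing the actual work")

op_do_work = PythonOperator(
    task_id='do_work',
    python_callable=do_work,
    dag=dag,
)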

To define a DAG

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago

default_args = {
    'start_date': days_ago(2),
}

dag = DAG(
    'dag-id-data',
    default_args=default_args,
    schedule_interval=timedelta(seconds=5),
)
  • Explain:
    DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG (see the sketch after this list).
    If a dictionary of default_args is passed to a DAG, it will apply them to any of its operators. This makes it easy to apply a common parameter to many operators without having to type it many times. (Code 1)
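The task_id uniqueness point can be sketched as follows (the ids are made up, and the dag object from above is assumed); newer Airflow versions reject the duplicate with an error:

from airflow.operators.dummy_operator import DummyOperator

op_first = DummyOperator(task_id='unique_id', dag=dag)

# A second task with the same task_id cannot be added to the same DAG;
# uncommenting the line below would be rejected by Airflow:
# op_duplicate = DummyOperator(task_id='unique_id', dag=dag)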

Args to DAG:

dag_id, the id of the DAG, defined by the developer when creating the DAG in the Python file
description, the description for the DAG, e.g. to be shown on the webserver
schedule_interval, defines how often the DAG runs; its value can be one of:

  • a datetime.timedelta, which gets added to your latest task instance's execution_date to figure out the next schedule
  • a dateutil.relativedelta.relativedelta
  • a str that acts as a cron expression

start_date (datetime.datetime), the timestamp from which the scheduler will attempt to backfill, e.g. days_ago(2)
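A minimal sketch of two common ways to express schedule_interval (the DAG ids below are made up); a dateutil.relativedelta.relativedelta value can be used in the same position as the timedelta:

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago

# timedelta: the next run is the latest execution_date plus one day
dag_delta = DAG('example-daily-delta',
                start_date=days_ago(2),
                schedule_interval=timedelta(days=1))

# cron expression string: run at 06:00 every day
dag_cron = DAG('example-daily-cron',
               start_date=days_ago(2),
               schedule_interval='0 6 * * *')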

To define a Task

from airflow.operators.bash_operator import BashOperator

op_print_date = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)
  • Explain:
    task_id is defined by the DAG developer
    BashOperator is used to run the command 'date'
    the task is assigned to the dag defined above

DAG context when defining a task

Pass the dag keyword argument when creating the task

# pseudocode: Task stands for any concrete operator class, e.g. BashOperator
task1 = Task(task_id='task1', dag=dag, ...)

Using the Python DAG context manager

# inside the context manager, dag=dag does not need to be passed explicitly
with dag:
    task1 = Task(task_id='task1', ...)

To define dependencies

Method call

# task2 will run after task1
task1.set_downstream(task2)

Bitshift Composition

# the same dependency expressed with the bitshift operator
task1 >> task2
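A minimal sketch of how these compose for more than two tasks (the task ids are made up and a dag object is assumed); lists of tasks can appear on either side of the bitshift operators, and << expresses the reverse direction:

from airflow.operators.dummy_operator import DummyOperator

# four placeholder tasks for illustration
t1, t2, t3, t4 = [
    DummyOperator(task_id='task_%d' % i, dag=dag) for i in range(1, 5)
]

# t2 and t3 both run after t1
t1 >> [t2, t3]

# t4 runs only after both t2 and t3 have completed
[t2, t3] >> t4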

Code Examples:

Code 1: default_args are inherited by the DAG's operators

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'start_date': datetime(2016, 1, 1),
    'owner': 'airflow'
}
dag = DAG('dummy-dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner)  # prints 'airflow': the task inherits owner from the DAG's default_args

Code 2: A simple DAG definition

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    'depends_on_past': False,
    'start_date': days_ago(2),
}

dag = DAG(
    'dag-data-id',
    default_args=default_args,
    description='Data pipeline for data',
    schedule_interval=timedelta(seconds=5),
)

op_print_date = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)
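As an optional, version-dependent convenience that is not part of the example above, a DAG file sometimes ends with the following so it can be invoked directly for quick local testing; dag.cli() exposes a CLI scoped to this DAG:

if __name__ == "__main__":
    # only runs when the file is executed directly, not when Airflow parses it
    dag.cli()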

References

Airflow official website
Python datetime delta