A DAG (directed acyclic graph) is a collection of tasks with directional dependencies. A DAG also has a schedule interval, a start date, and an optional end date.
For each schedule (say daily or hourly), the DAG needs to run each individual task as its dependencies are met.
Certain tasks have the property of depending on their own past, meaning that they can't run until their previous scheduled run (and its upstream tasks) has completed.
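This "own past" gate (Airflow exposes it as the depends_on_past flag) can be sketched in plain Python. The `past_runs` dict and `can_run` helper below are illustrative toys, not Airflow's API:

```python
# Illustrative only: a toy model of the depends_on_past gate, not Airflow code.
past_runs = {
    "2021-01-01": "success",
    "2021-01-02": "failed",
}

def can_run(prev_execution_date, depends_on_past=True):
    """A run is allowed only if its previous scheduled run succeeded."""
    if not depends_on_past:
        return True
    return past_runs.get(prev_execution_date) == "success"

print(can_run("2021-01-01"))  # True:  the 2021-01-02 run may start
print(can_run("2021-01-02"))  # False: the 2021-01-03 run is blocked
```

With depends_on_past off, the gate always opens regardless of past state.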
A task is a unit of work in Airflow; it runs some Python code (or other work) for a given execution_date.
execution_date: in Airflow, each DAG run is for a specific date, handling some data for that date.
The execution_date is the logical date and time which the DAG run, and its task instances, are running for.
While a task instance or DAG run might have a physical start date of now, its logical date might be 3 months ago because we are busy reloading something.
A DAG run and all task instances created within it are instantiated with the same execution_date.
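To make the logical-vs-physical distinction concrete, here is a plain-datetime sketch of a daily backfill; the dates are made up for illustration:

```python
from datetime import date, timedelta

start_date = date(2021, 1, 1)   # the DAG's start_date
today = date(2021, 4, 1)        # the physical "now"

# A daily DAG backfilling from start_date creates one run per day;
# each run's execution_date is the logical day it processes data for,
# even though all of these runs physically start today.
execution_dates = [
    start_date + timedelta(days=i)
    for i in range((today - start_date).days)
]

print(execution_dates[0])    # 2021-01-01: a logical date ~3 months before "now"
print(len(execution_dates))  # 90 runs to catch up
```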
Basically, a DAG consists of the following 3 things, and working as an Airflow DAG developer, you need to define them in a DAG file:
- schedule: how often we run this DAG
- tasks (operators): what to do in this DAG
- dependencies: how tasks depend on each other, including a task's dependency on its own past run (the previous scheduled run)
Note: a pipeline definition and a DAG definition are the same thing in the context of Airflow.
DAG definition file or pipeline definition
A DAG definition file is a Python file where we define all 3 elements above; it is picked up by the Airflow server, parsed, and persisted into a database.
When the Airflow server parses the DAG definition, it extracts the meta information from the Python file; however, it does not execute the tasks defined in the file.
The tasks themselves run in a different context, once their scheduled time is met, from the context in which the script is parsed.
Different tasks run on different workers at different points in time, which means that this script cannot be used to cross-communicate between tasks.
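The parse-time / run-time split can be illustrated without Airflow at all: defining a task's callable does nothing until something later invokes it, which is exactly why parsing a DAG file does not run its tasks.

```python
executed = []

def task_body():
    # run time: executed only when a worker runs the task instance
    executed.append("ran")

# "parse time": importing the file defines task_body but does not call it
assert executed == []

# later, in a different context, a worker actually runs the task
task_body()
assert executed == ["ran"]
```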
To define a DAG
from airflow import DAG
- Explain:
DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG.
If a dictionary of default_args is passed to a DAG, it will apply them to any of its operators. This makes it easy to apply a common parameter to many operators without having to type it many times. (Code 1)
Args to DAG:
- dag_id: the ID of the DAG, defined by the developer when creating the DAG in the Python file
- description: the description of the DAG, e.g. to be shown on the webserver
- schedule_interval: defines how often the DAG runs. Accepted values:
  - datetime.timedelta: added to your latest task instance's execution_date to figure out the next schedule
  - dateutil.relativedelta.relativedelta: used the same way as a timedelta
  - str: interpreted as a cron expression
- start_date (datetime.datetime): the timestamp from which the scheduler will attempt to backfill, e.g. days_ago(2)
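For the timedelta case, deriving the next schedule is plain date arithmetic; a sketch with the standard datetime module (the variable names are illustrative, not Airflow internals):

```python
from datetime import datetime, timedelta

schedule_interval = timedelta(days=1)
latest_execution_date = datetime(2021, 1, 1)

# the scheduler adds the interval to the latest execution_date
# to figure out the next run's logical date
next_execution_date = latest_execution_date + schedule_interval
print(next_execution_date)  # 2021-01-02 00:00:00
```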
To define a Task
from airflow.operators.bash_operator import BashOperator

task1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)
- Explain:
task_id is defined by the DAG developer
BashOperator is used to run the command 'date'
the task is assigned to the DAG defined above
DAG context when defining tasks
Pass the dag keyword argument when creating the task:
task1 = BashOperator(task_id='task1', dag=dag, ...)
Using the Python DAG context manager:
with dag:
    task1 = BashOperator(task_id='task1', ...)
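How a `with dag:` block can attach tasks to the DAG is easy to sketch with a toy context manager. This mimics the idea only; the `ToyDag` and `ToyOperator` classes below are illustrative, and Airflow's real implementation differs:

```python
class ToyDag:
    """Toy stand-in for airflow.DAG, for illustration only."""
    _current = None  # the DAG currently open in a `with` block

    def __init__(self):
        self.tasks = []

    def __enter__(self):
        ToyDag._current = self
        return self

    def __exit__(self, *exc):
        ToyDag._current = None

class ToyOperator:
    """Toy operator that auto-registers with the open DAG context."""
    def __init__(self, task_id):
        self.task_id = task_id
        if ToyDag._current is not None:
            ToyDag._current.tasks.append(self)

with ToyDag() as dag:
    t1 = ToyOperator('t1')  # picked up by the surrounding DAG automatically

print([t.task_id for t in dag.tasks])  # ['t1']
```

Inside the `with` block, newly created operators see the open DAG and register themselves, so the dag keyword argument can be omitted.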
Defining dependencies
Method call
task1.set_downstream(task2)
Bitshift Composition
task1 >> task2
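The `>>` operator is ordinary Python operator overloading; a toy version (the `ToyTask` class below is illustrative, not Airflow's actual implementation) shows how it can map onto set_downstream and support chaining:

```python
class ToyTask:
    """Toy stand-in for an Airflow task, for illustration only."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other)

    def __rshift__(self, other):
        # task1 >> task2 is sugar for task1.set_downstream(task2);
        # returning `other` lets chains like t1 >> t2 >> t3 work
        self.set_downstream(other)
        return other

t1, t2, t3 = ToyTask('t1'), ToyTask('t2'), ToyTask('t3')
t1 >> t2 >> t3

print([t.task_id for t in t1.downstream])  # ['t2']
print([t.task_id for t in t2.downstream])  # ['t3']
```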
Code Examples:
Code 1: applying common default_args to operators

from datetime import datetime
from airflow import DAG

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 1),
    'retries': 1,
}
dag = DAG('my_dag', default_args=default_args, schedule_interval='@daily')
# operators created with dag=dag pick up these defaults
Code 2: A simple DAG definition
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='simple_dag',
    description='A simple example DAG',
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
)

t1 = BashOperator(task_id='print_date', bash_command='date', dag=dag)
t2 = BashOperator(task_id='sleep', bash_command='sleep 5', dag=dag)
t1 >> t2