Airflow start_date with a cron schedule_interval is not confusing anymore once you know this

An Airflow DAG's start_date set with days_ago confuses us all the time. When will the DAG be kicked off? Will it start at all?

The first DAG Run is created based on the minimum start_date for the tasks in your DAG.
Subsequent DAG Runs are created by the scheduler process, based on your DAG’s schedule_interval, sequentially.

The notes from the official Airflow website make sense at first glance. However, when you create your Airflow DAG with a cron string, it is hard to tell what they actually mean.
When my friend Yuxia came to discuss her case, running a DAG at 1 a.m. on every even day, I thought it would be easy.

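```python
from airflow import DAG
from airflow.utils.dates import days_ago

default_args = {'start_date': days_ago(1)}
# 01:00 on every even day of the month; other DAG arguments omitted (...)
dag = DAG('demo-cron-schedule-interval', default_args=default_args, schedule_interval='0 1 2-30/2 * *')
```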
  • I checked the expression with Crontab guru (https://crontab.guru/#0_1_1-31/2_*_*).
    Today is 2020-08-14. Going by the quote above, the start_date is 2020-08-13 and the first DAG Run should be created. But the cron says there should be no DAG Run on 2020-08-13, since it is an odd day.
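To double-check which days this cron fires on, here is a tiny hand-written matcher. It is a sketch that understands only this one expression (minute 0, hour 1, day of month 2 to 30 in steps of 2). matches_even_day_1am and fire_times_between are hypothetical names, and a real project would rely on a proper cron library instead.

```python
from datetime import datetime, timedelta


def matches_even_day_1am(t: datetime) -> bool:
    """Hypothetical matcher for '0 1 2-30/2 * *' only: minute 0, hour 1,
    day of month 2, 4, ..., 30, any month, any weekday."""
    return t.minute == 0 and t.hour == 1 and 2 <= t.day <= 30 and t.day % 2 == 0


def fire_times_between(start: datetime, end: datetime):
    """List every whole minute in [start, end) that matches the expression."""
    times = []
    t = start.replace(second=0, microsecond=0)
    while t < end:
        if matches_even_day_1am(t):
            times.append(t)
        t += timedelta(minutes=1)
    return times


# Fire times around "today" (2020-08-14): 08-13 is an odd day, so nothing fires then.
for t in fire_times_between(datetime(2020, 8, 12), datetime(2020, 8, 17)):
    print(t)
# 2020-08-12 01:00:00
# 2020-08-14 01:00:00
# 2020-08-16 01:00:00
```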

In fact, when she checked the system, no DAG Run had started at 2020-08-14 01:00:00.

So what’s wrong with the quote? Why was Yuxia’s DAG not running?

Out of curiosity, I checked the code logic in scheduler_job.
In this post, I’ll try to explain the outcome.

Scope

The DAG’s start_date is not a fixed datetime but is relative to the current time, e.g. days_ago(1)
The start_date is set in default_args ONLY
The DAG uses a cron string or preset as schedule_interval, e.g. 0 1 2-30/2 * *

Issue to explain

Will the first DAG Run be kicked off by the Airflow scheduler?

Concepts from code

  • DAG start_date resolution. The scheduler parses the DAGs periodically, e.g. every 5 seconds (depending on the setup).
    Each time the scheduler runs, it calculates the start_date from the current time (utcnow()).
    days_ago(1) is resolved as follows.

    ```
    start_date = utcnow() - 1 day, with the time set to midnight, i.e. (today - 1) 00:00:00
    ```

    It’s very important to realise that start_date + (1 day) != utcnow()

  • DAG start_date adjustment. Airflow starts subprocesses to create DAG Runs. Each one first checks the schedule_interval and, based on the current time, calculates the previous cron time (previous_cron), the cron time before that (previous_pre_cron), and the next cron time (next_cron).
    The cron times are ordered as previous_pre_cron -> previous_cron -> utcnow() -> next_cron.
    The scheduler then adjusts the DAG’s start_date. Within our scope, the adjustment follows this rule:
    it picks the later of previous_pre_cron and the resolved start_date, and updates dag.start_date.

    ```
    dag.start_date = max(previous_pre_cron, dag.start_date)
    ```
  • Normalization to next_run_date (normalize_schedule in the code). It is the adjusted start_date that gets normalized, and the result, next_run_date, becomes the DAG Run’s execution_date. Normalization aligns the start_date to one of the cron times.
    For example, with the cron times 08-14 01:00:00 and 08-16 01:00:00, any start_date in between, e.g. 08-15 00:00:00, is aligned to 08-16 01:00:00, i.e. the next cron time after the start_date. If the start_date equals a cron time, it stays the same, e.g. normalize_schedule(08-14 01:00:00) = 08-14 01:00:00.

  • Period end
    From the FAQ, we know that the Airflow scheduler triggers the task soon after start_date + schedule_interval has passed, which I think causes confusion in the context of a cron schedule_interval.

    From the code logic, I think it means execution_date + schedule_interval, i.e. the next cron time after the execution_date. If your cron means every 2 days, then the schedule_interval is effectively 2 days.
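The concepts above can be sketched in plain Python. This is a minimal sketch under stated assumptions: it hardcodes the cron `0 1 2-30/2 * *` instead of parsing cron strings, it passes the scheduler time in explicitly instead of calling utcnow(), and every function name (next_cron, prev_cron, resolve_days_ago, adjust_start, normalize, period_end) is a hypothetical helper for this post, not an Airflow API. normalize follows the rule described for normalize_schedule above.

```python
from datetime import datetime, timedelta

# Assumption: hardcoded to the cron '0 1 2-30/2 * *'
# (01:00 on days 2, 4, ..., 30 of every month); not a general cron parser.
FIRE_DAYS = set(range(2, 31, 2))


def next_cron(t: datetime) -> datetime:
    """Hypothetical helper: first cron time strictly after t."""
    c = t.replace(hour=1, minute=0, second=0, microsecond=0)
    if c <= t:
        c += timedelta(days=1)
    while c.day not in FIRE_DAYS:
        c += timedelta(days=1)
    return c


def prev_cron(t: datetime) -> datetime:
    """Hypothetical helper: last cron time strictly before t."""
    c = t.replace(hour=1, minute=0, second=0, microsecond=0)
    if c >= t:
        c -= timedelta(days=1)
    while c.day not in FIRE_DAYS:
        c -= timedelta(days=1)
    return c


def resolve_days_ago(n: int, now: datetime) -> datetime:
    """Like days_ago(n): midnight of 'now' minus n days (now passed in explicitly)."""
    return now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=n)


def adjust_start(start: datetime, now: datetime) -> datetime:
    """dag.start_date = max(previous_pre_cron, dag.start_date)."""
    previous_pre_cron = prev_cron(prev_cron(now))
    return max(previous_pre_cron, start)


def normalize(t: datetime) -> datetime:
    """Keep t if it is already a cron time, otherwise move it to the next cron time."""
    following = next_cron(t)
    return t if prev_cron(following) == t else following


def period_end(execution_date: datetime) -> datetime:
    """With a cron schedule, execution_date + schedule_interval is the next cron time."""
    return next_cron(execution_date)


now = datetime(2020, 8, 14, 2, 0)
start = resolve_days_ago(1, now)
print(start)                                # 2020-08-13 00:00:00
print(start + timedelta(days=1) == now)     # False
print(adjust_start(start, now))             # 2020-08-13 00:00:00
print(normalize(datetime(2020, 8, 15)))     # 2020-08-16 01:00:00
print(normalize(datetime(2020, 8, 14, 1)))  # 2020-08-14 01:00:00
```

The second print shows why start_date + (1 day) != utcnow(): the resolved start_date is truncated to midnight, while the scheduler runs at an arbitrary time of day.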

Figure out when a DAG will be scheduled

To answer the question, we need the following 6 steps:

  • Calculate the cron times previous_pre_cron, previous_cron and next_cron
  • Resolve start_date
  • Adjust start_date to align with schedule_interval
  • Normalize the adjusted start_date
  • Calculate the period end
  • Decide on a DAG Run

Let’s assume the following facts for a worked example.

```
cron: 0 1 2-30/2 * *
start_date: days_ago(1)
today: 2020-08-14
```
  • Calculate previous_pre_cron, previous_cron and next_cron based on the time when the scheduler runs. Since the scheduler runs periodically, these times change during the day. Let’s take 3 examples:

    | scheduler time | previous_pre_cron | previous_cron | next_cron |
    |---|---|---|---|
    | 08-14 00:30:00 | 08-10 01:00:00 | 08-12 01:00:00 | 08-14 01:00:00 |
    | 08-14 02:00:00 | 08-12 01:00:00 | 08-14 01:00:00 | 08-16 01:00:00 |
    | 08-15 08:00:00 | 08-12 01:00:00 | 08-14 01:00:00 | 08-16 01:00:00 |
  • Resolve start_date
    The start_date is calculated from the time when the scheduler runs, so with a config such as days_ago(1) it changes as well.
    Within one day, however, every scheduler run resolves the same start_date: midnight of the previous day, as you can see below.

    | scheduler time | start_date |
    |---|---|
    | 08-14 00:30:00 | 08-13 00:00:00 |
    | 08-14 02:00:00 | 08-13 00:00:00 |
    | 08-15 08:00:00 | 08-14 00:00:00 |
  • Adjust start_date to align with schedule_interval
    As discussed above, we compare the start_date with previous_pre_cron to get the real start_date. The later one wins!

    | scheduler time | previous_pre_cron | previous_cron | next_cron | start_date | adjusted_start |
    |---|---|---|---|---|---|
    | 08-14 00:30:00 | 08-10 01:00:00 | 08-12 01:00:00 | 08-14 01:00:00 | 08-13 00:00:00 | 08-13 00:00:00 |
    | 08-14 02:00:00 | 08-12 01:00:00 | 08-14 01:00:00 | 08-16 01:00:00 | 08-13 00:00:00 | 08-13 00:00:00 |
    | 08-15 08:00:00 | 08-12 01:00:00 | 08-14 01:00:00 | 08-16 01:00:00 | 08-14 00:00:00 | 08-14 00:00:00 |
  • Normalize the adjusted start_date to find the possible execution_date.
    The normalized start_date (the execution_date) is calculated in two steps:

    • Find the next cron time after adjusted_start (next_cron(adjusted_start)) and the latest cron time at or before adjusted_start (pre_cron(adjusted_start)). Both are based on adjusted_start, not on now().
    • Compare them to normalize:

    ```
    normalize(adjusted_start) = adjusted_start == pre_cron(adjusted_start) ? pre_cron(adjusted_start) : next_cron(adjusted_start)
    ```

    | adjusted_start | pre_cron(adjusted_start) | next_cron(adjusted_start) | normalize(adjusted_start) |
    |---|---|---|---|
    | 08-13 00:00:00 | 08-12 01:00:00 | 08-14 01:00:00 | 08-14 01:00:00 |
    | 08-13 00:00:00 | 08-12 01:00:00 | 08-14 01:00:00 | 08-14 01:00:00 |
    | 08-14 00:00:00 | 08-12 01:00:00 | 08-14 01:00:00 | 08-14 01:00:00 |
  • Period end
    It’s easy to get the period end from the normalized start date:

    ```
    Period end = normalize(adjusted_start) + schedule_interval
    ```

    | adjusted_start | normalize(adjusted_start) | Period end |
    |---|---|---|
    | 08-13 00:00:00 | 08-14 01:00:00 | 08-16 01:00:00 |
    | 08-13 00:00:00 | 08-14 01:00:00 | 08-16 01:00:00 |
    | 08-14 00:00:00 | 08-14 01:00:00 | 08-16 01:00:00 |
  • Decide whether a DAG Run will be started
    We compare the normalized start date and the period end with the current time again: if either one is later than now(), the scheduler won’t create a DAG Run.

    | scheduler time | adjusted_start | normalize(adjusted_start) | Period end | DAG Run? |
    |---|---|---|---|---|
    | 08-14 00:30:00 | 08-13 00:00:00 | 08-14 01:00:00 | 08-16 01:00:00 | no |
    | 08-14 02:00:00 | 08-13 00:00:00 | 08-14 01:00:00 | 08-16 01:00:00 | no |
    | 08-15 08:00:00 | 08-14 00:00:00 | 08-14 01:00:00 | 08-16 01:00:00 | no |

Let’s take the same example, but change the start_date to days_ago(2).

  • Cron times and adjusted_start

    | scheduler time | previous_pre_cron | previous_cron | next_cron | start_date | adjusted_start |
    |---|---|---|---|---|---|
    | 08-14 00:30:00 | 08-10 01:00:00 | 08-12 01:00:00 | 08-14 01:00:00 | 08-12 00:00:00 | 08-12 00:00:00 |
    | 08-14 02:00:00 | 08-12 01:00:00 | 08-14 01:00:00 | 08-16 01:00:00 | 08-12 00:00:00 | 08-12 01:00:00 |
  • Normalize

    | scheduler time | adjusted_start | pre_cron(adjusted_start) | next_cron(adjusted_start) | normalize(adjusted_start) |
    |---|---|---|---|---|
    | 08-14 00:30:00 | 08-12 00:00:00 | 08-10 01:00:00 | 08-12 01:00:00 | 08-12 01:00:00 |
    | 08-14 02:00:00 | 08-12 01:00:00 | 08-12 01:00:00 | 08-14 01:00:00 | 08-12 01:00:00 |
  • Period end and decision

    | scheduler time | normalize(adjusted_start) | Period end | DAG Run? |
    |---|---|---|---|
    | 08-14 00:30:00 | 08-12 01:00:00 | 08-14 01:00:00 | no |
    | 08-14 02:00:00 | 08-12 01:00:00 | 08-14 01:00:00 | yes |
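The six steps can be replayed with one function for both the days_ago(1) and the days_ago(2) examples. This sketch assumes the cron `0 1 2-30/2 * *` and simply lists its fire times for August 2020, which is enough for the scheduler times used here, instead of parsing the expression. decide and the helpers are hypothetical names, and the logic follows the rules described in this post rather than being copied from Airflow’s source.

```python
import bisect
from datetime import datetime, timedelta

# Assumption: the fire times of '0 1 2-30/2 * *' in August 2020, listed explicitly.
CRON_TIMES = [datetime(2020, 8, day, 1) for day in range(2, 31, 2)]


def next_cron(t):
    """First listed cron time strictly after t."""
    i = bisect.bisect_right(CRON_TIMES, t)
    if i == len(CRON_TIMES):
        raise ValueError("t is after the last listed cron time")
    return CRON_TIMES[i]


def prev_cron(t):
    """Last listed cron time strictly before t."""
    i = bisect.bisect_left(CRON_TIMES, t) - 1
    if i < 0:
        raise ValueError("t is before the first listed cron time")
    return CRON_TIMES[i]


def decide(n_days, now):
    """Replay the steps for start_date=days_ago(n_days) at scheduler time 'now'."""
    # Resolve: midnight of 'now' minus n_days.
    start = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=n_days)
    # Adjust: the later of previous_pre_cron and the resolved start_date.
    adjusted = max(prev_cron(prev_cron(now)), start)
    # Normalize: keep a cron time as-is, otherwise take the next cron time.
    following = next_cron(adjusted)
    execution_date = adjusted if prev_cron(following) == adjusted else following
    # Period end: the cron time after the execution_date.
    end = next_cron(execution_date)
    # Decide: both the execution_date and the period end must already have passed.
    return execution_date, end, execution_date <= now and end <= now


cases = [(1, datetime(2020, 8, 14, 0, 30)), (1, datetime(2020, 8, 14, 2)),
         (1, datetime(2020, 8, 15, 8)), (2, datetime(2020, 8, 14, 0, 30)),
         (2, datetime(2020, 8, 14, 2))]
for n_days, now in cases:
    execution_date, end, run = decide(n_days, now)
    print(f"days_ago({n_days}) at {now}: {execution_date} -> {end}, DAG Run? {'yes' if run else 'no'}")
```

Only the last case, days_ago(2) at 08-14 02:00:00, prints "yes", which matches the tables.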

Summary

  • The first DAG Run is created based on the minimum start_date for the tasks in your DAG.
    It says “based on”, which doesn’t mean the DAG will run at start_date.

  • Airflow scheduler triggers the task soon after the start_date + schedule_interval is passed
    Here start_date doesn’t mean the start_date you put in default_args. In fact, when the schedule_interval is a cron expression, it doesn’t mean any start_date you set directly.
    It means the resolved, adjusted and normalized start_date, i.e. the execution_date.

  • Will a DAG Run be started?
    Suppose we want to make sure a DAG Run starts on a specific day (2020-08-14). While the Airflow scheduler runs during that day (from 08-14 00:00:00 to 08-14 23:59:59), the start_date resolved from days_ago(2) is fixed at 2020-08-12 00:00:00. This makes it easier to ensure that a DAG Run is triggered.

The simple rule is to set the number in days_ago(number_of_days) equal to or larger than the interval of your cron, e.g. if the cron says every 2 days, use start_date = days_ago(2).

  • More
    Once a DAG Run is triggered, the start_date is not that important anymore. Subsequent runs are calculated from the previous DAG Run’s execution_date, which is already a normalized, fixed date.
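After the first run exists, each following execution_date is the next cron time after the previous one. Its run is created once that period ends. The sketch below assumes the same hardcoded cron `0 1 2-30/2 * *`. next_cron and following_runs are hypothetical helpers, not Airflow APIs.

```python
from datetime import datetime, timedelta

# Assumption: hardcoded to '0 1 2-30/2 * *' (01:00 on days 2, 4, ..., 30).
FIRE_DAYS = set(range(2, 31, 2))


def next_cron(t):
    """First cron time strictly after t."""
    c = t.replace(hour=1, minute=0, second=0, microsecond=0)
    if c <= t:
        c += timedelta(days=1)
    while c.day not in FIRE_DAYS:
        c += timedelta(days=1)
    return c


def following_runs(last_execution_date, count):
    """(execution_date, period end) of the next `count` runs after a known run."""
    runs = []
    execution_date = last_execution_date
    for _ in range(count):
        execution_date = next_cron(execution_date)
        runs.append((execution_date, next_cron(execution_date)))
    return runs


# Continue from the first run created in the days_ago(2) example.
for execution_date, period_end in following_runs(datetime(2020, 8, 12, 1), 3):
    print(execution_date, "created after", period_end)
# 2020-08-14 01:00:00 created after 2020-08-16 01:00:00
# 2020-08-16 01:00:00 created after 2020-08-18 01:00:00
# 2020-08-18 01:00:00 created after 2020-08-20 01:00:00
```

Across a month boundary the gap is not exactly 2 days: following_runs(datetime(2020, 8, 28, 1), 2) gives the execution dates 08-30 01:00 and 09-02 01:00, because day 31 is not in 2-30/2.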