How Often are DAG Definition Files Parsed and Processed in Airflow

Our Python source code files that contain the Airflow DAG definitions are parsed and processed at certain intervals. It can be useful to know the time intervals and conditions around when new and updated DAG files are (re-)processed by Airflow. Before learning about them and how they can be configured, I’d first suggest reading the following articles:

If you read the first article, you will know what the DagFileProcessorManager and DagFileProcessorProcess processes do. The former can either run as a standalone process or as a part of the Scheduler itself. Both of them together are responsible for processing the source files that contain the DAGs.

Scanning dags_folder

Whenever you update the source files in the dags_folder, Airflow picks up the changes automatically. How does that happen? The DagFileProcessorManager process periodically scans the DAGs folder and prepares a list of Python files that “may” contain DAGs (see the second article linked above). How often it does so is set by the following configuration option in airflow.cfg:

[scheduler]
# How often (in seconds) to scan the DAGs directory for new files. Defaults to 5 minutes.
# Can also be set via the AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL environment variable.
dag_dir_list_interval = 300

So whether you add, modify or delete files, Airflow rescans the DAGs folder every dag_dir_list_interval seconds and prepares a list (or queue) of DAG files that have to be processed.

Every file in this list is processed right away the first time. On subsequent runs, a few conditions are evaluated to decide whether a particular file from the list should be parsed again.
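The directory scan can be sketched in plain Python as below. This is a simplified illustration, not Airflow's code: it assumes the "safe mode" heuristic Airflow applies by default (a file is only considered if its contents mention both "airflow" and "dag", case-insensitively), and it ignores .airflowignore and zipped DAGs. The helper names might_contain_dag, list_dag_files and rescan_due are hypothetical.

```python
from __future__ import annotations

import os
from pathlib import Path

# Hypothetical helpers: a simplified sketch of the DAG folder scan, not Airflow's implementation.


def might_contain_dag(path: Path) -> bool:
    """Simplified safe-mode heuristic: the file mentions both 'airflow' and 'dag'."""
    content = path.read_bytes().lower()
    return b"airflow" in content and b"dag" in content


def list_dag_files(dags_folder: str) -> list[str]:
    """Walk dags_folder and collect .py files that may define DAGs (sorted for stable output)."""
    found = []
    for root, _dirs, files in os.walk(dags_folder):
        for name in files:
            path = Path(root) / name
            if path.suffix == ".py" and might_contain_dag(path):
                found.append(str(path))
    return sorted(found)


def rescan_due(last_scan: float, now: float, dag_dir_list_interval: float = 300) -> bool:
    """True once more than dag_dir_list_interval seconds have passed since the last scan."""
    return now - last_scan > dag_dir_list_interval
```

Calling list_dag_files only when rescan_due(...) returns True mirrors the idea of refreshing the file list every dag_dir_list_interval seconds; between rescans, the previously found list is reused.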

Parsing and Processing Files

The processor manager offloads the job of parsing each file to a DagFileProcessorProcess that it spawns. Each file is processed by a separate DagFileProcessorProcess instance, and when that processing finishes, Airflow records the finish time for the file (last_finish_time).

The next time the file queue is prepared, each file is evaluated against a few conditions before a new DagFileProcessorProcess is spawned to parse it. A file is eligible for parsing if at least one of the following conditions holds:

  • The file has no last_finish_time stored against it in memory. This means it is being processed (by DagFileProcessorProcess) for the first time.
  • The file has a last_finish_time, and at least min_file_process_interval seconds have elapsed since then. The default is 30 seconds, which means the file is processed again once at least 30 seconds have passed since the last_finish_time of that particular file. Note that lowering this value may increase CPU usage, because files will be processed more often.
  • file_parsing_sort_mode is set to modified_time (the default) and the file’s last modified time is later than its last_finish_time, i.e. the file has been modified since it was last parsed. In this case the min_file_process_interval wait does not apply.

Put the other way round, a file is skipped only when it was parsed recently and has not changed since. You can check out the code for yourself.
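A minimal sketch of this per-file check is shown below. It is modeled on the condition in DagFileProcessorManager.prepare_file_path_queue in Airflow 2.x but simplified; should_parse and its parameter names are hypothetical. It returns True when a file should be queued for parsing: it was never parsed, its min_file_process_interval has elapsed, or, in modified_time mode, it changed after its last parse. Files that are still being processed are handled separately.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def should_parse(
    now: datetime,
    last_finish_time: datetime | None,
    file_modified_time: datetime,
    min_file_process_interval: float = 30,
    file_parsing_sort_mode: str = "modified_time",
) -> bool:
    """Hypothetical, simplified eligibility check for (re-)parsing one DAG file."""
    # Never parsed before: always eligible.
    if last_finish_time is None:
        return True
    # Enough time has passed since the last parse of this file finished.
    if (now - last_finish_time).total_seconds() >= min_file_process_interval:
        return True
    # In modified_time mode, a file edited after its last parse skips the wait.
    return file_parsing_sort_mode == "modified_time" and file_modified_time > last_finish_time


if __name__ == "__main__":
    last = datetime(2024, 1, 1, 12, 0, 0)
    untouched = last - timedelta(minutes=5)  # file last edited before the previous parse
    edited = last + timedelta(seconds=5)     # file edited after the previous parse
    print(should_parse(last + timedelta(seconds=10), last, untouched))  # False: too soon, unchanged
    print(should_parse(last + timedelta(seconds=30), last, untouched))  # True: interval elapsed
    print(should_parse(last + timedelta(seconds=10), last, edited))     # True: modified since last parse
```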

The two config options discussed above are listed under the [scheduler] section in airflow.cfg:

# Number of seconds after which a DAG file is parsed. The DAG file is parsed every
# ``min_file_process_interval`` number of seconds. Updates to DAGs are reflected after
# this interval. Keeping this number low will increase CPU usage.
min_file_process_interval = 30

# One of ``modified_time``, ``random_seeded_by_host`` and ``alphabetical``.
# The scheduler will list and sort the dag files to decide the parsing order.
# * ``modified_time``: Sort by modified time of the files. This is useful on large scale to parse the
#   recently modified DAGs first.
# * ``random_seeded_by_host``: Sort randomly across multiple Schedulers but with same order on the
#   same host. This is useful when running with Scheduler in HA mode where each scheduler can
#   parse different DAG files.
# * ``alphabetical``: Sort by filename
file_parsing_sort_mode = modified_time
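The three sort modes can be illustrated with the sketch below. It is a simplified stand-in, not Airflow's implementation: order_files is a hypothetical name, and seeding with socket.gethostname() is an assumption (Airflow resolves the hostname through its own lookup).

```python
from __future__ import annotations

import random
import socket


def order_files(file_mtimes: dict[str, float], mode: str = "modified_time", hostname: str | None = None) -> list[str]:
    """Hypothetical sketch: order DAG file paths according to file_parsing_sort_mode."""
    if mode == "modified_time":
        # Most recently modified files are parsed first.
        return sorted(file_mtimes, key=file_mtimes.get, reverse=True)
    if mode == "alphabetical":
        return sorted(file_mtimes)
    if mode == "random_seeded_by_host":
        paths = list(file_mtimes)
        # Same seed on the same host gives the same order; other hosts get other orders.
        random.Random(hostname or socket.gethostname()).shuffle(paths)
        return paths
    raise ValueError(f"unknown file_parsing_sort_mode: {mode}")
```

Seeding the shuffle with the hostname keeps the order stable on one machine while letting several schedulers in HA mode start on different files.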

Heavyweight Parsing and Timeouts

Keen observers might wonder what happens if files take too long to parse and two file processing runs overlap after dag_dir_list_interval. A particular file may not have finished processing from the previous run because of some CPU-intensive operation or large modules being imported.

In that case, Airflow skips any file that is still being processed from the previous run when it builds the new queue.
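That filtering step can be sketched as follows. build_queue is a hypothetical helper, not Airflow's API: it keeps the sorted parse order while dropping files that are still being processed or were parsed too recently.

```python
from __future__ import annotations


def build_queue(ordered_files: list[str], in_progress: set[str], recently_processed: set[str]) -> list[str]:
    """Hypothetical sketch: keep the parse order, skipping files still in progress or parsed too recently."""
    skip = in_progress | recently_processed
    return [path for path in ordered_files if path not in skip]
```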

But what if parsing a DAG file takes so long that it never finishes, say 24 hours and still going? We can set a timeout for that in the configuration file:

[core]
# How long before timing out a DagFileProcessor, which processes a dag file
# Can also be set via the AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT environment variable.
dag_file_processor_timeout = 50

With the default of 50 seconds, a DagFileProcessorProcess that is still running after 50 seconds is killed, which stops the parsing of that file.
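The effect of such a timeout can be imitated with the standard library. This is only an analogy, not Airflow's mechanism (the manager tracks how long each processor has been running and kills those over the limit): parse_in_subprocess is a hypothetical helper that runs some code in a fresh interpreter and relies on subprocess.run(timeout=...), which kills the child once the timeout expires.

```python
import subprocess
import sys


def parse_in_subprocess(code: str, timeout: float) -> bool:
    """Hypothetical: run `code` in a new interpreter; return False if it was killed for exceeding timeout."""
    try:
        subprocess.run([sys.executable, "-c", code], timeout=timeout, check=False)
    except subprocess.TimeoutExpired:
        # subprocess.run has already killed and reaped the child at this point.
        return False
    return True


if __name__ == "__main__":
    print(parse_in_subprocess("pass", timeout=30))                          # True
    print(parse_in_subprocess("import time; time.sleep(10)", timeout=0.5))  # False
```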

This explains why you should keep the parsing of your DAG files as fast as possible: to avoid processing overlaps, timeouts and unnecessary CPU spikes. Simply run your source file with the python interpreter and check whether it finishes quickly:

$ python

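If it takes a long time because of large imports or heavy CPU work, move that work into the code your tasks execute (for example, the Python callables of your tasks), which only runs when a task runs (runtime). Anything at module level, including the body of the DAG definition itself, runs every time the file is parsed (parse time).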
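If you prefer to measure this from Python, the sketch below executes a file's top-level code once, much like an import, and reports how long it took. It only approximates what the DAG processor does; time_module_load and the module name dag_under_test are hypothetical.

```python
import importlib.util
import sys
import time


def time_module_load(path: str) -> float:
    """Hypothetical helper: execute a Python file's top-level code once and return elapsed seconds."""
    spec = importlib.util.spec_from_file_location("dag_under_test", path)
    module = importlib.util.module_from_spec(spec)
    start = time.perf_counter()
    spec.loader.exec_module(module)  # runs everything at module level, i.e. "parse time" work
    return time.perf_counter() - start


if __name__ == "__main__" and len(sys.argv) > 1:
    print(f"{time_module_load(sys.argv[1]):.2f}s")
```

Anything that dominates this number (a heavy import, a database call, a large computation at module level) is paid on every parse, so it is a good candidate to move into task code.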

Deactivating Stale DAGs

DAGs that are no longer present in the source files (because they were deleted) have to be deactivated, and their serialized objects must be deleted from the database. The DagFileProcessorManager takes care of this automatically once deactivate_stale_dags_interval seconds (default 60) have passed since it last did so.

[scheduler]
# How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
# the expected files) which should be deactivated.
# Can also be set via the AIRFLOW__SCHEDULER__DEACTIVATE_STALE_DAGS_INTERVAL environment variable.
deactivate_stale_dags_interval = 60
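A simplified sketch of this check is below. It assumes a DAG counts as stale when its file has been parsed again after the DAG was last seen, meaning the latest parse no longer produced it; find_stale_dags and StaleDagCheck are hypothetical names, and the real manager works against the metadata database rather than in-memory dicts.

```python
from __future__ import annotations


def find_stale_dags(dag_last_seen: dict[str, tuple[str, float]], file_last_parsed: dict[str, float]) -> set[str]:
    """Hypothetical: DAG ids whose file was re-parsed after the DAG was last seen."""
    stale = set()
    for dag_id, (fileloc, last_seen) in dag_last_seen.items():
        parsed = file_last_parsed.get(fileloc)
        if parsed is not None and parsed > last_seen:
            stale.add(dag_id)
    return stale


class StaleDagCheck:
    """Hypothetical throttle: allow the stale-DAG check at most once per `interval` seconds."""

    def __init__(self, interval: float = 60.0, start: float = 0.0):
        self.interval = interval
        self.last_run = start

    def due(self, now: float) -> bool:
        if now - self.last_run > self.interval:
            self.last_run = now  # remember when the check last ran
            return True
        return False
```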

I hope this article gave you some understanding of the file parsing internals, how often DAG files are parsed and which configuration options can help you tune this behavior.
