Our Python source code files that contain the Airflow DAG definitions are parsed and processed at certain intervals. It can be useful to know the time intervals and conditions around when new and updated DAG files are (re-)processed by Airflow. Before learning about them and how they can be configured, I’d first suggest reading the following articles:
If you read the first article, you will know what the DagFileProcessorManager and DagFileProcessorProcess processes do. The former can either run as a standalone process or as a part of the Scheduler itself. Both of them together are responsible for processing the source files that contain the DAGs.
Now whenever you update the source files in the dags_folder, Airflow picks up the changes automatically. How does that happen? The DagFileProcessorManager process continuously scans the DAGs folder to prepare a list of Python files that “may” contain DAGs (see the second article linked above), at a regular interval specified by the following configuration option:
```ini
[scheduler]

# How often (in seconds) to scan the DAGs directory for new files.
# Default to 5 minutes.
dag_dir_list_interval = 300  # or AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL env
```
So if you add, modify or delete files, Airflow will rescan the DAGs folder every dag_dir_list_interval seconds to prepare a list (or queue) of DAG files that have to be processed.
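The scan step can be pictured as a simple directory walk. Here is a minimal sketch of the idea — the function name is illustrative, not Airflow's actual API, and the "contains both 'dag' and 'airflow'" check is a simplified version of the cheap heuristic Airflow applies by default before fully parsing a file:

```python
import os

def list_candidate_dag_files(dags_folder):
    """Walk the DAGs folder and collect .py files that *may* contain DAGs.

    As a cheap pre-filter, only keep files whose contents mention both
    "dag" and "airflow", so not every file has to be fully parsed on
    every scan.
    """
    candidates = []
    for root, _dirs, files in os.walk(dags_folder):
        for name in files:
            if not name.endswith(".py"):
                continue
            path = os.path.join(root, name)
            with open(path, encoding="utf-8", errors="ignore") as f:
                content = f.read().lower()
            if "dag" in content and "airflow" in content:
                candidates.append(path)
    return candidates
```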
All the files on this list will be processed immediately the first time around, but not in subsequent runs: there, a set of conditions is evaluated to decide whether a particular file from the list should be parsed or not.
Parsing and Processing Files
The processor manager internally spawns a DagFileProcessorProcess for each file and offloads the job of parsing to it. As each file is processed (by a separate instance of DagFileProcessorProcess), Airflow records a timestamp denoting the finish time of that file's processing (last_finish_time).
The next time a file list/queue is prepared (after dag_dir_list_interval), each file is evaluated against a couple of conditions before a new DagFileProcessorProcess is spawned to parse it. For a file to be parsed, the following conditions must pass:
- The file must not have a last_finish_time stored against it in memory. If it's not there, then this is the first time the file will be processed (by a DagFileProcessorProcess).
- If the file does have a last_finish_time stored against it in memory, then at least min_file_process_interval seconds must have elapsed since then. The default is 30 seconds, which means the file will be processed if at least 30 seconds have elapsed since this particular file's last_finish_time. Do note that lowering it further may increase CPU usage (because files will be processed more often).
- If file_parsing_sort_mode is set to modified_time (the default), then the file's last modified time must be greater than last_finish_time, i.e., if the file has been modified since it was last parsed, it becomes eligible for parsing.
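Putting the conditions above together, the decision could be sketched like this — a simplification of Airflow's actual logic, with illustrative names:

```python
import os
import time

def should_parse(path, last_finish_time, min_file_process_interval=30,
                 file_parsing_sort_mode="modified_time", now=None):
    """Decide whether a DAG file is due for another parse.

    last_finish_time maps file path -> timestamp of when the previous
    parse of that file finished (a missing key means "never parsed").
    """
    now = time.time() if now is None else now
    finished_at = last_finish_time.get(path)
    if finished_at is None:
        return True  # first time this file is seen: parse it
    if now - finished_at < min_file_process_interval:
        return False  # parsed too recently
    if file_parsing_sort_mode == "modified_time":
        # only re-parse if the file changed since its last parse finished
        return os.path.getmtime(path) > finished_at
    return True
```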
Only if all the conditions above pass will the file be parsed and processed. You can check out the code for yourself.
The two config options discussed above are listed under the [scheduler] section of the Airflow configuration file:
```ini
[scheduler]

# Number of seconds after which a DAG file is parsed. The DAG file is parsed every
# ``min_file_process_interval`` number of seconds. Updates to DAGs are reflected after
# this interval. Keeping this number low will increase CPU usage.
min_file_process_interval = 30

# One of ``modified_time``, ``random_seeded_by_host`` and ``alphabetical``.
# The scheduler will list and sort the dag files to decide the parsing order.
#
# * ``modified_time``: Sort by modified time of the files. This is useful on large scale to parse the
#   recently modified DAGs first.
# * ``random_seeded_by_host``: Sort randomly across multiple Schedulers but with same order on the
#   same host. This is useful when running with Scheduler in HA mode where each scheduler can
#   parse different DAG files.
# * ``alphabetical``: Sort by filename
file_parsing_sort_mode = modified_time
```
Heavyweight Parsing and Timeouts
Keen observers might wonder: what if the files take too much time to parse and two file-processing runs overlap after dag_dir_list_interval? It is possible that a particular file has not finished processing from the previous run because of some CPU-intensive operation or large modules being imported.
In that case, Airflow intelligently skips all the files that are still processing from the previous run.
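Conceptually, the manager simply excludes paths that still have a live processor attached before enqueuing the new batch. A trivial sketch of that filter (again, illustrative names, not Airflow's actual internals):

```python
def files_to_enqueue(candidate_files, running_files):
    """Skip files whose processor from the previous run is still alive.

    candidate_files: the freshly prepared list of DAG file paths.
    running_files: paths that currently have an in-flight processor.
    """
    return [path for path in candidate_files if path not in running_files]
```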
But what if a DAG parsing takes so long that it never finishes? Like 24 hours and still going? We can set a timeout for that in the configuration file:
```ini
[core]

# How long before timing out a DagFileProcessor, which processes a dag file
dag_file_processor_timeout = 50  # or AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT env
```
After 50 seconds of processing (the default), the DagFileProcessorProcess will be killed, stopping the file's parsing.
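The timeout mechanism can be approximated with multiprocessing: run the parse in a child process, join it with a timeout, and terminate it if it is still alive. This is just a sketch of the idea under those assumptions, not Airflow's actual implementation:

```python
import multiprocessing
import time

def parse_file(path):
    # stand-in for the real work a DagFileProcessorProcess would do;
    # here it deliberately takes "forever" to simulate a stuck parse
    time.sleep(60)

def parse_with_timeout(path, timeout=50):
    """Run the parse in a child process; kill it after `timeout` seconds."""
    proc = multiprocessing.Process(target=parse_file, args=(path,))
    proc.start()
    proc.join(timeout)
    if proc.is_alive():
        proc.terminate()  # give up on this file for now
        proc.join()
        return False      # timed out
    return proc.exitcode == 0
```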
This should explain why you must ensure that your DAG file(s) parse as quickly as possible – to avoid processing overlaps, timeouts and unnecessary CPU spikes. Simply run your source file with the python interpreter and check whether it finishes quickly:

```shell
$ python dag_source_file.py
```
If it takes a long time because of large imports or some serious CPU operations, then make sure you only do them inside the functions your tasks execute (run time) and not at the top level of the module (parse time).
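For example, moving a heavy import from module level into the task callable keeps parse time short, because the scheduler re-imports the module on every parse while the function body only executes when the task actually runs. Plain-Python sketch of the pattern (json stands in for a genuinely heavy library such as pandas):

```python
# --- parse time (module level): keep this cheap --------------------------
# import pandas as pd   # a heavy import here slows down EVERY scheduler parse

def transform(**context):
    # --- run time (inside the task callable): heavy work belongs here ---
    import json  # stand-in for a heavy import; only paid when the task runs
    data = {"rows": list(range(3))}
    return json.dumps(data)
```

In a real DAG file, transform would be wrapped in a PythonOperator (or a @task decorator); the principle is the same either way.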
Deactivating Stale DAGs
DAGs that are no longer present in the source files (e.g. deleted) have to be deactivated, and their serialized objects must be deleted from the database. The DagFileProcessorManager takes care of this automatically every deactivate_stale_dags_interval seconds (default 60) since the last time it did so.
```ini
[scheduler]

# How often (in seconds) to check for stale DAGs (DAGs which are no longer present in
# the expected files) which should be deactivated.
deactivate_stale_dags_interval = 60  # or AIRFLOW__SCHEDULER__DEACTIVATE_STALE_DAGS_INTERVAL env
```
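A simplified picture of this cleanup: compare the DAG IDs currently recorded as active in the metadata database against the DAG IDs produced by the latest round of parsing, and deactivate the difference. This is only an illustration of the set logic, not Airflow's real query:

```python
def find_stale_dags(active_dag_ids, parsed_dag_ids):
    """DAG IDs recorded as active but no longer produced by any source file."""
    return sorted(set(active_dag_ids) - set(parsed_dag_ids))
```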
I hope this article gave you some understanding of the file parsing internals, how often DAG files are parsed, and which configuration options can help you tune the process.