When you have to do something more than once, automate it.
In this article, we introduce Airflow and show how its features can be leveraged in a Python factory that automatically builds DAG files (Python artifacts) just by setting parameters in YAML configuration files. A follow-up article will deep dive into the DAG factory we have built at Astrafy and open sourced in a GitLab repository.
Apache Airflow, originally developed at Airbnb, is an open-source workflow management platform that manages Directed Acyclic Graphs (DAGs) and their associated tasks. In other words, it is an orchestration framework to programmatically schedule and monitor workflows and data processing pipelines using Python as the programming language.
What is a workflow?
A workflow is a combination of actions, calculations, commands, Python scripts and SQL queries chained together to achieve a final objective.
A workflow is divided into one or more tasks which, combined with their respective dependencies, form a DAG (i.e. Directed Acyclic Graph).
What is a DAG?
A DAG represents the workflow: it is the collection of tasks together with the order in which they must be executed.
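To make the "directed acyclic" part concrete, here is a minimal sketch in plain Python (no Airflow required). It uses the standard library `graphlib` module to compute one valid execution order for a workflow and to detect cycles. The task names are hypothetical and only serve as an illustration; Airflow's own scheduler is of course more sophisticated than this.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical workflow: each key is a task, each value is the set of
# tasks that must finish before it can start.
workflow = {
    "extract": set(),
    "transform": {"extract"},
    "quality_check": {"extract"},
    "load": {"transform", "quality_check"},
}


def execution_order(dependencies):
    """Return one valid execution order for the given dependency mapping."""
    return list(TopologicalSorter(dependencies).static_order())


def is_acyclic(dependencies):
    """Return True if the dependencies form a valid DAG (no cycles)."""
    try:
        execution_order(dependencies)
    except CycleError:
        return False
    return True


order = execution_order(workflow)
print(order)
```

Whatever order is printed, "extract" always comes first and "load" always comes last, because every task waits for its upstream dependencies. A mapping such as `{"a": {"b"}, "b": {"a"}}` would be rejected, since two tasks waiting on each other can never be scheduled.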
What is a Task?
Tasks determine what actually gets done in each phase of the workflow within the DAG. Each task is represented by an operator. Operators are most of the time atomic, meaning that they can stand on their own. However, it is possible to share information between tasks, if needed, using Airflow's cross-communication mechanism called XCom.
In simple words: operators are pre-defined templates used to build most tasks; the available operators are listed in the Airflow documentation.
With Airflow 2.0, the concept of TaskGroup was introduced: tasks can be grouped together for a clearer representation in the UI and simpler dependency handling between tasks. TaskGroups are purely a UI grouping concept. Tasks in TaskGroups live on the same original DAG, and honour all the DAG settings and pool configurations. More details here.
When you start using Airflow, whether installed locally or in the cloud, you will have a structure similar to this one:
│ └── dag_hello_world.py
The core of this structure is airflow.cfg, which contains all the configuration of the Airflow instance, and the DAG folder.
The DAG folder contains all the DAG Python scripts that the Airflow server renders in the UI and that the Airflow scheduler runs (either on a cron schedule or triggered manually). All your Airflow DAGs must reside in this folder. The same concept applies, for instance, to Cloud Composer (the fully managed version of Airflow on Google Cloud), which expects all the Airflow DAGs to be in a specific Cloud Storage bucket.
To use Airflow you need Airflow installed and the DAG Python files that you want to execute. To facilitate the generation of those Python DAGs, we have developed a DAG factory at Astrafy. The goal of this factory is as follows:
Build DAG Python file from a simple YAML configuration file
We did not want to reinvent the wheel and we took quite some inspiration from this open-source codebase (https://github.com/ajbosco/dag-factory) to develop this factory. The factory is composed as follows:
│ ├── README.md
│ ├── dag_factory
│ │ ├── functions
│ │ │ ├── hello.py
│ │ ├── __init__.py
│ │ ├── create_dag.py
│ │ ├── custom_operators.py
│ │ ├── dag.template
│ │ ├── dagbuilder.py
│ │ ├── dagfactory.py
│ │ └── utils.py
│ ├── dags_configuration
│ │ ├── configuration_dag1.yaml
│ │ ├── configuration_dag2.yaml
│ ├── requirements.txt
│ ├── setup.py
The dags_configuration folder, as the name suggests, holds all the YAML configuration files that the dag_factory translates into DAG Python scripts.
An example of configuration file:
Here task_2, task_3 and task_4 belong to the same task group: task_group1.
The magic happens in the dag_factory folder, in particular in the create_dag Python script, which expects a list of YAML files as parameters; these are loaded, processed and translated into a DAG.
For the previous YAML file, the factory will generate a Python DAG file as follows:
This DAG will then be rendered as follows by the Airflow UI:
If you compare the previous YAML with the ones in the ajbosco/dag-factory repository, they look the same. So what is the catch?
We actually revamped and refactored all the code. ajbosco/dag-factory uses the concept of global scope to build the DAG (more information here):
Python has a function called globals() that returns the module-level variables as a dictionary in which you can get and set values. Once the DAG objects are placed in this “globals” dictionary, Airflow will load every valid DAG that resides within it.
This option works fine, but with the drawback that you do not see the Python file in your DAG folder. The DAGs are generated dynamically and you therefore have less visibility on the code.
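The global-scope approach described above can be sketched in a few lines. To keep the example runnable without Airflow, we use a hypothetical `FakeDag` class as a stand-in for `airflow.DAG`, and a hypothetical `configs` dictionary in place of the parsed YAML; neither comes from the ajbosco/dag-factory codebase.

```python
class FakeDag:
    """Stand-in for airflow.DAG so the sketch runs without Airflow installed."""

    def __init__(self, dag_id, schedule):
        self.dag_id = dag_id
        self.schedule = schedule


# Hypothetical result of parsing the YAML configuration files.
configs = {
    "dag_sales": {"schedule": "0 3 * * *"},
    "dag_marketing": {"schedule": "0 5 * * *"},
}


def register_dags(configs, namespace):
    """Create one DAG object per configuration entry and store it in `namespace`."""
    for dag_id, settings in configs.items():
        namespace[dag_id] = FakeDag(dag_id, settings["schedule"])


# Passing globals() makes every DAG a module-level variable, which is
# where Airflow looks for DAG objects when it parses a file.
register_dags(configs, globals())
print(sorted(name for name, value in globals().items() if isinstance(value, FakeDag)))
```

Note that no `dag_sales.py` or `dag_marketing.py` file ever exists on disk: the DAGs only live in memory while the single loader file is being parsed, which is exactly the visibility drawback mentioned above.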
We decided to go with another approach by using a Jinja template file (“dag.template”) to generate the DAG Python file directly. The main benefit is that you can immediately see the generated Python file and explore it.
In addition, we have expanded the code to integrate more Airflow operators, to exploit the TaskGroup feature and, last but certainly not least, to integrate dbt as per the two original articles from Astronomer in the “Building a Scalable Analytics Architecture With Airflow and dbt” series (article 1 and article 2).
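As a rough illustration of the template approach, the sketch below renders a DAG file from a configuration dictionary. It is not our actual dag.template: to stay dependency-free it swaps Jinja for the standard library `string.Template`, and the configuration keys (`dag_id`, `schedule`, `tasks`, `bash_command`) are assumptions made for the example. The generated code targets the Airflow 2.x style of DAG definition.

```python
from string import Template

# Simplified stand-in for a Jinja "dag.template" file.
DAG_TEMPLATE = Template('''\
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id=$dag_id,
    schedule_interval=$schedule,
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
$tasks
''')

TASK_TEMPLATE = Template(
    "    $name = BashOperator(task_id=$task_id, bash_command=$command)\n"
)


def render_dag(config):
    """Render the Python source of a DAG file from a configuration dict."""
    tasks = "".join(
        TASK_TEMPLATE.substitute(
            name=name,
            task_id=repr(name),  # repr() yields a safely quoted Python literal
            command=repr(spec["bash_command"]),
        )
        for name, spec in config["tasks"].items()
    )
    return DAG_TEMPLATE.substitute(
        dag_id=repr(config["dag_id"]),
        schedule=repr(config["schedule"]),
        tasks=tasks.rstrip("\n"),
    )


# Hypothetical configuration, as it could look once the YAML is loaded.
config = {
    "dag_id": "dag_hello_world",
    "schedule": "0 3 * * *",
    "tasks": {
        "task_1": {"bash_command": "echo hello"},
        "task_2": {"bash_command": "echo world"},
    },
}

source = render_dag(config)
# Syntax check only: compile() does not execute the code or import Airflow.
compile(source, "dag_hello_world.py", "exec")
print(source)
```

Writing `source` to a file in the DAG folder gives a regular Python file that can be opened, reviewed and diffed like any hand-written DAG.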
At Astrafy we are using dbt for all our analytical transformations and it was a no-brainer for us to integrate it within this Airflow DAG factory.
This integration can be used in two different flavours:
- Compact: define the Operator running the dbt command for each dbt group of models that you want to run (BashOperator/KubernetesPodOperator). See here for a good discussion about this topic. We recommend using “Kubernetes Pod Operator” to separate the execution of dbt from the scheduling. A good practice is also to not run your entire dbt project within one task. You can for instance split your different dbt runs into “staging”, “vault central” and “datamart”. You can find more details in this article written by my colleague Charles.
- Explosion: based on the layout defined by Astronomer (with the difference that we use the dbt tag feature instead of selectors, together with Airflow task groups), we explode every single dbt model from the manifest.json file. What does this mean? Each dbt model is translated into an Airflow BigQuery operator, and all the dbt models are grouped in a dedicated TaskGroup. Moreover, all the query dependencies are handled automatically within the factory.
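For the compact flavour, the idea of one task per dbt layer can be sketched as follows. This is only an illustration, assuming that the models of each layer carry a dbt tag named after the layer; the tag names, the `prod` target and the `compact_dbt_tasks` helper are hypothetical. Each resulting command would then be handed to a BashOperator or used as the command of a KubernetesPodOperator.

```python
import shlex

# Hypothetical dbt tags, one per layer, run in this order.
LAYERS = ["staging", "vault_central", "datamart"]


def compact_dbt_tasks(layers, target="prod"):
    """Build one dbt command per layer, each depending on the previous layer."""
    tasks = []
    previous = None
    for layer in layers:
        task_id = f"dbt_run_{layer}"
        command = shlex.join(
            ["dbt", "run", "--select", f"tag:{layer}", "--target", target]
        )
        tasks.append(
            {
                "task_id": task_id,
                "command": command,
                "depends_on": [previous] if previous else [],
            }
        )
        previous = task_id
    return tasks


tasks = compact_dbt_tasks(LAYERS)
for task in tasks:
    print(task["task_id"], "->", task["command"])
```

With three layers you get three Airflow tasks chained one after the other, which keeps the UI small while still avoiding a single monolithic dbt run.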
Considering the following configuration:
It will be translated into the following TaskGroup in the Airflow UI:
Clicking on this TaskGroup then opens up all the different dbt tasks. Here, one stage represents one dbt model and dependencies are taken from the dbt manifest.json file.
Which one should you choose? It really depends on your use case and on how powerful your Airflow instance is.
The compact mode gives a cleaner Airflow UI that is simpler to visualise, but it makes it more difficult to find in the logs which dbt model/stage is running or failing. On the other hand, the explosion representation can be harder to read (if you have more than 50 models running in a single DAG), but each dbt stage/model is represented as an Airflow task; this makes it easier to monitor and troubleshoot when something goes wrong. You can also instantly spot the bottleneck queries that are candidates for refactoring (their Airflow tasks will have a long running time).
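The way dependencies are read from manifest.json can be sketched as below. The example relies on the `nodes`, `resource_type`, `tags` and `depends_on` fields of the dbt manifest; the tiny inline manifest, the `shop` project and the `explode_manifest` helper are made up for illustration. The sketch stops at the dependency map and does not create the Airflow operators themselves.

```python
def explode_manifest(manifest, tag):
    """Map each dbt model carrying `tag` to the tagged models it depends on."""
    selected = {
        node_id: node
        for node_id, node in manifest["nodes"].items()
        if node["resource_type"] == "model" and tag in node.get("tags", [])
    }
    return {
        node["name"]: sorted(
            selected[upstream]["name"]
            for upstream in node["depends_on"]["nodes"]
            if upstream in selected  # ignore sources, tests and untagged models
        )
        for node in selected.values()
    }


# Hypothetical, heavily trimmed manifest.json content.
manifest = {
    "nodes": {
        "model.shop.stg_orders": {
            "resource_type": "model",
            "name": "stg_orders",
            "tags": ["daily"],
            "depends_on": {"nodes": ["source.shop.raw.orders"]},
        },
        "model.shop.stg_customers": {
            "resource_type": "model",
            "name": "stg_customers",
            "tags": ["daily"],
            "depends_on": {"nodes": []},
        },
        "model.shop.dm_revenue": {
            "resource_type": "model",
            "name": "dm_revenue",
            "tags": ["daily"],
            "depends_on": {
                "nodes": ["model.shop.stg_orders", "model.shop.stg_customers"]
            },
        },
        "model.shop.dm_weekly": {
            "resource_type": "model",
            "name": "dm_weekly",
            "tags": ["weekly"],
            "depends_on": {"nodes": ["model.shop.stg_orders"]},
        },
        "test.shop.not_null_stg_orders_id": {
            "resource_type": "test",
            "name": "not_null_stg_orders_id",
            "tags": ["daily"],
            "depends_on": {"nodes": ["model.shop.stg_orders"]},
        },
    }
}

task_dependencies = explode_manifest(manifest, "daily")
print(task_dependencies)
```

Each key of the resulting dictionary becomes one Airflow task inside the TaskGroup, and each listed upstream model becomes an upstream task dependency.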
It is worth noting that each YAML configuration file will generate two different DAG Python scripts with the following difference:
In Airflow you can run dbt commands using the BashOperator, but this implies installing your dbt dependencies on the machine where your Airflow instance is running. This works, but we normally recommend decoupling the execution of dbt from the orchestration (i.e. Airflow) by running dbt on a separate node pool. This has several benefits, such as avoiding potential dependency hell on your Airflow instance and not overloading it.
Why not install the dbt libraries in Cloud Composer itself?
The reason is easier maintenance of dependencies and libraries in a single image, without the risk of crashing Cloud Composer when updating those libraries. If your Cloud Composer instance is fully private, there is the added benefit that Composer won’t need to download external libraries to run your dbt code; those libraries will be in dedicated images that run on a separate GKE cluster via a GKEStartPodOperator.
Another advantage of using GKEStartPodOperator/KubernetesPodOperator within the factory is that we can integrate any other tool that interacts with our data pipeline, for example “Great Expectations” (a data quality tool). The same logic applies as for dbt: you package “Great Expectations” in a Docker image and then run this image as a task within Airflow.
Within the factory you can use different operators such as PubSubPublishMessageOperator, DataflowOperator and so on to handle your ingestion from start to end. You can read more in this series of articles about a holistic approach to building an end-to-end data pipeline, from ingestion to distribution of data, with a granular, data mesh approach.
The main benefits of using the Airflow DAG factory can be summarised as follows:
- Easy to use: you can build ETL data pipelines without knowing Airflow primitives or Python. All you need to do is configure a YAML file, and you will learn the technology by using the factory.
- Easy integration within your CI/CD pipeline, so that your DAGs are under version control and can be deployed all the way to production.
We have now defined and built a fully automated factory to generate DAGs with full integration, but there are still a few open points:
- How do we test all of this?
- How can we version the different changes to run in different environments?
- Could we create a python package?
Stay tuned: we will answer these questions in a future article, where we will also open source the codebase of this DAG factory.
If you are looking for some support on your Airflow or dbt implementation, feel free to reach out to us at email@example.com.
You can also check out our website at https://www.astrafy.io
Airflow documentation: https://airflow.apache.org/docs/apache-airflow/stable/index.html
dag-factory structure: https://github.com/ajbosco/dag-factory
dbt documentation: https://docs.getdbt.com/docs
dbt-Airflow integration (article 1): https://www.astronomer.io/blog/airflow-dbt-1/
dbt-Airflow integration (article 2): https://www.astronomer.io/blog/airflow-dbt-2
Cloud Composer: https://cloud.google.com/composer