This post is part of our Airflow series. We will go into detail on how you can pass data between different DAGs in case you want to separate them a little more cleanly. If you prefer video, you can watch it on YouTube: https://youtu.be/U43Pwk_10k4
By using custom operators, you can extend the existing operators to do something specific for your use case. Putting the code into a custom operator allows you to reuse this code fragment across different DAGs.
Compared to our previous posts, we need to add another mount point to our docker command, which allows us to add custom operators:
```bash
mkdir -p ~/airflow-test/dags
mkdir -p ~/airflow-test/plugins
cd ~/airflow-test
docker run -d \
  --mount type=bind,src=${PWD}/dags,dst=/usr/local/airflow/dags \
  --mount type=bind,src=${PWD}/plugins,dst=/usr/local/airflow/plugins \
  -e LOAD_EX=n \
  -p 8080:8080 puckel/docker-airflow webserver
```
I assume you (still) have your setup up and running; otherwise, please revisit the introduction blog post of this series.
When we initially implemented this for our Airflow migration, there was no built-in possibility to execute a different DAG from an existing one. We got this requirement because the pipeline was already split into separate DAGs, and we wanted to leave that structure as it was, with minimal adjustments. Our old friend Stack Overflow finally had the required hint, which I want to share with you here: you can trigger a DAG by using the Apache Airflow CLI. So, when you want to call a DAG from a different DAG, you can call it via the Apache Airflow CLI.
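In its simplest form the CLI call looks like the sketch below. It uses the trigger_dag subcommand of the Airflow 1.10 CLI that ships with the puckel image; the DAG id and the JSON config mirror the example we will build later in this post.

```bash
# Trigger the DAG with id "fetch_dag" and hand it a JSON config.
airflow trigger_dag --conf '{"some_data": true}' fetch_dag
```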
This approach has two downsides:
We need to define two custom operators. The first one, TriggerDagOperator, will pass some data to a different DAG that we name. The second one, FetchDagOperator, will consume the data passed in from the other DAG and make it available to the remaining tasks.
Let us create a new file and place it under ~/airflow-test/plugins/cross_dag.py with the following content:
```python
import json

from airflow.models import BaseOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.decorators import apply_defaults


class TriggerDagOperator(BaseOperator):
    @apply_defaults
    def __init__(self, trigger_dag_id: str, data: dict, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.trigger_dag_id = trigger_dag_id
        self.data = data
        self.ui_color = '#ff0'  # type str

    def execute(self, context):
        conf_str = json.dumps(self.data)
        command = f"airflow trigger_dag --conf '{conf_str}' {self.trigger_dag_id}"
        operator = BashOperator(
            task_id=f'execute_{self.trigger_dag_id}',
            bash_command=command
        )
        operator.execute(context)
```
We will go over the code in detail. First we import the json module, which we need to serialize the data we pass along, and the BaseOperator class from the models package; this is the base class you need to extend to implement custom operators. We import the BashOperator again, as we use it to call the Apache Airflow CLI command. And finally we import the apply_defaults decorator, which provides us with the default argument values.
After declaring the class and applying the decorator, we define the constructor. The first argument after self is trigger_dag_id, the id of the DAG we want to trigger. Via data you can pass additional data to the called DAG as well; we use it to pass run ids around, as they need to be consistent across all DAGs that we call for a particular pipeline. The remaining *args and **kwargs are simply passed on to the constructor of the class we inherit from via super().__init__. The following two lines save the passed values on the instance and set a color for the task in the UI.
execute is the method you have to implement for your operator to do anything. The json.dumps call converts the dict you provided as a constructor argument to a string, because the CLI expects the config as a string; Airflow converts it back to a dict on the receiving side. The next line builds the bash command we want to call. The last block creates a task from the BashOperator and runs it explicitly by calling the execute method that the operator implements.
The second operator will read the provided config. Please add this code to ~/airflow-test/plugins/cross_dag.py as well.
```python
class FetchDagOperator(BaseOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.ui_color = '#ff0'  # type str

    def execute(self, context):
        config = context['dag_run'].conf
        self.do_xcom_push = True
        return config
```
Beside the custom color we are setting, nothing new is happening in the constructor.
The config we passed can be accessed via the .conf attribute of the dag_run object inside the provided context. We make sure the result is pushed back to Airflow as an XCom by setting do_xcom_push, and finally return the configuration we read.
In this section, we will use the newly created custom operators. First we will use the TriggerDagOperator in our DAG and then the FetchDagOperator.
Save this as ~/airflow-test/dags/trigger_dag.py
```python
import pendulum
import shutil
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

from cross_dag import TriggerDagOperator

args = {
    'start_date': datetime(2021, 1, 23, tzinfo=pendulum.timezone("Europe/Berlin")),
    'email': ['team@example.com'],
    'email_on_failure': False
}

dag = DAG(
    dag_id='trigger_dag',
    default_args={'start_date': datetime(2021, 1, 23)},
    schedule_interval=None,
    max_active_runs=1,
    catchup=False
)

with dag:
    trigger_dag = TriggerDagOperator(
        trigger_dag_id="fetch_dag",
        data={'some_data': True},
        task_id="trigger_dag",
    )
```
By default, Airflow does not only watch for changes inside the dags directory, but inside the plugins directory as well. This is the place where we should put custom operators. Because the plugins directory is added to the Python path, the line from cross_dag import TriggerDagOperator is enough to import the operator we need.
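To recap, the directory layout on the host should now look roughly like this (fetch_dag.py is the file we will create in the next step); inside the container, the two directories are mounted to /usr/local/airflow/dags and /usr/local/airflow/plugins:

```text
~/airflow-test/
├── dags/
│   ├── trigger_dag.py
│   └── fetch_dag.py
└── plugins/
    └── cross_dag.py
```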
After we create the DAG instance with the signature you are familiar with, we use the with dag context manager notation to omit the dag=dag argument inside the operator. Then we create the operator and provide the two custom arguments to it. The first is the target of the call, the fetch_dag DAG that we will create in just a moment. To demonstrate the passing of data, I added a dict with example data that will be handed over to fetch_dag. Make sure that this data can be converted to JSON; if not, consider passing paths instead of objects, as sketched below.
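If you are unsure whether your data survives the round trip, you can check it with json.dumps before handing it to the operator. A minimal sketch; the run id and path values are made-up examples:

```python
import json
from datetime import datetime

# Plain dicts of strings, numbers, and booleans serialize without problems.
print(json.dumps({'some_data': True, 'run_id': 'run_2021_01_23'}))

# Objects such as datetime are not JSON serializable and would break the CLI call.
try:
    json.dumps({'created_at': datetime(2021, 1, 23)})
except TypeError as error:
    print(error)

# Pass a string (or a path pointing to the real object) instead.
print(json.dumps({'created_at': '2021-01-23', 'input_path': '/data/input.csv'}))
```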
This DAG will use the FetchDagOperator that we just created and read our example data back.
Save this as ~/airflow-test/dags/fetch_dag.py
```python
import pendulum
import shutil
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

from cross_dag import FetchDagOperator


def read_config(**kwargs):
    task_instance = kwargs['ti']
    return task_instance.xcom_pull(task_ids='fetch_dag')


args = {
    'start_date': datetime(2021, 1, 23, tzinfo=pendulum.timezone("Europe/Berlin")),
    'email': ['team@example.com'],
    'email_on_failure': False
}

dag = DAG(
    dag_id='fetch_dag',
    default_args={'start_date': datetime(2021, 1, 23)},
    schedule_interval=None,
    max_active_runs=1,
    catchup=False
)

with dag:
    fetch_dag = FetchDagOperator(task_id="fetch_dag")

    read_config = PythonOperator(
        task_id='read_config',
        python_callable=read_config,
        provide_context=True
    )

    fetch_dag >> read_config
```
Right after the imports, I created a small Python function that allows us to read the config back from the fetch_dag task via xcom_pull.
Inside the with dag block, we use our FetchDagOperator to fetch the data provided to us by the other DAG. We then use a PythonOperator to read the provided config. In the logs of the read_config task you can see that the data was passed correctly.
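If you prefer the command line over the UI for testing, you can also kick off the pipeline from inside the container. A sketch, assuming you look up the container id with docker ps first:

```bash
# Find the id of the running Airflow container.
docker ps

# Trigger the first DAG; it will in turn trigger fetch_dag via the custom operator.
docker exec -it <container_id> airflow trigger_dag trigger_dag
```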
Reload the Airflow UI after you have created all the required files, trigger trigger_dag (from the UI or via the CLI as sketched above), and check that the implementation works as expected. Thank you for reading.