Cross Dag Communication

Summary

This post is part of our Airflow series. We will go into the details of how you can pass data between different DAGs, in case you want to separate them a bit better. If you prefer video, you can watch it on YouTube:

https://youtu.be/U43Pwk_10k4

We will be discussing the following topics:

  • Custom operators
  • Cross-DAG communication
  • Passing data between DAGs

Create Custom Operators

By using custom operators, you can extend the existing operators to do something special for your use case. Putting the code into a custom operator allows you to reuse this code fragment in different DAGs.
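
As a minimal, hedged sketch of the pattern (the operator name, argument, and log message are made up for illustration and are not used later in this post), a custom operator is just a class that subclasses BaseOperator and implements execute():

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class HelloOperator(BaseOperator):
    """Hypothetical example operator, only here to show the shape of the pattern."""

    @apply_defaults
    def __init__(self, name: str, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # execute() runs when the task runs; its return value can be read
        # back by other tasks via XCom.
        self.log.info("Hello %s", self.name)
        return f"Hello {self.name}"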

Docker command

Compared to our previous posts, we need to add another mount point to our docker command, which allows us to add custom operators:

mkdir -p ~/airflow-test/dags
mkdir -p ~/airflow-test/plugins
cd ~/airflow-test
docker run -d \
--mount type=bind,src=${PWD}/dags,dst=/usr/local/airflow/dags \
--mount type=bind,src=${PWD}/plugins,dst=/usr/local/airflow/plugins \
-e LOAD_EX=n \
-p 8080:8080 puckel/docker-airflow webserver

I assume you (still) have your setup up and running; otherwise, please revisit the introduction blog post of this series first.

How to trigger a different DAG with passed data

When we initially implemented this for our Airflow migration, there was no built-in possibility to execute a different DAG from an existing one. We got the requirement because the entire DAG was already split up, and we wanted to leave that structure as is, with minimal adjustments. Our old friend Stack Overflow finally had the required hint, which I want to share with you here: you can trigger a DAG by using the Apache Airflow CLI, so when you want to call a DAG from a different DAG, you can simply call it via the CLI.
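
To make this concrete, here is a hedged sketch of such a CLI call. The payload and target DAG id are made up, and the subprocess wrapper is only there so the example is runnable Python; it assumes the airflow CLI is available in the environment it runs in:

import json
import subprocess

# Hypothetical payload; Airflow hands it to the triggered DAG run as `conf`.
conf = {"run_id": "2021-01-23-demo"}

# Equivalent to: airflow trigger_dag --conf '{"run_id": "..."}' fetch_dag
subprocess.run(
    ["airflow", "trigger_dag", "--conf", json.dumps(conf), "fetch_dag"],
    check=True,
)
# The CLI only queues the new DAG run; it does not wait for it to finish.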

This approach has two downsides:

  • In the Airflow UI the DAG shows up as triggered "manually", even though it was triggered from another DAG via the Airflow CLI.
  • The processing is done asynchronously. If you want to make sure all other work has finished first, the call to the other DAG has to be the last task in your DAG.

We need to define two custom operators. The first one, TriggerDagOperator, passes some data to a different DAG that we name explicitly. The second one, FetchDagOperator, consumes the data passed in from the other DAG and makes it available to the remaining tasks.

TriggerDagOperator

Let us create a new file under ~/airflow-test/plugins/cross_dag.py with the following content:

import json

from airflow.models import BaseOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.decorators import apply_defaults


class TriggerDagOperator(BaseOperator):
    @apply_defaults
    def __init__(self, trigger_dag_id: str, data: dict, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.trigger_dag_id = trigger_dag_id
        self.data = data
        self.ui_color = '#ff0'

    def execute(self, context):
        # Serialize the payload and hand it to the Airflow CLI via a BashOperator.
        conf_str = json.dumps(self.data)
        command = f"airflow trigger_dag --conf '{conf_str}' {self.trigger_dag_id}"
        operator = BashOperator(
            task_id=f'execute_{self.trigger_dag_id}',
            bash_command=command
        )
        operator.execute(context)

Imports

We will go over the code in detail. We import json because the payload has to be serialized before it can be handed to the CLI. From the models package we import the BaseOperator class; this is the base class you need to extend when implementing custom operators. We add the BashOperator again, as we use it to call the Apache Airflow CLI. Finally, we add the apply_defaults decorator, which applies the DAG's default arguments to the operator for us.

__init__

After creating the class and applying the decorator, we add the self reference for the instance. The next argument, trigger_dag_id, is the id of the DAG we want to trigger. Via data you can pass additional data to the called DAG as well. We use it to pass run ids around; they need to be consistent across all DAGs that we call for a particular pipeline. The last two arguments are the default arguments that we simply pass on to the constructor of the base class via super().__init__(). The next two lines save the passed values on the instance, and the last line sets a color for the task in the UI.

execute

This is the method you have to implement for your operator to do any work. json.dumps converts the dict you provided to the constructor into a string; the CLI call requires a string, and Airflow converts it back to a dict on the receiving side. We then build the bash command we want to call, create a task from the BashOperator, and run it explicitly by calling the execute method implemented in that operator.

FetchDagOperator

The second operator will read the provided config. Please add this code to ~/airflow-test/plugins/cross_dag.py as well.

class FetchDagOperator(BaseOperator):
    @apply_defaults
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.ui_color = '#ff0'

    def execute(self, context):
        # Read the conf the triggering DAG passed along; the return value is pushed to XCom.
        config = context['dag_run'].conf
        self.do_xcom_push = True
        return config

__init__

Besides setting the custom UI color again, nothing new is happening here.

execute

The config we passed in can be accessed via the .conf attribute of the dag_run entry inside the provided context. We make sure the result is pushed back to Airflow as an XCom and finally return the read configuration.

Using Custom Operators

In this section we will use the newly created custom operators. First we will use the TriggerDagOperator in a DAG, and then the FetchDagOperator.

Using TriggerDagOperator

Save this as ~/airflow-test/dags/trigger_dag.py

import pendulum
from datetime import datetime
from airflow.models import DAG
from cross_dag import TriggerDagOperator

args = {
    'start_date': datetime(2021, 1, 23,
        tzinfo=pendulum.timezone("Europe/Berlin")),
    'email': ['team@example.com'],
    'email_on_failure': False
}


dag = DAG(
    dag_id='trigger_dag',
    default_args=args,
    schedule_interval=None,
    max_active_runs=1,
    catchup=False
)

with dag:
    trigger_dag = TriggerDagOperator(
        trigger_dag_id="fetch_dag",
        data={'some_data': True},
        task_id="trigger_dag",
    )

Imports

By default, Airflow does not only watch for changes inside the dags directory, but inside the plugins directory as well. This is the place where we should put custom operators. Because the plugins directory is added to the Python path, it is enough to write from cross_dag import TriggerDagOperator to import the operator we need.

use the operator

After we created the DAG instance with the signature you are already familiar with, we can use the with dag context manager to omit the dag=dag argument inside the operator. Then we create the operator and provide the first two custom arguments. The first is the target of the call, the fetch_dag DAG that we will create in just a moment. To demonstrate the passing of data, I added a dict with example data that will be handed over to fetch_dag. Make sure that this data can be converted to JSON. If not, consider passing paths instead of objects, as sketched below.
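
Here is a minimal sketch of that fallback. The Model class, the pickle format, and the /tmp location are assumptions for illustration; any location the consuming DAG can read works:

import json
import pickle


class Model:
    """Stand-in for anything that json.dumps cannot handle."""


payload = {'run_id': '2021-01-23-demo', 'model': Model()}

try:
    json.dumps(payload)
    data = payload  # already JSON safe, pass it as is
except TypeError:
    # Persist the offending object and hand over only a JSON-friendly path.
    path = '/tmp/trigger_payload.pkl'
    with open(path, 'wb') as fh:
        pickle.dump(payload['model'], fh)
    data = {'run_id': payload['run_id'], 'model_path': path}

# `data` can now be passed to TriggerDagOperator(data=data, ...)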

Using FetchDagOperator

This DAG will use the FetchDagOperator that we just created and read back our example data.

Save this as ~/airflow-test/dags/fetch_dag.py

import pendulum
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from cross_dag import FetchDagOperator


def read_config(**kwargs):
    # Pull the configuration that the fetch_dag task pushed to XCom.
    task_instance = kwargs['ti']
    return task_instance.xcom_pull(task_ids='fetch_dag')


args = {
    'start_date': datetime(2021, 1, 23,
        tzinfo=pendulum.timezone("Europe/Berlin")),
    'email': ['team@example.com'],
    'email_on_failure': False
}


dag = DAG(
    dag_id='fetch_dag',
    default_args=args,
    schedule_interval=None,
    max_active_runs=1,
    catchup=False
)

with dag:
    fetch_dag = FetchDagOperator(task_id="fetch_dag")
    read_config = PythonOperator(
        task_id='read_config',
        python_callable=read_config,
        provide_context=True
    )

    fetch_dag >> read_config

read_config

The small read_config Python function allows us to read the config back from the fetch_dag task via xcom_pull.

with dag

Finally we use our FetchDagOperator to fetch the data that was provided to us by the other DAG. We then use a PythonOperator to read the provided config. In the logs of the read_config task you can see that the data was passed correctly.

Play

Reload the Airflow UI after you created all required files and test that the implementation works as expected. Thank you for reading.
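
If you prefer the command line over the trigger button in the UI, a hedged sketch for kicking off the pipeline from the host machine could look like this (the container name is a placeholder; docker ps shows the real one):

import subprocess

# Run the Airflow CLI inside the container to start trigger_dag,
# which in turn triggers fetch_dag with the example payload.
subprocess.run(
    ['docker', 'exec', '<container-name>', 'airflow', 'trigger_dag', 'trigger_dag'],
    check=True,
)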