Cross Task Communication using XComs

Image source: Self made

Summary

We discussed so far:

This is the third post about coding dags and operators using Apache Airflow. In case you want it more interactive, you can watch the accompanying video on YouTube: https://youtu.be/JEM1EuEJiZQ

Hello World with XComs

As a next demonstration, we will be using the XCom feature of airflow to inject the results of one task into another. I assume you (still) have your setup up and running; otherwise please revisit the previous blog post here.

We want to create a small dag that does a backup of the dags present in our airflow installation. The general approach is (a plain-Python sketch of these steps follows the list):

  • List all dags
  • Filter out the Python cache directory
  • Create a backup directory
  • Copy all files to the backup directory
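
Before wiring this up as Airflow tasks, here is a minimal plain-Python sketch of the same backup, just to illustrate what the dag will automate. The paths are an assumption and match the Docker setup used later in this post:

import shutil

# Copy every dag file into the backup directory, skipping the Python cache.
# dirs_exist_ok requires Python 3.8+ and keeps the call from failing if the
# backup directory already exists.
shutil.copytree(
    "/usr/local/airflow/dags",
    "/usr/local/airflow/backup",
    ignore=shutil.ignore_patterns("__pycache__"),
    dirs_exist_ok=True,
)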

Let us create a new file ~/airflow-test/dags/xcom.py with the following content, which matches these requirements:

import pendulum
import shutil
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def backup_by_python_callable(**kwargs):
    task_instance = kwargs['ti']
    paths = task_instance.xcom_pull(task_ids='list_dags')
    for path in paths.split(" "):
        source = f"/usr/local/airflow/dags/{path}"
        target = f"/usr/local/airflow/backup/{path}"
        print(f"{source} -> {target}")
        shutil.copy(source, target)



args = {
    'start_date': datetime(2021, 1, 23,
        tzinfo=pendulum.timezone("Europe/Berlin")),
    'email': ['team@example.com'],
    'email_on_failure': False
}


dag = DAG(
    dag_id='xcoms',
    default_args=args,
    schedule_interval=None,
    max_active_runs=1,
    catchup=False
)

list_dags = BashOperator(
    task_id = 'list_dags',
    bash_command='ls /usr/local/airflow/dags  | grep -v __py | xargs',
    dag=dag,
    xcom_push=True
)

create_backup_directory = BashOperator(
    task_id = 'create_backup_directory',
    bash_command='mkdir -p /usr/local/airflow/backup',
    dag=dag
)

backup_dags = BashOperator(
    task_id = 'backup_dags',
    bash_command = "echo '{{task_instance.xcom_pull(task_ids='list_dags')}}' ",
    dag=dag
)

backup_by_python = PythonOperator(
    task_id = 'backup_by_python',
    python_callable=backup_by_python_callable,
    provide_context=True,
    dag=dag
)

[create_backup_directory, list_dags] >> backup_dags >> backup_by_python

We will now go into every part in detail:

Import statements

import pendulum
import shutil
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

In addition to the previous import statements, we also add the PythonOperator import, as we will be using it later on.

Default Arguments

args = {
    'start_date': datetime(2021, 1, 23,
        tzinfo=pendulum.timezone("Europe/Berlin")),
    'email': ['team@example.com'],
    'email_on_failure': False
}

We are using the same default arguments as previously.

Backup by Python function

def backup_by_python_callable(**kwargs):
    task_instance = kwargs['ti']
    paths = task_instance.xcom_pull(task_ids='list_dags')
    for path in paths.split(" "):
        source = f"/usr/local/airflow/dags/{path}"
        target = f"/usr/local/airflow/backup/{path}"
        print(f"{source} -> {target}")
        shutil.copy(source, target)

This python function will be called by the PythonOperator defined below. First we extract the task_instance from the provided kwargs/context. Using the task instance, we can call xcom_pull and name the task we want to extract data from. In our case, this is a space-separated list of paths that we want to copy from the source directory to a hardcoded target directory. Since the file names are separated by spaces, we use str.split(" ") to iterate over each element in the path variable. With this value we can build the full source and target paths. The print statement helps you see in the Airflow logs what is happening - you can remove it if you don't need it. Finally, shutil.copy performs the actual copy.
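
To make the loop easier to follow, here is a small standalone sketch with a hypothetical XCom value - the file names are made up and the copy is commented out, but the string handling is exactly the same:

# Hypothetical value as it would come back from xcom_pull(task_ids='list_dags')
paths = "hello_world.py xcom.py"
for path in paths.split(" "):
    source = f"/usr/local/airflow/dags/{path}"
    target = f"/usr/local/airflow/backup/{path}"
    print(f"{source} -> {target}")
    # shutil.copy(source, target) would run here inside the task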

Dag instantiation

dag = DAG(
    dag_id='xcoms',
    default_args=args,
    schedule_interval=None,
    max_active_runs=1,
    catchup=False
)

We are using the same initialization code as before - make sure you use a different dag_id, otherwise you may not see your newly created dag.

List dags

list_dags = BashOperator(
    task_id = 'list_dags',
    bash_command='ls /usr/local/airflow/dags  | grep -v __py | xargs',
    dag=dag,
    xcom_push=True
)

This snippet fulfils our first two requirements: list every dag file and filter out the pycache directory. To work with this list, we pipe it through xargs, which joins the output onto a single line - otherwise only the last line of the ls output would be propagated back to airflow and all other file names would be lost. A second important aspect is xcom_push=True - without this setting, the result of our bash command would not be pushed back to airflow but simply thrown away. In our case, we want to use it later on.
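
If you prefer to stay in Python, one possible alternative (a sketch, not part of the dag above) is to list the files in a PythonOperator - its return value is pushed to XCom automatically under the key return_value, so no xcom_push flag is needed:

import os

def list_dags_callable(**kwargs):
    # Skip the Python cache directory, just like the grep -v in the bash version
    files = [f for f in os.listdir('/usr/local/airflow/dags') if f != '__pycache__']
    return " ".join(files)  # same space-separated format as the bash variant

list_dags_py = PythonOperator(
    task_id='list_dags_py',
    python_callable=list_dags_callable,
    dag=dag
)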

Create Backup directory

create_backup_directory = BashOperator(
    task_id = 'create_backup_directory',
    bash_command='mkdir -p /usr/local/airflow/backup',
    dag=dag
)

This task creates our backup directory, using another BashOperator. I use the -p flag here to make sure that the command does not fail in case the directory already exists.
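
The same step could also be done with a PythonOperator - a small sketch, assuming the same target path; os.makedirs with exist_ok=True mirrors the behaviour of mkdir -p:

import os

def create_backup_directory_callable(**kwargs):
    # exist_ok=True: do not fail if the directory is already there
    os.makedirs('/usr/local/airflow/backup', exist_ok=True)

create_backup_directory_py = PythonOperator(
    task_id='create_backup_directory_py',
    python_callable=create_backup_directory_callable,
    dag=dag
)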

Bash Copy

backup_dags = BashOperator(
    task_id = 'backup_dags',
    bash_command = "echo '{{task_instance.xcom_pull(task_ids='list_dags')}}' ",
    dag=dag
)

In my initial preparation for this blog post, I only had one file to copy. It was easy to pass this data into a new task using the code snippet shown above. But as soon as a second file was added, it no longer worked as intended.

I left this task here for demonstration purposes: you can pass the output of a task into a templated field using the Jinja notation and then add the task_instance.xcom_pull call to it, quite similar to what we did in the python callable.
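
If you wanted to make the bash variant handle several files, one possible sketch (not part of the dag above) is to let the Jinja expression render to the space-separated list and loop over it in bash:

backup_dags_loop = BashOperator(
    task_id='backup_dags_loop',
    bash_command=(
        # The Jinja expression renders to e.g. "hello_world.py xcom.py" before bash runs
        "for f in {{ task_instance.xcom_pull(task_ids='list_dags') }}; do "
        "cp /usr/local/airflow/dags/$f /usr/local/airflow/backup/$f; "
        "done"
    ),
    dag=dag
)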

Backup using Python Operator

backup_by_python = PythonOperator(
    task_id = 'backup_by_python',
    python_callable=backup_by_python_callable,
    provide_context=True,
    dag=dag
)

This task executes the provided python code and performs the backup we want. It is important to set provide_context=True so that the python function we are using is able to fetch the task instance and the related XCom data.
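
As a side note, op_kwargs lets you pass additional static keyword arguments to the callable next to the Airflow context - a hypothetical variation (the backup_root argument is made up for illustration):

backup_by_python = PythonOperator(
    task_id = 'backup_by_python',
    python_callable=backup_by_python_callable,
    provide_context=True,
    op_kwargs={'backup_root': '/usr/local/airflow/backup'},  # available as kwargs['backup_root']
    dag=dag
)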

Glue it together

[create_backup_directory, list_dags] >> backup_dags >> backup_by_python

This is the last line of our dag. It defines the dependencies between the tasks. First, we open a python list - telling airflow that these tasks are independent of each other and can run in parallel. After both have finished, the backup_dags task will be executed and then finally the backup_by_python task. Check out the logs to see what is happening. You can also remove the backup_dags task if you don't want to use it and see what happens in the airflow UI.
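
The same dependencies can also be written with the explicit set_upstream/set_downstream methods instead of the bitshift operators - both notations are equivalent:

# backup_dags runs after both create_backup_directory and list_dags,
# and backup_by_python runs after backup_dags
backup_dags.set_upstream([create_backup_directory, list_dags])
backup_dags.set_downstream(backup_by_python)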

Thanks for reading and see you next time.