This is the third post about Coding dags and operators using Apache Airflow. In case you prefer a more interactive format, you can watch the video version on YouTube: https://youtu.be/JEM1EuEJiZQ
As a next demonstration, we will be using the XCom feature of Airflow to inject results of other tasks into a new task. I assume you (still) have your setup up and running; otherwise, please revisit the previous blog post here.
We want to create a small dag that backs up the dags present in our Airflow installation. The general approach is: list all dag files, create a backup directory, and copy every dag file into that directory.
Let us create a new file `~/airflow-test/dags/xcom.py` with the following content, which matches these requirements:
```python
import pendulum
import shutil
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def backup_by_python_callable(**kwargs):
    task_instance = kwargs['ti']
    paths = task_instance.xcom_pull(task_ids='list_dags')
    for path in paths.split(" "):
        source = f"/usr/local/airflow/dags/{path}"
        target = f"/usr/local/airflow/backup/{path}"
        print(f"{source} -> {target}")
        shutil.copy(source, target)


args = {
    'start_date': datetime(2021, 1, 23, tzinfo=pendulum.timezone("Europe/Berlin")),
    'email': ['team@example.com'],
    'email_on_failure': False
}

dag = DAG(
    dag_id='xcoms',
    default_args=args,
    schedule_interval=None,
    max_active_runs=1,
    catchup=False
)

list_dags = BashOperator(
    task_id='list_dags',
    bash_command='ls /usr/local/airflow/dags | grep -v __py | xargs',
    dag=dag,
    xcom_push=True
)

create_backup_directory = BashOperator(
    task_id='create_backup_directory',
    bash_command='mkdir -p /usr/local/airflow/backup',
    dag=dag
)

backup_dags = BashOperator(
    task_id='backup_dags',
    bash_command="echo '{{task_instance.xcom_pull(task_ids='list_dags')}}' ",
    dag=dag
)

backup_by_python = PythonOperator(
    task_id='backup_by_python',
    python_callable=backup_by_python_callable,
    provide_context=True,
    dag=dag
)

[create_backup_directory, list_dags] >> backup_dags >> backup_by_python
```
We will now go into every part in detail:
```python
import pendulum
import shutil
from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
```
In addition to the previous import statements, we add the PythonOperator import as well, as we will be using it later on.
```python
args = {
    'start_date': datetime(2021, 1, 23, tzinfo=pendulum.timezone("Europe/Berlin")),
    'email': ['team@example.com'],
    'email_on_failure': False
}
```
We are using the same default arguments as previously.
```python
def backup_by_python_callable(**kwargs):
    task_instance = kwargs['ti']
    paths = task_instance.xcom_pull(task_ids='list_dags')
    for path in paths.split(" "):
        source = f"/usr/local/airflow/dags/{path}"
        target = f"/usr/local/airflow/backup/{path}"
        print(f"{source} -> {target}")
        shutil.copy(source, target)
```
This Python function will be called by the PythonOperator that is defined below. First, we extract the task instance from the provided kwargs/context. Using the task instance, we can call xcom_pull and provide the task_id of the task we want to pull data from. In our case, this is a space-separated list of file names that we want to copy from the source directory to a hardcoded target directory. Since the entries are separated by spaces, we use str.split(" ") to iterate over each file name in the path variable. With this value, we can build the full source and target paths. The print statement helps to see in the Airflow logs what is happening - you can remove it if you don't need it. Finally, shutil.copy does the actual copy.
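If you want to smoke-test the callable outside of a dag run, you can call it with a small stand-in for the task instance. This is only a sketch: FakeTaskInstance and the two file names are made up for illustration, and it has to run where the /usr/local/airflow/dags and /usr/local/airflow/backup directories actually exist (for example inside the Airflow container).

```python
# Hedged sketch: call the callable directly with a fake task instance.
# FakeTaskInstance and the file names are hypothetical; run this only where
# the /usr/local/airflow/dags and /usr/local/airflow/backup directories exist.
class FakeTaskInstance:
    def xcom_pull(self, task_ids=None):
        # pretend the list_dags task returned two dag files
        return "xcom.py hello_world.py"

backup_by_python_callable(ti=FakeTaskInstance())
```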
```python
dag = DAG(
    dag_id='xcoms',
    default_args=args,
    schedule_interval=None,
    max_active_runs=1,
    catchup=False
)
```
We are using the same initialization code as before, passing the args dictionary via default_args - make sure you use a different dag_id, otherwise you may not see your newly created dag.
```python
list_dags = BashOperator(
    task_id='list_dags',
    bash_command='ls /usr/local/airflow/dags | grep -v __py | xargs',
    dag=dag,
    xcom_push=True
)
```
This snippet fulfills our first requirement: list every dag file and filter out the __pycache__ directory. To work with this list, we need to pipe it through xargs, which joins the output onto a single line - otherwise only the last line of the ls output would be propagated back to Airflow and all other file names would be lost. For example, if the folder contains xcom.py and hello_world.py, xargs turns them into the single line xcom.py hello_world.py. A second important aspect is xcom_push=True - without this setting, the result of our bash command would not be pushed back to Airflow and would simply be thrown away. In our case, we want to use it later on.
```python
create_backup_directory = BashOperator(
    task_id='create_backup_directory',
    bash_command='mkdir -p /usr/local/airflow/backup',
    dag=dag
)
```
This task creates our backup directory, using another BashOperator. I use the -p flag here to make sure that the command does not fail in case the directory already exists.
```python
backup_dags = BashOperator(
    task_id='backup_dags',
    bash_command="echo '{{task_instance.xcom_pull(task_ids='list_dags')}}' ",
    dag=dag
)
```
In my initial preparation for this blog post, I had just one file to copy. It was easy to pass this data into a new task using the code snippet above. But as soon as a second file was added, it no longer worked as intended.
I left this task here for demonstration purposes: you can pass the output of another task into a bash_command using the Jinja template notation together with the task_instance.xcom_pull call, quite similar to what we did in the Python callable. Note that this task only echoes the rendered list; it does not copy anything.
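For completeness, here is a hedged sketch - not part of the dag above, and the task_id backup_dags_copy is made up for illustration - of how the same Jinja pull could be combined with a small bash loop so that a BashOperator performs the copy itself:

```python
# Hypothetical alternative, not part of the original dag: let bash iterate
# over the space-separated list pulled from XCom and copy each file.
backup_dags_copy = BashOperator(
    task_id='backup_dags_copy',
    bash_command=(
        "for f in {{ task_instance.xcom_pull(task_ids='list_dags') }}; "
        "do cp /usr/local/airflow/dags/$f /usr/local/airflow/backup/; done"
    ),
    dag=dag
)
```

We will stick with the Python variant below, which is easier to read and to debug.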
```python
backup_by_python = PythonOperator(
    task_id='backup_by_python',
    python_callable=backup_by_python_callable,
    provide_context=True,
    dag=dag
)
```
This task will execute the provided Python callable and do the backup that we want. It is important to set provide_context=True to make sure that the Python function we are using receives the context and can fetch the task instance and the related XCom data.
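As a small aside - this is only a sketch, and the count_dags task is not part of the dag above - whatever a python_callable returns is itself pushed to XCom (under the key return_value), so a downstream task could pull that value in the same way:

```python
# Hypothetical extra task, only to illustrate that the return value of a
# python_callable is pushed to XCom and can be pulled downstream with
# task_ids='count_dags'.
def count_dags_callable(**kwargs):
    paths = kwargs['ti'].xcom_pull(task_ids='list_dags')
    return len(paths.split(" "))

count_dags = PythonOperator(
    task_id='count_dags',
    python_callable=count_dags_callable,
    provide_context=True,
    dag=dag
)
```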
```python
[create_backup_directory, list_dags] >> backup_dags >> backup_by_python
```
This is the last line of our dag. It defines the dependencies between the tasks. First, we open up a Python list - telling Airflow that these tasks are independent of each other and can run in parallel. After both finish, the backup_dags task will be executed and then finally the backup_by_python task. Check out the logs to see what is happening. You can also remove the backup_dags task if you don't want to use it and see what happens in the Airflow UI.
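If you prefer to spell the dependencies out without the bitshift operators, the same wiring can also be expressed with set_downstream - just an equivalent illustration, the dag above does not need these extra lines:

```python
# Equivalent dependency wiring without the >> operator (illustration only).
create_backup_directory.set_downstream(backup_dags)
list_dags.set_downstream(backup_dags)
backup_dags.set_downstream(backup_by_python)
```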
Thanks for reading and see you next time.