Airflow orchestration for big data organisations

Apache Airflow is one of the most powerful platforms data scientists and engineers use to orchestrate workflows. It has been gaining momentum across the data community, well beyond hard-core data engineers.

Airflow helps manage workflow complexity while keeping the system scalable and performant. In this article series, we walk you through an Airflow overview, approaches, concepts, objects, and their usage when writing a pipeline.

Airflow Overview

Airflow is an open-source tool for authoring and orchestrating big data workflows: a platform to programmatically schedule and monitor workflows. It triggers task execution based on the schedule interval and execution time. Think of it as a tool to coordinate work done by other services. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.
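To make "schedule interval and execution time" concrete, here is a small stdlib-only model of classic fixed-interval scheduling: the run labelled with an execution date (called the logical date in Airflow 2.2+) covers the interval starting at that date, and it is triggered only once that interval has ended. The helper `schedule_runs` is hypothetical, not an Airflow API, and it ignores catchup settings, cron expressions and timetables.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def schedule_runs(
    start_date: datetime, interval: timedelta, now: datetime
) -> List[Tuple[datetime, datetime]]:
    """Hypothetical helper: list (execution_date, trigger_time) pairs.

    Simplified model: the run for [execution_date, execution_date + interval)
    becomes eligible only once that interval has closed.
    """
    runs = []
    execution_date = start_date
    # Emit a run for every interval that has already ended by `now`.
    while execution_date + interval <= now:
        runs.append((execution_date, execution_date + interval))
        execution_date += interval
    return runs


if __name__ == "__main__":
    for execution_date, trigger_time in schedule_runs(
        datetime(2024, 1, 1), timedelta(days=1), now=datetime(2024, 1, 3, 12)
    ):
        print(f"run for {execution_date:%Y-%m-%d} triggers at {trigger_time:%Y-%m-%d %H:%M}")
    # run for 2024-01-01 triggers at 2024-01-02 00:00
    # run for 2024-01-02 triggers at 2024-01-03 00:00
```

Note that the run for 1 January only fires at midnight on 2 January, which is a common source of confusion for new Airflow users.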

Today, Airflow is used to solve a variety of data ingestion, preparation and consumption problems. A key problem it solves is integrating data between disparate systems, such as behavioural analytics systems, CRMs, data warehouses, data lakes and BI tools, which are then used for deeper analytics and AI. Airflow can also orchestrate complex ML workflows; it is designed as a configuration-as-code system and can be heavily customized with plugins.

An Approach: DAG (Directed Acyclic Graph)

In the workflow orchestration space, Airflow has become a go-to framework. It bridges the gap between GUI-based and purely programmatic orchestration: pipelines are defined in Python code, while pipeline management happens through a web UI written in Flask. Airflow decouples the processing stages from the orchestration.

A collection of interdependent processes is controlled first and foremost by a dependency DAG: a graph in which data flows in only one direction between connected processes, so no task can directly or indirectly depend on itself. Any number of arbitrarily complex dependencies can be modelled using the Pythonic DAG design API.
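The one-direction rule is what makes an execution order possible at all. As a stdlib-only illustration (Python 3.9+, not Airflow code), `graphlib.TopologicalSorter` can order the hypothetical tasks `extract`, `transform`, `load` and `report`, and it rejects a graph that loops back on itself, much as Airflow refuses to load a DAG that contains a cycle.

```python
from graphlib import CycleError, TopologicalSorter

# Each key lists the tasks it depends on (its upstream tasks).
# The task names are illustrative only.
dependencies = {
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load", "transform"},
}


def execution_order(graph: dict) -> list:
    """Return one valid run order, or raise CycleError if the graph has a cycle."""
    return list(TopologicalSorter(graph).static_order())


print(execution_order(dependencies))
# ['extract', 'transform', 'load', 'report']

# Two tasks that depend on each other do not form a DAG.
try:
    execution_order({"a": {"b"}, "b": {"a"}})
except CycleError as exc:
    print("not a DAG:", exc.args[1])
```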

We assume the reader has this background, so beyond the basics of configuring DAGs, we do not cover more detailed DAG settings such as start dates and retries.

The flow we are using in this example is as follows:

– save-bash: print ‘Hello World’ to STDOUT and redirect it to a file called out.txt

– print-file: print the contents of out.txt to STDOUT

– copy-file: copy out.txt to out_copy.txt

– delete-files: delete the out.txt and out_copy.txt files

The workflow in the example above can be visualized as a DAG. The rectangle around the diagram indicates that everything inside it is a self-contained DAG.
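The four tasks listed above map naturally onto shell commands. The sketch below runs them locally with `subprocess` (using `/bin/sh`), assuming the tasks execute one after another in the listed order; the original diagram is not reproduced here, so print-file and copy-file might instead run in parallel after save-bash. In an Airflow DAG, each command string would become the `bash_command` of a `BashOperator`.

```python
import subprocess
import tempfile
from pathlib import Path

# Shell command for each task in the example flow.
TASKS = {
    "save-bash": "echo 'Hello World' > out.txt",
    "print-file": "cat out.txt",
    "copy-file": "cp out.txt out_copy.txt",
    "delete-files": "rm out.txt out_copy.txt",
}

# Assumption: a simple chain, so list order is the dependency order.
ORDER = ["save-bash", "print-file", "copy-file", "delete-files"]


def run_flow(workdir: Path) -> dict:
    """Run each task's command in order inside workdir and collect its stdout."""
    outputs = {}
    for task_id in ORDER:
        result = subprocess.run(
            TASKS[task_id], shell=True, cwd=workdir,
            capture_output=True, text=True, check=True,  # fail fast, like a failed task
        )
        outputs[task_id] = result.stdout
    return outputs


with tempfile.TemporaryDirectory() as tmp:
    outputs = run_flow(Path(tmp))
    print(outputs["print-file"], end="")  # Hello World
    print(sorted(p.name for p in Path(tmp).iterdir()))  # [] after delete-files
```

Here every command shares one working directory. By default a `BashOperator` runs each command in its own temporary directory, so a real DAG would need absolute paths for the tasks to see the same out.txt.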

SINGLE DAG APPROACH

When a set of interrelated Spark jobs must run in a particular order, Apache Airflow can orchestrate their dependent execution as a single pipeline. These dependencies are naturally expressed as a DAG (Directed Acyclic Graph) with the Airflow Python API.

TRIGGERED DAGS APPROACH

This approach is more advanced, but not overly so. The additional complexity is justified if:

– You have complicated flows you need to break down.

– There are flows that should not reprocess computationally expensive stages.

– You want to decouple the execution of a flow from its orchestration.

DAG DEFINITION FILE

The Airflow Python script is just a configuration file specifying the DAG’s structure as code. As mentioned above, a DAG is the set of all the tasks you want to run in a workflow, so this file handles two core concerns: running tasks in the right order and at the right time. It contains the schedule-level parameters, the operators, the tasks, and their arguments.

import os
from datetime import datetime

from airflow import DAG
# EmptyOperator (Airflow 2.3+) replaces the deprecated DummyOperator.
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

# Name the DAG after this file, e.g. my_dag.py -> "my_dag".
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
DAG_OWNER_NAME = "FACTSPAN"
SCHEDULE_INTERVAL = None  # None: the DAG runs only when triggered manually
# A fixed start date is preferred over a dynamic one such as days_ago(1).
START_DATE = datetime(2024, 1, 1)


def my_func():
    print('Hey Folks!')


# Defaults applied to every task in the DAG unless a task overrides them.
default_args = {
    'owner': DAG_OWNER_NAME,
    'depends_on_past': False,
    'start_date': START_DATE,
    'email': ['******@factspan.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
}

# Tasks created inside the "with" block are attached to this DAG.
with DAG(DAG_ID, default_args=default_args,
         schedule_interval=SCHEDULE_INTERVAL, start_date=START_DATE) as dag:
    t1 = EmptyOperator(task_id='dummy_task', retries=3)  # overrides the default of 5 retries
    t2 = PythonOperator(task_id='python_task', python_callable=my_func)
    t1 >> t2  # dummy_task runs before python_task

Operator

While a DAG describes how to run a workflow, an operator defines an individual task that needs to be performed. A DAG consists of operators. An operator is simply a Python class with an “execute()” method, which is called when the task runs. Operators generally run independently; the DAG only ensures that dependent operators run in the correct order.
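The “execute()” contract can be illustrated without an Airflow install. In real Airflow you would subclass `airflow.models.BaseOperator` and override `execute(self, context)`; the sketch below mimics only that contract with a hypothetical `MiniOperator` base class and a hypothetical `GreetOperator`. The `ds` key imitates the date-stamp entry Airflow places in the task context.

```python
from typing import Any, Dict


class MiniOperator:
    """Hypothetical stand-in for airflow.models.BaseOperator (not the real class)."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def execute(self, context: Dict[str, Any]) -> Any:
        # Subclasses put their unit of work here.
        raise NotImplementedError


class GreetOperator(MiniOperator):
    """Hypothetical operator whose unit of work is building a greeting."""

    def __init__(self, task_id: str, name: str) -> None:
        super().__init__(task_id)
        self.name = name

    def execute(self, context: Dict[str, Any]) -> str:
        message = f"Hello {self.name} from {self.task_id} on {context['ds']}"
        print(message)
        return message


def run_task(op: MiniOperator, ds: str) -> Any:
    """Play the executor's role: build a context dict and call execute()."""
    return op.execute({"ds": ds})


run_task(GreetOperator(task_id="greet", name="Folks"), ds="2024-01-01")
# prints: Hello Folks from greet on 2024-01-01
```

Keeping all task logic inside `execute()` is what lets the scheduler treat every operator the same way, whatever work it does.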

TYPES OF OPERATORS:

– BashOperator – executes a bash command

– PythonOperator – calls an arbitrary Python function

– EmailOperator – sends an email

– SimpleHttpOperator – sends an HTTP request

– TriggerDagRunOperator – triggers a DAG run for a specific dag_id

– MySqlOperator, SqliteOperator, OracleOperator, JdbcOperator, etc. – executes a SQL command

Sensor

Sensors are a special kind of operator. When they run, they check whether certain criteria are met before completing and letting their downstream tasks execute, and they fail if they time out. In other words, a sensor waits on a condition, such as a file arriving, rather than doing the work itself. A sensor is a Python class with a “poke()” method, which is called repeatedly until it returns “True”.
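The poke-until-True behaviour can be sketched as a plain loop. The helper `run_sensor` and the condition `file_has_landed` are hypothetical; `poke_interval` and `timeout` echo the names of the corresponding `BaseSensorOperator` arguments, but real sensors also support other modes (such as rescheduling between pokes) that this loop ignores.

```python
import time
from typing import Callable


def run_sensor(poke: Callable[[], bool], poke_interval: float, timeout: float) -> int:
    """Call poke() until it returns True; raise TimeoutError after `timeout` seconds.

    Returns the number of pokes it took. A simplified model of a sensor's
    poke/timeout loop, not Airflow's implementation.
    """
    started = time.monotonic()
    attempts = 0
    while True:
        attempts += 1
        if poke():
            return attempts  # criteria met: downstream tasks may now run
        if time.monotonic() - started >= timeout:
            raise TimeoutError(f"sensor timed out after {attempts} pokes")
        time.sleep(poke_interval)  # wait before checking again


calls = {"n": 0}


def file_has_landed() -> bool:
    # Hypothetical check: pretend the file appears on the third poke.
    calls["n"] += 1
    return calls["n"] >= 3


print(run_sensor(file_has_landed, poke_interval=0.01, timeout=1.0))  # 3
```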

Dependencies

After the steps above, the next important step is setting up dependencies between tasks, that is, defining the order in which tasks must be executed.

Task dependencies are set using the set_upstream and set_downstream methods (since version 1.8, the bitshift operators << and >> can also perform the same operations more concisely). A task can have multiple dependencies, and there are several ways of defining them:

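# Lines within each group are alternatives; you only need one of them.
# t2 runs after t1:
t2.set_upstream(t1)
t2 << t1
# Chain: t1, then t2, then t3:
t1 >> t2 >> t3
# t2 and t3 both run after t1:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
t1 >> (t2, t3)
[t2, t3] << t1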
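The bitshift syntax is ordinary Python operator overloading. The hypothetical `Task` class below is not Airflow's implementation, but it mimics the behaviour described above: `>>` and `<<` delegate to `set_downstream`/`set_upstream` and return the right-hand operand so chains work, while the reflected methods `__rrshift__`/`__rlshift__` handle a plain list on the left, since Python lists do not define these operators.

```python
from typing import Sequence, Set, Union

TaskOrTasks = Union["Task", Sequence["Task"]]


class Task:
    """Hypothetical task that mimics how Airflow overloads << and >>."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream: Set[str] = set()
        self.downstream: Set[str] = set()

    def set_downstream(self, other: TaskOrTasks) -> None:
        targets = other if isinstance(other, (list, tuple)) else [other]
        for task in targets:
            self.downstream.add(task.task_id)
            task.upstream.add(self.task_id)

    def set_upstream(self, other: TaskOrTasks) -> None:
        targets = other if isinstance(other, (list, tuple)) else [other]
        for task in targets:
            task.set_downstream(self)

    def __rshift__(self, other: TaskOrTasks) -> TaskOrTasks:  # self >> other
        self.set_downstream(other)
        return other  # returning `other` makes t1 >> t2 >> t3 work

    def __lshift__(self, other: TaskOrTasks) -> TaskOrTasks:  # self << other
        self.set_upstream(other)
        return other

    def __rrshift__(self, other: Sequence["Task"]) -> "Task":  # [a, b] >> self
        self.set_upstream(other)
        return self

    def __rlshift__(self, other: Sequence["Task"]) -> "Task":  # [a, b] << self
        self.set_downstream(other)
        return self


t1, t2, t3 = Task("t1"), Task("t2"), Task("t3")
[t2, t3] << t1  # same effect as t1 >> [t2, t3]
print(sorted(t1.downstream))  # ['t2', 't3']
```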

With this overview of Airflow in place, the next part of the series will uncover more of the details of Airflow orchestration.

About the Author

Tanya Sharma is a budding business analyst at Factspan who likes to solve real-life problems and enjoys data analysis and working with different analytics and BI tools. She has previously worked with Google’s partners and clients like NASA, the World Bank, NOAA, Walmart, Target, etc., across multiple domains. When not doing her day job, she can be found sketching or wandering off to an offbeat getaway, and as a bonus, she is always available for a quick chat.
