Creating an ETL pipeline using Python

An ETL pipeline is a fundamental type of workflow in data engineering. The goal is to take data that might be unstructured or difficult to use and turn it into a source of clean, structured data. It is easy to build a simple data pipeline as a Python script. In this article, we cover the ETL process, ETL tools, and how to create a data pipeline using a simple Python script.


Extract-Transform-Load (Source: Astera)

What is the ETL process?

ETL is a process in data warehousing that stands for Extract, Transform and Load. It is the process of extracting large amounts of data from a variety of sources, transforming it into a well-organized and readable format via techniques like data aggregation and data normalization, and finally loading it into a storage system such as a database or data warehouse.
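
As a rough illustration, an ETL job boils down to three functions chained together. Here is a minimal sketch (the file, table, and column names are hypothetical, not part of this article's pipeline):

import sqlite3
import pandas as pd

def extract() -> pd.DataFrame:
    # pull raw data from a source, e.g. a CSV export (hypothetical file)
    return pd.read_csv("raw_sales.csv")

def transform(raw: pd.DataFrame) -> pd.DataFrame:
    # clean and aggregate the raw records
    cleaned = raw.dropna()
    return cleaned.groupby("region", as_index=False)["amount"].sum()

def load(clean: pd.DataFrame) -> None:
    # write the organized data into a storage system (here a SQLite table)
    with sqlite3.connect("warehouse.db") as conn:
        clean.to_sql("sales_by_region", conn, if_exists="replace", index=False)

load(transform(extract()))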

What are ETL Tools?

ETL tools are tools developed to simplify and enhance the process of transferring raw data from a variety of systems to a data analytics warehouse. This involves extracting the data, transforming it, and loading it onto the new infrastructure.

Is Python good for ETL?

Using the programming capabilities of Python, organizations gain the flexibility to create ETL pipelines that not only manage data but also transform it in accordance with business requirements.

Python ETL tools are ETL tools written in Python that build on other Python libraries for extracting, transforming, and loading different types of data from multiple sources into data warehouses. Python ETL tools are fast, reliable, and deliver high performance.

Some of the best Python ETL tools for managing the ETL process are listed below (a minimal petl sketch follows the list):

·   Apache Airflow

·   petl

·   Spark

·   pandas

·   Luigi
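
For comparison, here is a minimal sketch of what a tiny CSV-to-CSV flow looks like in petl, one of the tools listed above; the file and column names are hypothetical:

import petl as etl

# extract: read raw rows from a CSV export (hypothetical file)
raw = etl.fromcsv("raw_diamonds.csv")

# transform: drop an unwanted column and cast price to float
clean = etl.convert(etl.cutout(raw, "index_id"), "price", float)

# load: write the cleaned table to a new CSV
etl.tocsv(clean, "clean_diamonds.csv")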

In this article, however, we do not use any of these ETL tools to create the data pipeline.

Creating a simple ETL data pipeline using a Python script from source (MySQL) to sink (MongoDB).

We will create an ETL pipeline using a simple Python script: take the data from MySQL, do some formatting on it, and then push the data to MongoDB. Let's look at the different steps involved.

STEP 1. Extracting the data from the data source (MySQL).

Python is well known for the libraries available to accomplish these tasks, so we will start by importing the ones we need. Since we have to connect to both MongoDB and MySQL, we import pymongo and mysql.connector (plus pandas for the transform step).

import pymongo
from pymongo import MongoClient
import mysql.connector as msql
from mysql.connector import Error
import pandas as pd

Here we have to extract the table diamond_record from the diamond database in MySQL; the source dataset is shown below:

The code for extracting the dataset from MySQL into Python is given below:


# Extracting data from the MySQL database

connection = None
try:
    # creating a connection to the MySQL database
    connection = msql.connect(host='localhost', database='diamond',
                              user='root', password='****')

    if connection.is_connected():
        db_Info = connection.get_server_info()  # getting the server info
        print("Connected to MySQL Server version ", db_Info)

        cursor = connection.cursor()
        cursor.execute("select database();")  # confirming the selected database (diamond)
        record = cursor.fetchone()
        print("You're connected to database: ", record)

        # executing the query to fetch all records from diamond_record
        mycursor = connection.cursor()
        mycursor.execute("SELECT * FROM diamond_record")
        table_rows = mycursor.fetchall()

except Error as e:
    print("Error while connecting to MySQL", e)

finally:
    if connection is not None and connection.is_connected():
        cursor.close()
        connection.close()
        print('MySQL connection is closed')

You can use a Jupyter Notebook to execute the above code (install it with pip install notebook and launch it with the jupyter notebook command).

STEP 2. Transforming the data using Python pandas.

Transformation refers to the cleansing and aggregation that may need to happen to data to prepare it for analysis. Some basic transformations are listed below (a pandas sketch of each follows the list):

  1. Cleaning
  2. Deduplication
  3. Format revision
  4. Key restructuring
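
The following is a minimal pandas sketch of what each of these can look like in practice; the column names (upc, price, order_date, zip, city) are hypothetical:

import pandas as pd

raw = pd.DataFrame({
    "upc": ["111", "111", "222", "333"],
    "price": ["1200", "1200", "950", None],
    "order_date": ["2023-01-05", "2023-01-05", "2023-02-10", "2023-03-01"],
    "zip": ["10001", "10001", "94105", "60601"],
    "city": ["New York", "New York", "San Francisco", "Chicago"],
})

# 1. Cleaning: drop rows with missing values
cleaned = raw.dropna()

# 2. Deduplication: keep one row per upc
deduped = cleaned.drop_duplicates(subset=["upc"])

# 3. Format revision: cast price to float and order_date to a datetime
revised = deduped.assign(price=deduped["price"].astype(float),
                         order_date=pd.to_datetime(deduped["order_date"]))

# 4. Key restructuring: build a surrogate key from existing fields
restructured = revised.assign(location_key=revised["zip"] + "_" + revised["city"])

print(restructured)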

After gathering the data in the extraction phase, we'll go on to the transform phase of the process. Suppose we don't require the product_class, index_id, and cut fields in the source dataset. So, we clean the dataset using a pandas DataFrame.

The code for transforming the dataset is given below:

# Transform data using a pandas dataframe

# Creating the dataframe from the extracted rows
df = pd.DataFrame(table_rows, columns=["upc", "title", "product_class", "index_id",
                                       "shape", "price", "carat", "color", "cut",
                                       "depth", "girdle"])

# Dropping unwanted fields
new_df = df.drop(columns=["product_class", "index_id", "cut"])

print(new_df)

After transformation, the resultant dataset looks like:

STEP 3. Loading the data to the sink (MongoDB).

Load is the process of moving transformed data from a staging area into a target data warehouse. Here the target data warehouse is the MongoDB cloud (Atlas).

Finally, the quickest way to load the dataframe into the MongoDB cloud is to use the insert_many method from pymongo together with the 'records' orientation of the DataFrame's to_dict method.
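
To see why this works: to_dict('records') turns a DataFrame into a list of dictionaries, one per row, which is exactly the shape insert_many expects. A tiny illustration:

import pandas as pd

sample = pd.DataFrame({"upc": ["111", "222"], "price": [1200.0, 950.0]})
print(sample.to_dict('records'))
# [{'upc': '111', 'price': 1200.0}, {'upc': '222', 'price': 950.0}]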

The code for loading the final dataset into the MongoDB cloud is given below:

# Loading data to the sink (MongoDB)

# Making a connection to the MongoDB cloud cluster
cluster = pymongo.MongoClient("mongodb+srv://siddhartha:*****@cluster0.s9h5or8.mongodb.net/?retryWrites=true&w=majority")

# selecting the database testdb and the collection test
db = cluster["testdb"]
collection = db["test"]

# Inserting the transformed records into the test collection
x = collection.insert_many(new_df.to_dict('records'))  # new_df comes from the transform step
print(len(x.inserted_ids))
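
To sanity-check the load, you can read the data back from the same collection (a small sketch, reusing the collection object opened above):

# Verifying the load: count the inserted documents and inspect one of them
print(collection.count_documents({}))
print(collection.find_one())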

The loaded data on the MongoDB cloud looks like:

Here we learned, step by step, how to create a simple data pipeline from the extraction phase to the loading phase using a Python script.
In the next article, we will learn how to automate ETL processes using the Python ETL tool Apache Airflow.


Siddhartha Sharma

Aspiring data analyst currently working as a freelance tech blogger, with proficiency in Python, SQL, advanced Excel, Power BI, and Metabase.
