Building a Basic ETL Pipeline in Python with OOP

In this blog post, we'll walk through creating a basic ETL (Extract, Transform, Load) pipeline in Python using object-oriented programming principles. We'll demonstrate how to extract data from various sources, transform it, and load it into a SQLite database. We'll also include pytest unit tests to ensure our pipeline works correctly.

This post originated from my motivation to better understand ETL pipelines. Since I grasp high-level concepts better through coding, I decided to build a basic ETL pipeline in Python for future reference.

An ETL data pipeline diagram

Table of Contents

  1. Introduction
  2. Setting Up the Project
  3. Extracting Data
  4. Transforming Data
  5. Loading Data
  6. Chaining the ETL Services
  7. Writing Unit Tests
  8. Running the ETL Pipeline
  9. Considerations and Next Steps
  10. Conclusion

Introduction

ETL pipelines are essential components in data engineering and analytics workflows. They allow us to gather data from different sources, apply transformations to process, clean, and categorize the data, and load it into a destination data store for further analysis or reporting.

Here are some examples of what processed data can be used for:

  • Business Intelligence (BI)
    • Sales Analytics: Aggregate data from different stores or regions to analyze trends, forecast sales, and optimize inventory management
    • Marketing Campaign Analytics: Integrate data from marketing channels (social media, ads, email campaigns) to measure customer engagement and ROI
    • Customer Behaviour Analysis: Combine data from CRM systems, website interactions, and transaction histories to segment customers, predict behaviour, and personalize marketing efforts
  • Training Data for ML/AI Models
    • Natural Language Processing (NLP): Using text data from customer reviews, social media posts, or support tickets to train sentiment analysis or chatbot models.
    • Predictive Maintenance: Processing sensor data from industrial equipment to predict failures and optimize maintenance schedules.
    • Image Recognition: Training models on labeled image datasets to automate tasks like image classification or object detection in applications ranging from medical diagnostics to autonomous vehicles.
  • Operational Efficiency
    • Supply Chain Optimization: Integrating data from suppliers, logistics partners, and inventory systems to optimize supply chain processes and reduce costs.
    • Workforce Management: Integrating data from employee schedules, task assignments, and productivity metrics to optimize workforce deployment.
  • Monitoring
    • Real-time Analytics: Streaming data from IoT devices or monitoring systems to detect anomalies, trigger alerts, and take immediate actions (e.g., in cybersecurity or network monitoring).
    • Cloud Provider Resource Usage Management: Collecting and analyzing data from cloud services to monitor compute resource usage (CPU, memory, storage) across different instances. This can help in identifying under-utilized resources, optimizing workloads, predicting future resource needs, and managing costs by rightsizing instances or scheduling non-critical tasks during off-peak times.
  • Risk Management
    • Fraud Detection: Analyzing transactional data in real-time to detect fraudulent activities and prevent financial losses.
    • Credit Risk Assessment: Using data from credit reports, transaction histories, and economic indicators to assess the creditworthiness of individuals or businesses.
  • Financial Analysis
    • Portfolio Management: Using historical market data and economic indicators to optimize investment strategies and manage portfolio risk.
    • Cash Flow Forecasting: Analyzing historical cash flow data, sales forecasts, and expenditure patterns to predict future cash flows.

We'll build a simple ETL pipeline that extracts data from different sources (CSV, RESTful APIs, XML, and SQL), transforms the data, and loads it into a SQLite database.

Setting Up the Project

First, let's set up our project directory structure:

etl_pipeline/
|-- test_data/
|   |-- source_data.csv
|   |-- source_data.xml
|   |-- source_data.sql
|-- etl_services.py
|-- test_etl_pipeline.py

Install the necessary Python packages:

pip install pandas requests sqlalchemy pytest requests-mock

Extracting Data

We'll create an abstract base class for our extract services and implement concrete services for CSV, API JSON, XML, and SQL data sources.

import pandas as pd
import requests
import xml.etree.ElementTree as ET
import sqlite3
from sqlalchemy import create_engine
from datetime import datetime
from abc import ABC, abstractmethod

class ExtractService(ABC):
    @abstractmethod
    def extract(self):
        pass

class CSVExtractService(ExtractService):
    def __init__(self, file_path):
        self.file_path = file_path

    def extract(self):
        data = pd.read_csv(self.file_path)
        data['source'] = 'CSV'
        return data

class APIJSONExtractService(ExtractService):
    def __init__(self, api_url):
        self.api_url = api_url

    def extract(self):
        response = requests.get(self.api_url)
        data = pd.DataFrame(response.json())
        data['source'] = 'API'
        return data

class XMLExtractService(ExtractService):
    def __init__(self, file_path):
        self.file_path = file_path

    def extract(self):
        tree = ET.parse(self.file_path)
        root = tree.getroot()
        data = []
        for child in root:
            record = child.attrib
            record['source'] = 'XML'
            data.append(record)
        return pd.DataFrame(data)

class SQLExtractService(ExtractService):
    def __init__(self, db_path, query):
        self.db_path = db_path
        self.query = query

    def extract(self):
        conn = sqlite3.connect(self.db_path)
        data = pd.read_sql_query(self.query, conn)
        data['source'] = 'SQL'
        conn.close()
        return data

Transforming Data

We'll create a transformation service that cleans and processes the data. This service will handle missing values, remove duplicates, and add a total column.

class TransformService(ABC):
    @abstractmethod
    def transform(self, data):
        pass

class BasicTransformService(TransformService):
    def transform(self, data):
        # Example transformations: remove duplicates, forward-fill missing values, add a total column
        data = data.drop_duplicates()
        data = data.ffill()
        data['total'] = data['quantity'].astype(float) * data['price']
        return data

BasicTransformService is just a generic transformation service; since different data sources require different kinds of cleaning and mapping, new transformation services can be created as needed.
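
For instance, a transformation service tailored to the API source might parse dates and coerce numeric fields before computing the total. The sketch below is hypothetical and not part of the pipeline above; it assumes the date, quantity, and price columns seen in our test data:

class APITransformService(TransformService):
    def transform(self, data):
        data = data.drop_duplicates()
        # Parse ISO date strings returned by the API into datetime objects
        data['date'] = pd.to_datetime(data['date'])
        # Coerce quantity and price to numbers, treating unparsable values as missing
        data['quantity'] = pd.to_numeric(data['quantity'], errors='coerce').fillna(0)
        data['price'] = pd.to_numeric(data['price'], errors='coerce')
        data['total'] = data['quantity'] * data['price']
        return data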

Loading Data

We'll create a loader service to load the transformed data into a SQLite database, which will be the destination data store for our aggregated data. This service adds a timestamp column recording when each record is stored. As the ETL pipeline evolves, additional load services can be created, depending on where the destination data is stored; a sketch of an alternative loader follows the SQLite example below.

class LoadService(ABC):
    @abstractmethod
    def load(self, data):
        pass

class SQLiteLoadService(LoadService):
    def __init__(self, db_path, table_name):
        self.db_path = db_path
        self.table_name = table_name

    def load(self, data):
        engine = create_engine(f'sqlite:///{self.db_path}')
        data['timestamp'] = datetime.now()
        data.to_sql(self.table_name, con=engine, if_exists='replace', index=False)
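
For example, if the destination were a flat file instead of SQLite, a hypothetical loader might look like the following (the file path handling is an assumption, not part of the pipeline above):

class CSVLoadService(LoadService):
    def __init__(self, file_path):
        self.file_path = file_path

    def load(self, data):
        # Stamp each record with the load time, then write everything to a CSV file
        data['timestamp'] = datetime.now()
        data.to_csv(self.file_path, index=False)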

Chaining the ETL Services

We can now create a class representing the pipeline, which wires the ETL services together and executes them one after the other.

class ETLPipeline:
    def __init__(self, extractor: ExtractService, transformer: TransformService, loader: LoadService):
        self.extractor = extractor
        self.transformer = transformer
        self.loader = loader

    def run(self):
        extracted_data = self.extractor.extract()
        transformed_data = self.transformer.transform(extracted_data)
        self.loader.load(transformed_data)

Writing Unit Tests

We'll use pytest to write unit tests for each component of our ETL pipeline. We'll create test data for all four source types and write tests for each extraction service, the transformation service, and the full pipeline.
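
For reference, source_data.csv could hold five sales records mirroring the mocked API payload below; the exact values are illustrative:

identifier,date,quantity,price
1,2024-07-01,10,9.99
2,2024-07-02,15,19.99
3,2024-07-03,7,14.99
4,2024-07-04,,29.99
5,2024-07-05,20,9.99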

import os
import pandas as pd
import sqlite3
import pytest
from datetime import datetime

from etl_services import CSVExtractService, APIJSONExtractService, XMLExtractService, SQLExtractService
from etl_services import BasicTransformService, SQLiteLoadService, ETLPipeline

@pytest.fixture
def setup_sql_db():
    # source data store
    db_path = 'test_data/test_db.db'

    sql_script = 'test_data/source_data.sql'
    conn = sqlite3.connect(db_path)
    with open(sql_script, 'r') as f:
        conn.executescript(f.read())
    conn.close()
    yield db_path
    os.remove(db_path)

def test_csv_extraction():
    extractor = CSVExtractService('test_data/source_data.csv')
    data = extractor.extract()
    assert isinstance(data, pd.DataFrame)
    assert len(data) == 5
    assert 'source' in data.columns
    assert all(data['source'] == 'CSV')

def test_api_json_extraction(requests_mock):
    api_url = 'https://api.example.com/data'
    requests_mock.get(api_url, json=[
        {"identifier": 1, "date": "2024-07-01", "quantity": 10, "price": 9.99},
        {"identifier": 2, "date": "2024-07-02", "quantity": 15, "price": 19.99},
        {"identifier": 3, "date": "2024-07-03", "quantity": 7, "price": 14.99},
        {"identifier": 4, "date": "2024-07-04", "quantity": None, "price": 29.99},
        {"identifier": 5, "date": "2024-07-05", "quantity": 20, "price": 9.99}
    ])
    extractor = APIJSONExtractService(api_url)
    data = extractor.extract()
    assert isinstance(data, pd.DataFrame)
    assert len(data) == 5
    assert 'source' in data.columns
    assert all(data['source'] == 'API')

def test_xml_extraction():
    extractor = XMLExtractService('test_data/source_data.xml')
    data = extractor.extract()
    assert isinstance(data, pd.DataFrame)
    assert len(data) == 5
    assert 'source' in data.columns
    assert all(data['source'] == 'XML')

def test_sql_extraction(setup_sql_db):
    db_path = setup_sql_db
    extractor = SQLExtractService(db_path, 'SELECT * FROM sales')
    data = extractor.extract()
    assert isinstance(data, pd.DataFrame)
    assert len(data) == 5
    assert 'source' in data.columns
    assert all(data['source'] == 'SQL')

def test_basic_transformation():
    data = pd.DataFrame({
        'identifier': [1, 2, 3, 4, 5],
        'date': ['2024-07-01', '2024-07-02', '2024-07-03', '2024-07-04', '2024-07-05'],
        'quantity': [10, 15, 7, None, 20],
        'price': [9.99, 19.99, 14.99, 29.99, 9.99],
        'source': ['CSV', 'CSV', 'CSV', 'CSV', 'CSV']
    })
    transformer = BasicTransformService()
    transformed_data = transformer.transform(data)
    assert 'total' in transformed_data.columns
    assert len(transformed_data) == 5

def test_etl_pipeline(setup_sql_db):
    db_path = setup_sql_db
    extractor = CSVExtractService('test_data/source_data.csv')
    transformer = BasicTransformService()
    loader = SQLiteLoadService(db_path, 'transformed_sales')

    pipeline = ETLPipeline(extractor, transformer, loader)
    pipeline.run()

    conn = sqlite3.connect(db_path)
    loaded_data = pd.read_sql_query('SELECT * FROM transformed_sales', conn)
    conn.close()

    assert len(loaded_data) == 5
    assert 'total' in loaded_data.columns
    assert 'timestamp' in loaded_data.columns
    assert loaded_data['timestamp'].apply(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f')).notnull().all()

Running the ETL Pipeline

With our services and tests in place, we can run our ETL pipeline. First, run the tests using pytest:

pytest test_etl_pipeline.py

If all tests pass, you can run the ETL pipeline by creating an instance of the ETLPipeline class with the appropriate services and calling the run method.
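
For example, a CSV-to-SQLite run could look like the following; the file and database paths are illustrative:

from etl_services import CSVExtractService, BasicTransformService, SQLiteLoadService, ETLPipeline

extractor = CSVExtractService('test_data/source_data.csv')
transformer = BasicTransformService()
loader = SQLiteLoadService('sales.db', 'transformed_sales')

pipeline = ETLPipeline(extractor, transformer, loader)
pipeline.run()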

Considerations and Next Steps

This example provides a basic representation of an ETL pipeline. However, it does not meet the non-functional requirements necessary for a production-ready solution. Real-world data pipelines often handle large volumes of data and need to be scalable, highly available, and auditable.

To meet these requirements, we should consider the following:

  1. Scalability: Use distributed data processing frameworks such as Apache Spark or Apache Flink to handle large datasets.
  2. Availability: Deploy your pipeline on cloud platforms like AWS, GCP, or Azure to ensure high availability and reliability.
  3. Auditability: Implement logging and monitoring using tools like Apache Kafka, Prometheus, and Grafana to track data processing and ensure data integrity.
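
At its simplest, auditability can start with structured logging inside the pipeline itself. The sketch below uses Python's standard logging module and subclasses our ETLPipeline; it is an illustration, not a substitute for proper monitoring tooling:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('etl_pipeline')

class LoggingETLPipeline(ETLPipeline):
    def run(self):
        # Log each stage so runs can be traced and record counts audited
        logger.info('Extracting with %s', type(self.extractor).__name__)
        extracted_data = self.extractor.extract()
        logger.info('Extracted %d records', len(extracted_data))
        transformed_data = self.transformer.transform(extracted_data)
        logger.info('Transformed into %d records', len(transformed_data))
        self.loader.load(transformed_data)
        logger.info('Load complete')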

Additionally, we could improve this solution by:

  • Implementing parallel processing to speed up data extraction and transformation.
  • Adding error handling and retry mechanisms to handle data extraction failures (a sketch of a simple retry wrapper follows this list).
  • Adding stricter data validation steps to ensure data quality before loading it into the destination.
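
As a starting point for the retry mechanism mentioned above, a hypothetical wrapper around any ExtractService could retry failed extractions a few times before giving up:

class RetryingExtractService(ExtractService):
    def __init__(self, inner: ExtractService, max_attempts=3):
        self.inner = inner
        self.max_attempts = max_attempts

    def extract(self):
        # Re-run the wrapped extraction until it succeeds or attempts are exhausted
        last_error = None
        for attempt in range(self.max_attempts):
            try:
                return self.inner.extract()
            except Exception as error:
                last_error = error
        raise last_error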

Conclusion

In this blog post, we've built a simple ETL pipeline in Python, complete with extraction, transformation, and loading services. We've also written unit tests using pytest to ensure our pipeline works correctly. This example provides the building blocks to create more complex and robust ETL pipelines.