Building a Basic ETL Pipeline in Python with OOP
In this blog post, we'll walk through creating a basic ETL (Extract, Transform, Load) pipeline in Python using object-oriented programming principles. We'll demonstrate how to extract data from various sources, transform it, and load it into a SQLite database. We'll also include pytest unit tests to ensure our pipeline works correctly.
This post originated from my motivation to better understand ETL pipelines. Since I grasp high-level concepts better by coding them up, I decided to build a basic ETL pipeline in Python for future reference.
Table of Contents
- Introduction
- Setting Up the Project
- Extracting Data
- Transforming Data
- Loading Data
- Chaining the ETL Services
- Writing Unit Tests
- Running the ETL Pipeline
- Considerations & Next Steps
- Conclusion
Introduction
ETL pipelines are essential components in data engineering and analytics workflows. They allow us to gather data from different sources, apply transformations to process, clean, and categorize the data, and load it into a destination data store for further analysis or reporting.
Here are some examples of what processed data can be used for:
- Business Intelligence (BI)
  - Sales Analytics: Aggregating data from different stores or regions to analyze trends, forecast sales, and optimize inventory management
  - Marketing Campaign Analytics: Integrating data from marketing channels (social media, ads, email campaigns) to measure customer engagement and ROI
  - Customer Behaviour Analysis: Combining data from CRM systems, website interactions, and transaction histories to segment customers, predict behaviour, and personalize marketing efforts
- Training Data for ML/AI Models
  - Natural Language Processing (NLP): Using text data from customer reviews, social media posts, or support tickets to train sentiment analysis or chatbot models
  - Predictive Maintenance: Processing sensor data from industrial equipment to predict failures and optimize maintenance schedules
  - Image Recognition: Training models on labeled image datasets to automate tasks like image classification or object detection in applications ranging from medical diagnostics to autonomous vehicles
- Operational Efficiency
  - Supply Chain Optimization: Integrating data from suppliers, logistics partners, and inventory systems to optimize supply chain processes and reduce costs
  - Workforce Management: Integrating data from employee schedules, task assignments, and productivity metrics to optimize workforce deployment
- Monitoring
  - Real-time Analytics: Streaming data from IoT devices or monitoring systems to detect anomalies, trigger alerts, and take immediate action (e.g., in cybersecurity or network monitoring)
  - Cloud Provider Resource Usage Management: Collecting and analyzing data from cloud services to monitor compute resource usage (CPU, memory, storage) across different instances. This can help in identifying under-utilized resources, optimizing workloads, predicting future resource needs, and managing costs by rightsizing instances or scheduling non-critical tasks during off-peak times
- Risk Management
  - Fraud Detection: Analyzing transactional data in real time to detect fraudulent activities and prevent financial losses
  - Credit Risk Assessment: Using data from credit reports, transaction histories, and economic indicators to assess the creditworthiness of individuals or businesses
- Financial Analysis
  - Portfolio Management: Using historical market data and economic indicators to optimize investment strategies and manage portfolio risk
  - Cash Flow Forecasting: Analyzing historical cash flow data, sales forecasts, and expenditure patterns to predict future cash flows
We'll build a simple ETL pipeline that extracts data from different sources (CSV, RESTful APIs, XML, and SQL), transforms the data, and loads it into a SQLite database.
Setting Up the Project
First, let's set up our project directory structure:
etl_pipeline/
|-- test_data/
| |-- source_data.csv
| |-- source_data.xml
| |-- source_data.sql
|-- etl_services.py
|-- test_etl_pipeline.py
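The unit tests later in the post expect each source file to hold five records with identifier, date, quantity, and price fields; the values below are only illustrative. For example, test_data/source_data.csv could contain:
identifier,date,quantity,price
1,2024-07-01,10,9.99
2,2024-07-02,15,19.99
3,2024-07-03,7,14.99
4,2024-07-04,,29.99
5,2024-07-05,20,9.99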
Install the necessary Python packages (requests-mock is only needed to mock HTTP calls in the unit tests):
pip install pandas requests sqlalchemy pytest requests-mock
Extracting Data
Create an abstract base class for our extract services and implement specific services for CSV, API JSON, XML, and SQL data sources.
import pandas as pd
import requests
import xml.etree.ElementTree as ET
import sqlite3
from sqlalchemy import create_engine
from datetime import datetime
from abc import ABC, abstractmethod


class ExtractService(ABC):
    # Common interface: every extract service returns a pandas DataFrame.
    @abstractmethod
    def extract(self):
        pass


class CSVExtractService(ExtractService):
    def __init__(self, file_path):
        self.file_path = file_path

    def extract(self):
        data = pd.read_csv(self.file_path)
        data['source'] = 'CSV'  # tag each record with its origin
        return data


class APIJSONExtractService(ExtractService):
    def __init__(self, api_url):
        self.api_url = api_url

    def extract(self):
        response = requests.get(self.api_url)
        data = pd.DataFrame(response.json())
        data['source'] = 'API'
        return data


class XMLExtractService(ExtractService):
    def __init__(self, file_path):
        self.file_path = file_path

    def extract(self):
        tree = ET.parse(self.file_path)
        root = tree.getroot()
        data = []
        for child in root:
            record = child.attrib  # each record's fields are stored as XML attributes
            record['source'] = 'XML'
            data.append(record)
        return pd.DataFrame(data)


class SQLExtractService(ExtractService):
    def __init__(self, db_path, query):
        self.db_path = db_path
        self.query = query

    def extract(self):
        conn = sqlite3.connect(self.db_path)
        data = pd.read_sql_query(self.query, conn)
        data['source'] = 'SQL'
        conn.close()
        return data
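Note that XMLExtractService reads each record's fields from the element attributes, so a compatible test_data/source_data.xml could look like this (illustrative values, matching the five-record expectation in the tests):
<sales>
    <record identifier="1" date="2024-07-01" quantity="10" price="9.99"/>
    <record identifier="2" date="2024-07-02" quantity="15" price="19.99"/>
    <record identifier="3" date="2024-07-03" quantity="7" price="14.99"/>
    <record identifier="4" date="2024-07-04" quantity="12" price="29.99"/>
    <record identifier="5" date="2024-07-05" quantity="20" price="9.99"/>
</sales>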
Transforming Data
We'll create a transformation service that cleans and processes the data. This service will remove duplicates, forward-fill missing values, and add a total column.
class TransformService(ABC):
    @abstractmethod
    def transform(self, data):
        pass


class BasicTransformService(TransformService):
    def transform(self, data):
        # Example transformations: remove duplicates, forward-fill missing values, add a total column
        data = data.drop_duplicates()
        data = data.ffill()
        # Cast both columns to float because some sources (e.g. XML attributes) deliver strings
        data['total'] = data['quantity'].astype(float) * data['price'].astype(float)
        return data
BasicTransformService is just a generic transformation service; since different data sources require different kinds of cleaning and mapping, new transformation services can be created as needed.
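For example, a source-specific service could reuse the generic logic while first casting the string values produced by the XML extractor. The class below is a hypothetical sketch, not part of the pipeline above:
class XMLTransformService(BasicTransformService):
    # Hypothetical example: XML attributes arrive as strings, so coerce the
    # numeric columns before applying the generic transformations.
    def transform(self, data):
        data = data.copy()
        data['quantity'] = pd.to_numeric(data['quantity'], errors='coerce')
        data['price'] = pd.to_numeric(data['price'], errors='coerce')
        return super().transform(data)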
Loading Data
We'll create a loader service to load the transformed data into a SQLite database, which will be the destination data store for our aggregated data. This service will add a timestamp column when each record is stored. As the ETL pipeline evolves, different load services can be added, depending on where the destination data is stored.
class LoadService(ABC):
    @abstractmethod
    def load(self, data):
        pass


class SQLiteLoadService(LoadService):
    def __init__(self, db_path, table_name):
        self.db_path = db_path
        self.table_name = table_name

    def load(self, data):
        engine = create_engine(f'sqlite:///{self.db_path}')
        data['timestamp'] = datetime.now()  # record when the data was loaded
        data.to_sql(self.table_name, con=engine, if_exists='replace', index=False)
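Note that if_exists='replace' recreates the destination table on every run, which keeps this example idempotent; for an incremental pipeline you would typically switch to if_exists='append' (to_sql also accepts 'fail') and handle duplicates in the transform step.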
Chaining the ETL Services
We can now create a class representing the pipeline to initialize the ETL services and execute them one after the other.
class ETLPipeline:
    def __init__(self, extractor: ExtractService, transformer: TransformService, loader: LoadService):
        self.extractor = extractor
        self.transformer = transformer
        self.loader = loader

    def run(self):
        extracted_data = self.extractor.extract()
        transformed_data = self.transformer.transform(extracted_data)
        self.loader.load(transformed_data)
Writing Unit Tests
We'll use pytest to write unit tests for each component of our ETL pipeline. We'll create test data for all four data sources and write tests for each extraction service, the transformation service, and the full pipeline. (The requests_mock fixture used below is provided by the requests-mock plugin installed earlier.)
import os
import pandas as pd
import sqlite3
import pytest
from datetime import datetime
from etl_services import CSVExtractService, APIJSONExtractService, XMLExtractService, SQLExtractService
from etl_services import BasicTransformService, SQLiteLoadService, ETLPipeline


@pytest.fixture
def setup_sql_db():
    # source data store
    db_path = 'test_data/test_db.db'
    sql_script = 'test_data/source_data.sql'
    conn = sqlite3.connect(db_path)
    with open(sql_script, 'r') as f:
        conn.executescript(f.read())
    conn.close()
    yield db_path
    os.remove(db_path)


def test_csv_extraction():
    extractor = CSVExtractService('test_data/source_data.csv')
    data = extractor.extract()
    assert isinstance(data, pd.DataFrame)
    assert len(data) == 5
    assert 'source' in data.columns
    assert all(data['source'] == 'CSV')


def test_api_json_extraction(requests_mock):
    api_url = 'https://api.example.com/data'
    requests_mock.get(api_url, json=[
        {"identifier": 1, "date": "2024-07-01", "quantity": 10, "price": 9.99},
        {"identifier": 2, "date": "2024-07-02", "quantity": 15, "price": 19.99},
        {"identifier": 3, "date": "2024-07-03", "quantity": 7, "price": 14.99},
        {"identifier": 4, "date": "2024-07-04", "quantity": None, "price": 29.99},
        {"identifier": 5, "date": "2024-07-05", "quantity": 20, "price": 9.99}
    ])
    extractor = APIJSONExtractService(api_url)
    data = extractor.extract()
    assert isinstance(data, pd.DataFrame)
    assert len(data) == 5
    assert 'source' in data.columns
    assert all(data['source'] == 'API')


def test_xml_extraction():
    extractor = XMLExtractService('test_data/source_data.xml')
    data = extractor.extract()
    assert isinstance(data, pd.DataFrame)
    assert len(data) == 5
    assert 'source' in data.columns
    assert all(data['source'] == 'XML')


def test_sql_extraction(setup_sql_db):
    db_path = setup_sql_db
    extractor = SQLExtractService(db_path, 'SELECT * FROM sales')
    data = extractor.extract()
    assert isinstance(data, pd.DataFrame)
    assert len(data) == 5
    assert 'source' in data.columns
    assert all(data['source'] == 'SQL')


def test_basic_transformation():
    data = pd.DataFrame({
        'identifier': [1, 2, 3, 4, 5],
        'date': ['2024-07-01', '2024-07-02', '2024-07-03', '2024-07-04', '2024-07-05'],
        'quantity': [10, 15, 7, None, 20],
        'price': [9.99, 19.99, 14.99, 29.99, 9.99],
        'source': ['CSV', 'CSV', 'CSV', 'CSV', 'CSV']
    })
    transformer = BasicTransformService()
    transformed_data = transformer.transform(data)
    assert 'total' in transformed_data.columns
    assert len(transformed_data) == 5


def test_etl_pipeline(setup_sql_db):
    db_path = setup_sql_db
    extractor = CSVExtractService('test_data/source_data.csv')
    transformer = BasicTransformService()
    loader = SQLiteLoadService(db_path, 'transformed_sales')
    pipeline = ETLPipeline(extractor, transformer, loader)
    pipeline.run()
    conn = sqlite3.connect(db_path)
    loaded_data = pd.read_sql_query('SELECT * FROM transformed_sales', conn)
    conn.close()
    assert len(loaded_data) == 5
    assert 'total' in loaded_data.columns
    assert 'timestamp' in loaded_data.columns
    assert loaded_data['timestamp'].apply(lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f')).notnull().all()
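The setup_sql_db fixture executes test_data/source_data.sql against a temporary SQLite database; one script consistent with the five-row sales table queried by the SQL test might be (again, illustrative values):
-- Illustrative fixture: creates the sales table the SQL extraction test queries
DROP TABLE IF EXISTS sales;
CREATE TABLE sales (
    identifier INTEGER,
    date TEXT,
    quantity INTEGER,
    price REAL
);
INSERT INTO sales (identifier, date, quantity, price) VALUES
    (1, '2024-07-01', 10, 9.99),
    (2, '2024-07-02', 15, 19.99),
    (3, '2024-07-03', 7, 14.99),
    (4, '2024-07-04', NULL, 29.99),
    (5, '2024-07-05', 20, 9.99);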
Running the ETL Pipeline
With our services and tests in place, we can run our ETL pipeline. First, run the tests using pytest:
pytest test_etl_pipeline.py
If all tests pass, you can run the ETL pipeline by creating an instance of the ETLPipeline class with the appropriate services and calling the run method.
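For example, a minimal run script wiring the CSV extractor to the SQLite loader might look like the following; the file path, database name, and table name are placeholders to adapt to your own setup:
from etl_services import CSVExtractService, BasicTransformService, SQLiteLoadService, ETLPipeline

# Placeholder paths and table name; adjust to your environment.
extractor = CSVExtractService('test_data/source_data.csv')
transformer = BasicTransformService()
loader = SQLiteLoadService('etl_destination.db', 'transformed_sales')

pipeline = ETLPipeline(extractor, transformer, loader)
pipeline.run()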
Considerations & Next Steps
This example provides a basic representation of an ETL pipeline. However, it does not meet the non-functional requirements necessary for a production-ready solution. Real-world data pipelines often handle large volumes of data and need to be scalable, highly available, and auditable.
To meet these requirements, we should consider the following:
- Scalability: Use distributed data processing frameworks such as Apache Spark or Apache Flink to handle large datasets.
- Availability: Deploy your pipeline on cloud platforms like AWS, GCP, or Azure to ensure high availability and reliability.
- Auditability: Implement logging and monitoring using tools like Apache Kafka, Prometheus, and Grafana to track data processing and ensure data integrity.
Additionally, we could improve this solution by:
- Implementing parallel processing to speed up data extraction and transformation.
- Adding error handling and retry mechanisms to handle transient data extraction failures (see the sketch after this list).
- Adding stricter data validation steps to ensure data quality before loading it into the destination.
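As an illustration of the error-handling point, a simple retry wrapper around any extract service could look like this. It's a minimal sketch assuming transient failures surface as exceptions and that a fixed delay between attempts is acceptable:
import time

class RetryingExtractService(ExtractService):
    # Hypothetical wrapper: retries the wrapped extractor a few times before giving up.
    def __init__(self, inner: ExtractService, retries: int = 3, delay_seconds: float = 2.0):
        self.inner = inner
        self.retries = retries
        self.delay_seconds = delay_seconds

    def extract(self):
        last_error = None
        for _ in range(self.retries):
            try:
                return self.inner.extract()
            except Exception as error:  # in practice, catch narrower exception types
                last_error = error
                time.sleep(self.delay_seconds)
        raise last_error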
Conclusion
In this blog post, we've built a simple ETL pipeline in Python, complete with extraction, transformation, and loading services. We've also written unit tests using pytest to ensure our pipeline works correctly. This example provides the building blocks to create more complex and robust ETL pipelines.