# A Scalable ETL Pipeline Using PySpark
This project builds an end-to-end ETL (Extract, Transform, Load) pipeline for analyzing customer transactions. It answers two questions: which customers bought AirPods after purchasing an iPhone, and which customers bought only those two products, an iPhone and AirPods, and nothing else. The pipeline reads data from several sources, applies business logic to filter the relevant records, and loads the processed data into different storage locations.
The project works with three types of data sources: CSV files, Parquet files, and Delta tables.

The workflow follows the ETL pattern: data is extracted from those sources, transformed with the business logic described below, and loaded into local storage or Delta tables.
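To make the two business questions concrete, here is a hypothetical slice of the inputs, using the column names that appear in the transformation code (the real datasets are not reproduced in this write-up):

```python
# Hypothetical example rows; column names match those used by the transformers below.
transactions = [
    (101, "2024-01-02", "iPhone"),
    (101, "2024-01-03", "AirPods"),   # AirPods right after an iPhone -> first question
    (102, "2024-01-05", "iPhone"),
    (102, "2024-02-01", "MacBook"),   # bought something else -> excluded from both
]  # columns: customer_id, transaction_date, product_name

customers = [
    (101, "Alice", "New York"),
    (102, "Bob", "London"),
]  # columns: customer_id, customer_name, location
```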
## Reader Factory (`reader_factory` notebook)

This notebook defines how data from different sources (CSV, Parquet, Delta) is read. The Factory Design Pattern abstracts the data loading logic for each source type.
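The snippets below use a `SparkSession` named `spark`, which Databricks notebooks provide automatically. To run the code elsewhere, a minimal sketch (my assumption, not part of the original notebooks) would be:

```python
from pyspark.sql import SparkSession

# Create a local session; Delta table support needs extra configuration
# (e.g., the delta-spark package) outside Databricks.
spark = SparkSession.builder.appName("apple-analysis").getOrCreate()
```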
```python
class DataSource:
    """
    Abstract class to represent a data source.
    """

    def __init__(self, path):
        self.path = path

    def get_data_frame(self):
        """
        Abstract method to be implemented in subclasses.
        """
        raise ValueError("Not Implemented")
```
`DataSource` class: An abstract class that serves as the base for all data source types. It takes a `path` to load data from and declares an abstract method `get_data_frame` that subclasses must implement.

```python
class CSVDataSource(DataSource):
    def get_data_frame(self):
        return (
            spark.read.format("csv").option("header", True).load(self.path)
        )
```
`CSVDataSource` class: Inherits from `DataSource` and implements `get_data_frame` to read CSV files with Spark's reader, treating the first row as a header.

```python
class ParquetDataSource(DataSource):
    def get_data_frame(self):
        return (
            spark.read.format("parquet").load(self.path)
        )
```
`ParquetDataSource` class: Like `CSVDataSource`, implements `get_data_frame`, in this case for reading Parquet files.

```python
class DeltaDataSource(DataSource):
    def get_data_frame(self):
        table_name = self.path
        return (
            spark.read.table(table_name)
        )
```
`DeltaDataSource` class: Reads data from a Delta table whose name is supplied as the path.

```python
def get_data_source(data_type, file_path):
    if data_type == "csv":
        return CSVDataSource(file_path)
    elif data_type == "parquet":
        return ParquetDataSource(file_path)
    elif data_type == "delta":
        return DeltaDataSource(file_path)
    else:
        raise ValueError(f"Not implemented for data_type: {data_type}")
```
`get_data_source` function: The factory function takes a `data_type` and a `file_path` and returns an instance of the matching data source class.
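For example, the transactions CSV used later in the extractor would be loaded through the factory like this (the path is the same placeholder that appears in the extractor code):

```python
transactions_df = get_data_source(
    data_type="csv",
    file_path="local_path_to/Transaction_Updated.csv"
).get_data_frame()
transactions_df.show(5)
```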
## Extractor (`extractor` notebook)

This notebook defines the Extract phase of the pipeline. It pulls data from the different sources and returns DataFrames for further processing.

```python
class Extractor:
    """
    Abstract class for data extraction.
    """

    def __init__(self):
        pass

    def extract(self):
        pass
```
`Extractor` class: An abstract class for defining data extraction logic.

```python
class AirpodsAfterIphoneExtractor(Extractor):
    def extract(self):
        transcatioInputDF = get_data_source(
            data_type="csv",
            file_path="local_path_to/Transaction_Updated.csv"
        ).get_data_frame()

        customerInputDF = get_data_source(
            data_type="delta",
            file_path="default.customer_delta_table_persist"
        ).get_data_frame()

        inputDFs = {
            "transcatioInputDF": transcatioInputDF,
            "customerInputDF": customerInputDF
        }
        return inputDFs
```
`AirpodsAfterIphoneExtractor` class: Inherits from `Extractor`. It extracts transaction data from a CSV file and customer data from a Delta table, using `get_data_source` to obtain the appropriate DataFrames, and returns them in a dictionary.
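The returned dictionary can be inspected directly; a quick illustrative check (the key names mirror the code above):

```python
inputDFs = AirpodsAfterIphoneExtractor().extract()
inputDFs["transcatioInputDF"].printSchema()
inputDFs["customerInputDF"].printSchema()
```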
## Transformer (`transform` notebook)

The Transform phase applies the business logic that filters and reshapes the data to meet the requirements.

```python
class Transformer:
    def __init__(self):
        pass

    def transform(self, inputDFs):
        pass
```
`Transformer` class: An abstract class for defining data transformation logic.

```python
# Imports needed by the transformers (not shown in the original snippet)
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, col, broadcast


class AirpodsAfterIphoneTransformer(Transformer):
    def transform(self, inputDFs):
        transcatioInputDF = inputDFs.get("transcatioInputDF")

        # For each customer, look ahead to the product bought in the next transaction
        windowSpec = Window.partitionBy("customer_id").orderBy("transaction_date")
        transformedDF = transcatioInputDF.withColumn(
            "next_product_name", lead("product_name").over(windowSpec)
        )

        # Keep iPhone purchases that were immediately followed by AirPods
        filteredDF = transformedDF.filter(
            (col("product_name") == "iPhone") & (col("next_product_name") == "AirPods")
        )

        # Attach customer details; the filtered set is small, so broadcast it in the join
        customerInputDF = inputDFs.get("customerInputDF")
        joinDF = customerInputDF.join(
            broadcast(filteredDF),
            "customer_id"
        )
        return joinDF.select(
            "customer_id",
            "customer_name",
            "location"
        )
```
`AirpodsAfterIphoneTransformer` class: Inherits from `Transformer` and identifies customers who bought AirPods right after purchasing an iPhone. A window function looks ahead at each customer's next purchase, the DataFrame is filtered to the iPhone-then-AirPods transactions, and the result is joined with customer information on `customer_id`.
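A tiny, self-contained sketch with hypothetical rows shows what the window-plus-`lead` step produces:

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import lead

# Hypothetical purchase history for one customer
demo = spark.createDataFrame(
    [(1, "2024-01-01", "iPhone"), (1, "2024-01-05", "AirPods"), (1, "2024-02-01", "MacBook")],
    ["customer_id", "transaction_date", "product_name"],
)
w = Window.partitionBy("customer_id").orderBy("transaction_date")
demo.withColumn("next_product_name", lead("product_name").over(w)).show()
# The iPhone row gets next_product_name = "AirPods", so this customer passes the filter.
```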
The second transformer covers the "only iPhone and AirPods" requirement:

```python
from pyspark.sql.functions import collect_set, array_contains, size, col, broadcast


class OnlyAirpodsAndIphone(Transformer):
    def transform(self, inputDFs):
        transcatioInputDF = inputDFs.get("transcatioInputDF")

        # Collect the distinct set of products each customer bought
        groupedDF = transcatioInputDF.groupBy("customer_id").agg(
            collect_set("product_name").alias("products")
        )

        # Keep customers whose product set is exactly {iPhone, AirPods}
        filteredDF = groupedDF.filter(
            (array_contains(col("products"), "iPhone")) &
            (array_contains(col("products"), "AirPods")) &
            (size(col("products")) == 2)
        )

        customerInputDF = inputDFs.get("customerInputDF")
        joinDF = customerInputDF.join(
            broadcast(filteredDF),
            "customer_id"
        )
        return joinDF.select(
            "customer_id",
            "customer_name",
            "location"
        )
```
`OnlyAirpodsAndIphone` class: Also inherits from `Transformer`. It identifies customers who bought only iPhones and AirPods by collecting the set of products per customer and checking that the set contains exactly those two items, then joins the result with customer details.
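The grouping logic can likewise be sanity-checked on hypothetical rows:

```python
from pyspark.sql.functions import collect_set, array_contains, size, col

demo = spark.createDataFrame(
    [(1, "iPhone"), (1, "AirPods"), (2, "iPhone"), (2, "AirPods"), (2, "MacBook")],
    ["customer_id", "product_name"],
)
products = demo.groupBy("customer_id").agg(collect_set("product_name").alias("products"))
products.filter(
    array_contains(col("products"), "iPhone")
    & array_contains(col("products"), "AirPods")
    & (size(col("products")) == 2)
).show()
# Only customer 1 survives; customer 2 also bought a MacBook.
```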
## Loader Factory (`loader_factory` notebook)

The Load phase saves the transformed data to the appropriate sinks, again using the Factory Design Pattern to abstract the write logic.

```python
class DataSink:
    """
    Abstract class for data sinks.
    """

    def __init__(self, df, path, method, params):
        self.df = df
        self.path = path
        self.method = method
        self.params = params

    def load_data_frame(self):
        raise ValueError("Not Implemented")
```
`DataSink` class: An abstract class defining how a DataFrame will be written out; it holds the DataFrame, the target path, the write mode, and optional parameters.

```python
class LoadToLocal(DataSink):
    def load_data_frame(self):
        # No format is specified, so Spark writes its default source format
        # (Parquet, unless spark.sql.sources.default is changed).
        self.df.write.mode(self.method).save(self.path)
```
`LoadToLocal` class: Implements `load_data_frame` to save a DataFrame to a local path (in Spark's default file format, since no format is set explicitly).

```python
class LoadToDeltaTable(DataSink):
    def load_data_frame(self):
        self.df.write.format("delta").mode(self.method).saveAsTable(self.path)
```
`LoadToDeltaTable` class: Saves a DataFrame as a Delta table; here `path` holds the table name passed to `saveAsTable`.

```python
def get_sink_source(sink_type, df, path, method, params=None):
    if sink_type == "local":
        return LoadToLocal(df, path, method, params)
    elif sink_type == "delta":
        return LoadToDeltaTable(df, path, method, params)
    else:
        raise ValueError(f"Not implemented for sink_type: {sink_type}")
```
`get_sink_source` function: The factory function returns an instance of the appropriate sink class based on the requested type (`local` or `delta`).
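Usage mirrors the reader factory; for example, writing a small DataFrame to a Delta table (the DataFrame and table name here are made up for illustration):

```python
demo_df = spark.createDataFrame(
    [(101, "Alice", "New York")],
    ["customer_id", "customer_name", "location"],
)
sink = get_sink_source(
    sink_type="delta",
    df=demo_df,
    path="default.demo_sink_table",  # hypothetical table name
    method="overwrite"
)
sink.load_data_frame()
```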
## Loader (`loader` notebook)

This notebook saves the transformed data to its sinks, using the classes defined in the loader factory.

```python
class Loader:
    def __init__(self):
        pass

    def load(self):
        pass
```
`Loader` class: An abstract class for defining data loading logic.

```python
class AirPodsAfterIphoneLoader(Loader):
    def load(self, transformedDF):
        sink = get_sink_source(
            sink_type="local",
            df=transformedDF,
            path="local_path_to/airpods_after_iphone.csv",
            method="overwrite"
        )
        sink.load_data_frame()
```
`AirPodsAfterIphoneLoader` class: Inherits from `Loader` and writes the transformed data to a local path. (The example path suggests a CSV file, but because `LoadToLocal` calls `save` without a format, the output is actually written in Spark's default format unless a `.format("csv")` call is added.)

```python
class OnlyAirpodsAndIphoneLoader(Loader):
    def load(self, transformedDF):
        sink = get_sink_source(
            sink_type="delta",
            df=transformedDF,
            path="default.only_airpods_and_iphone_delta_table",
            method="overwrite"
        )
        sink.load_data_frame()
```
`OnlyAirpodsAndIphoneLoader` class: Writes the data to a Delta table.
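After the workflows run, that Delta table can be inspected directly (a quick sanity check, not part of the original notebooks):

```python
spark.read.table("default.only_airpods_and_iphone_delta_table").show()
```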
## Orchestration (`apple_analysis` notebook)

This is the main orchestration notebook that runs the entire ETL pipeline.

```python
class FirstWorkFlow:
    def __init__(self):
        self.extractor = AirpodsAfterIphoneExtractor()
        self.transformer = AirpodsAfterIphoneTransformer()
        self.loader = AirPodsAfterIphoneLoader()

    def run(self):
        extracted_data = self.extractor.extract()
        transformed_data = self.transformer.transform(extracted_data)
        self.loader.load(transformed_data)
```
`FirstWorkFlow` class: Handles the first workflow, identifying customers who bought AirPods after an iPhone, and coordinates the extract, transform, and load steps.

```python
class SecondWorkFlow:
    def __init__(self):
        self.extractor = AirpodsAfterIphoneExtractor()
        self.transformer = OnlyAirpodsAndIphone()
        self.loader = OnlyAirpodsAndIphoneLoader()

    def run(self):
        extracted_data = self.extractor.extract()
        transformed_data = self.transformer.transform(extracted_data)
        self.loader.load(transformed_data)
```
`SecondWorkFlow` class: Manages the second workflow, focusing on customers who bought only iPhones and AirPods.

```python
class WorkFlowRunner:
    def run(self):
        first_workflow = FirstWorkFlow()
        second_workflow = SecondWorkFlow()
        first_workflow.run()
        second_workflow.run()
```
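The original write-up does not show the driver cell, but presumably the whole pipeline is kicked off with a single call:

```python
WorkFlowRunner().run()
```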
`WorkFlowRunner` class: Orchestrates the execution of both workflows.

This project implements a scalable ETL pipeline using Apache Spark, Delta Lake, and the Factory Design Pattern to handle different data sources and sinks. The design makes it straightforward to add new sources, transformations, or sinks, keeping the pipeline maintainable and scalable for large-scale data processing. Writing results to local file storage and Delta tables supports efficient processing and storage of large datasets, with Delta providing ACID guarantees and table versioning.