My notes about "DeepLearning.AI Data Engineering" (it will still be updated, I'm making a dump for easy access in the future)
🔹 1NF – First Normal Form

Every cell should hold a single atomic value. If a cell contains a list, e.g. `colors = ["red","blue"]`, explode it into multiple rows.

Useful pandas helpers:
- `pd.json_normalize` → Flatten nested JSON.
- `df.explode` → Convert a list column into multiple rows.
- `pd.factorize` → Encode categorical values.
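A minimal pandas sketch of bringing a list column into 1NF (the frame and column names here are just for illustration):

```python
import pandas as pd

# a cell holding a list violates 1NF
df = pd.DataFrame({
    "product_id": [1, 2],
    "colors": [["red", "blue"], ["green"]],
})

# one row per (product_id, color) pair
df_1nf = df.explode("colors", ignore_index=True)
print(df_1nf)
```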
🔹 2NF – Second Normal Form

No partial dependencies: every non-key column must depend on the whole composite key. If the key is `(order_id, product_id)` but `customer_name` depends only on `order_id` → move the customer data to a separate table.
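A small pandas sketch of that split (table and column names are assumptions for illustration):

```python
import pandas as pd

order_items = pd.DataFrame({
    "order_id": [10, 10, 11],
    "product_id": ["A", "B", "A"],
    "customer_name": ["Ada", "Ada", "Bob"],  # depends only on order_id
    "quantity": [1, 2, 5],
})

# customer_name describes the order, not the (order_id, product_id) line:
# move it into its own table keyed by order_id alone
orders = order_items[["order_id", "customer_name"]].drop_duplicates()
order_items_2nf = order_items.drop(columns=["customer_name"])
```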
🔹 3NF – Third Normal Form

No transitive dependencies: if `city → state` and `state → country`, then `city` should not sit with `country` in the same table. Normalize into separate entities.

Star schema example:
- `dim_store` (surrogate key, store info)
- `dim_item` (product details)
- `dim_date` (calendar attributes: day, month, quarter, weekday)
- Fact table measures: `item_quantity`, `item_price`
- Fact table foreign keys: `store_id`, `item_id`, `date_id`
- Natural key of the fact grain: `(order_id, line_number)`
- Surrogate key: `MD5(order_id + line_number)`
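A hedged PySpark sketch of deriving that surrogate key with built-in functions (`md5` and `concat_ws` from `pyspark.sql.functions`); the toy rows and the `spark` session are assumptions standing in for the real order-line data:

```python
from pyspark.sql.functions import md5, concat_ws, col

# toy fact rows standing in for the real order lines
fact_df = spark.createDataFrame(
    [(10, 1, 2, 19.99), (10, 2, 1, 5.50)],
    ["order_id", "line_number", "item_quantity", "item_price"],
)

# deterministic surrogate key from the natural key (order_id, line_number)
fact_df = fact_df.withColumn(
    "fact_key",
    md5(concat_ws("||", col("order_id").cast("string"), col("line_number").cast("string"))),
)
fact_df.show(truncate=False)
```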
Apache Spark is a distributed computing framework for large-scale data processing.
It generalizes MapReduce by performing operations in-memory, drastically reducing I/O overhead.
Transformations are lazy: nothing executes until an action (show(), collect(), write()) is triggered.

Explicit schema definition (instead of relying on schema inference) improves performance and keeps column names and types consistent:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# explicit schema: column names, types, and nullability
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# sample rows for illustration, matching the schema
list_of_tuples = [("Alice", 34), ("Bob", 29)]

test_df = spark.createDataFrame(list_of_tuples, schema)
test_df.show()
# write the DataFrame to a relational table over JDBC
# (jdbc_url / jdbc_properties hold the connection settings, defined elsewhere)
test_df.write.jdbc(
    url=jdbc_url,
    table="test_schema.test_table",
    mode="overwrite",
    properties=jdbc_properties
)
customers_df = spark.read.jdbc(
    url=jdbc_url,
    table="classicmodels.customers",
    properties=jdbc_properties
)

# register the DataFrame as a temporary,
# SQL-queryable view within the current Spark session
customers_df.createOrReplaceTempView("customers")
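Once the view is registered, it can be queried with Spark SQL. A quick sanity check; the column names follow the classicmodels sample schema and are an assumption here:

```python
spark.sql("""
    SELECT customerNumber, customerName, country
    FROM customers
    LIMIT 5
""").show()
```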
from pyspark.sql.types import StringType

def titleCase(text: str) -> str:
    return ' '.join(word.capitalize() for word in text.split())

# register the Python function as a SQL UDF
spark.udf.register("titleUDF", titleCase, StringType())

spark.sql("SELECT book_id, titleUDF(book_name) AS title FROM books").show()
dim_customers_df = spark.sql("""
    SELECT
        CAST(customerNumber AS STRING) AS customer_number,
        ...
    FROM customers
""")

# Add a surrogate key column derived from the natural key
from pyspark.sql.functions import array

dim_customers_df = dim_customers_df.withColumn(
    "customer_key",
    surrogateUDF(array("customer_number"))
)
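surrogateUDF is referenced but not defined in these notes; a minimal sketch of how it might be registered, assuming it hashes the concatenated natural-key columns (the separator and the MD5 choice are assumptions):

```python
import hashlib
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def surrogate_key(columns) -> str:
    # concatenate the natural-key values and hash them deterministically
    joined = "||".join("" if c is None else str(c) for c in columns)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()

surrogateUDF = udf(surrogate_key, StringType())
```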
# Date Dimension Generation
from pyspark.sql.functions import (
    col, explode, sequence, year, month, dayofweek,
    dayofmonth, dayofyear, weekofyear, date_format, lit
)
from pyspark.sql.types import DateType

start_date = "2003-01-01"
end_date = "2005-12-31"

# one row per calendar day in the range
date_range_df = spark.sql(f"""
    SELECT explode(sequence(to_date('{start_date}'), to_date('{end_date}'), interval 1 day)) AS date_day
""")

date_dim_df = date_range_df \
    .withColumn("day_of_week", dayofweek("date_day")) \
    .withColumn("day_of_month", dayofmonth("date_day")) \
    .withColumn("day_of_year", dayofyear("date_day")) \
    .withColumn("week_of_year", weekofyear("date_day")) \
    .withColumn("month_of_year", month("date_day")) \
    .withColumn("year_number", year("date_day")) \
    .withColumn("month_name", date_format("date_day", "MMMM")) \
    .withColumn("quarter_of_year", get_quarter_of_year_udf("date_day"))

date_dim_df.show()
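get_quarter_of_year_udf is used above but not defined in these notes; one way it might be written (an assumption; Spark also ships a built-in quarter() in pyspark.sql.functions that would avoid the UDF):

```python
import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def get_quarter_of_year(d: datetime.date) -> int:
    # months 1-3 -> Q1, 4-6 -> Q2, 7-9 -> Q3, 10-12 -> Q4
    return (d.month - 1) // 3 + 1

get_quarter_of_year_udf = udf(get_quarter_of_year, IntegerType())
```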
The CDC pipeline ensures real-time synchronization between source and target systems through event streaming.
Flow: MySQL → Debezium → Kafka → Flink → PostgreSQL

Layers Overview

| Layer | Technology | Purpose |
|---|---|---|
| Source | MySQL | Transactional data source |
| CDC | Debezium | Change data capture |
| Stream | Kafka | Event streaming backbone |
| Processing | Flink | Real-time transformations |
| Serving | PostgreSQL | Analytical serving store |
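The notes have no code for this pipeline, so here is only a rough Python sketch of the event side: consuming Debezium change events from Kafka with the kafka-python library. The topic name and broker address are assumptions, the op/before/after fields follow Debezium's standard envelope, and the actual course pipeline does the processing in Flink rather than a plain consumer:

```python
import json
from kafka import KafkaConsumer  # kafka-python

# Debezium publishes one topic per table; this topic name is an assumption
consumer = KafkaConsumer(
    "mysql.classicmodels.customers",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
    auto_offset_reset="earliest",
)

for message in consumer:
    if message.value is None:  # tombstone record
        continue
    payload = message.value.get("payload", {})
    op = payload.get("op")          # "c" insert, "u" update, "d" delete, "r" snapshot read
    before = payload.get("before")  # row image before the change (None for inserts)
    after = payload.get("after")    # row image after the change (None for deletes)
    print(op, before, after)
```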