Data Engineering: Data Modeling

30 July 2025, Carlos Pena

My notes on the “DeepLearning.AI Data Engineering” specialization (still a work in progress; I’m dumping them here for easy future reference).

C3: Data Storage and Queries

1. File Storage

🧠 Used for: operating systems, shared drives, and application-level file systems.


2. Block Storage

Concept: Files are divided into small, fixed-size blocks; each block is tracked in a lookup table.

⚙️ Typical use cases: databases, application servers, and transactional systems.


3. Object Storage (e.g., Amazon S3)

Concept: Stores immutable data objects, each addressed by a unique key instead of a file path.

⚙️ Examples: AWS S3, Google Cloud Storage, Azure Blob Storage.
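
To make key-based addressing concrete, here is a minimal sketch with boto3 (bucket and key names are hypothetical):

import boto3

s3 = boto3.client("s3")

# Objects are addressed by (bucket, key); the key only looks like a file path
s3.put_object(
    Bucket="my-data-lake",
    Key="landing/2025/07/30/events.json",
    Body=b'{"event": "page_view"}',
)

# Retrieve the same object by its key
response = s3.get_object(Bucket="my-data-lake", Key="landing/2025/07/30/events.json")
print(response["Body"].read())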


4. Distributed Storage Systems

The CAP Theorem

A distributed system can only fully guarantee two of the following three:

  • Consistency (C): Every read reflects the most recent write (ACID-like).
  • Availability (A): Every request receives a response, even if the data is not the latest.
  • Partition Tolerance (P): The system remains operational despite network failures.

Data Access Scenarios and Storage Models

Row-Oriented Databases (SQL / OLTP)

Use Case: Low-latency reads/writes for transactional systems.

Column-Oriented Databases (NoSQL / OLAP)

Use Case: Analytical workloads with large aggregations.

Hybrid Formats: Parquet and ORC

Definition: Row–columnar hybrid storage formats optimized for analytics.

Graph Databases

Use Case: Model and query complex data relationships.

Applications:

Common Implementations:

Data Model:


Cypher Query Language (Neo4j)

Basic Queries:

// Return all nodes in the graph
MATCH (n) RETURN n;

// Count total nodes
MATCH (n) RETURN COUNT(n) AS total_nodes;

// Return distinct node labels (types)
MATCH (n) RETURN DISTINCT labels(n);

// Count nodes of a specific type
MATCH (n:Order) RETURN COUNT(n) AS order_count;

// Show the properties of one Order node
MATCH (n:Order) RETURN properties(n) LIMIT 1;

Aggregation Queries:

// Compute average order value across all relationships
MATCH ()-[r:ORDERS]->()
RETURN AVG(r.quantity * r.unitPrice) AS avg_order_value;

// Average order line value per product category
MATCH ()-[r:ORDERS]->()-[:PART_OF]->(c:Category)
RETURN c.CategoryName, AVG(r.quantity * r.unitPrice) AS avg_price
ORDER BY avg_price DESC;

// Top 10 customers by total spending
MATCH (c:Customer)-[p:PURCHASED]->()
RETURN c.CustomerID, c.name, SUM(p.amount) AS total_spent
ORDER BY total_spent DESC
LIMIT 10;

Filtering and Pattern Matching:

// List products in the "Meat" category
MATCH (p:Product)-[:PART_OF]->(c:Category {CategoryName: "Meat"})
RETURN p.ProductName, p.UnitPrice
ORDER BY p.UnitPrice DESC;

// Find customers who bought the same products as "Carlos" (collaborative filtering)
MATCH (c1:Customer {CustomerID: "Carlos"})-[:PURCHASED]->()-[:ORDERS]->(p:Product)
      <-[:ORDERS]-()<-[:PURCHASED]-(c2:Customer)
WHERE c1 <> c2
RETURN DISTINCT c2.CustomerID, c2.name;

// Find friend recommendations (friends of friends not already connected)
MATCH (me:Person {name: "Alice"})-[:FRIENDS_WITH]->(friend)-[:FRIENDS_WITH]->(fof)
WHERE NOT (me)-[:FRIENDS_WITH]->(fof) AND me <> fof
RETURN fof.name, COUNT(friend) AS mutual_friends
ORDER BY mutual_friends DESC;

Write Operations:

// Create a new node with properties
CREATE (p:Product {
    country: 'US',
    description: "SmartTV",
    code: 'BWC',
    price: 599.99
}) RETURN p;

// Create a relationship between existing nodes
MATCH (a:Product {code: 'ABC'}), (b:Product {code: 'CDE'})
CREATE (a)-[r:RELATED_TO {similarity: 0.85, dist: 12}]->(b)
RETURN r;

// Update node properties
MATCH (p:Product) WHERE p.code = 'BWC'
SET p.price = 199, p.discount = true
RETURN p;

// Delete a node and all of its relationships
// (DETACH DELETE p is the idiomatic one-step alternative)
MATCH (p:Product)-[r]-() WHERE p.code = 'CLR'
DELETE r, p;

Advanced Queries (WITH clause for aggregation pipelines):

// Find products (restricted here to code 'RAA') with exactly one related product
MATCH (p:Product)-[f:RELATED_TO]->(related:Product)
WITH p, COUNT(f) AS relation_count
WHERE relation_count = 1 AND p.code = 'RAA'
RETURN p.code, p.description, relation_count
LIMIT 10;

// Multi-hop fraud detection: flag accounts that share payment methods
MATCH (account1:Account)-[:USES_CARD]->(card:CreditCard)<-[:USES_CARD]-(account2:Account)
WHERE account1 <> account2
WITH account1, account2, COUNT(card) AS shared_cards
WHERE shared_cards > 2
RETURN account1.id, account2.id, shared_cards AS fraud_score
ORDER BY fraud_score DESC;

Performance Optimization:


C4: Data Modeling, Transformation, and Serving

Modern Data Storage Architectures

Data Warehouse vs. Data Lake vs. Data Lakehouse

🏛️ Data Warehouse
  • Core purpose: Centralized analytical store for SQL/BI
  • Data types: Structured
  • Schema: Predefined (schema-on-write)
  • Storage format: Columnar storage (e.g., Parquet, ORC)
  • Performance: Optimized for analytical queries
  • Cost: High (compute and management)
  • Scalability: Moderate
  • Use cases: BI, reporting, historical analysis
  • Management risk: Data silos
  • Examples: Snowflake, BigQuery, Redshift

🪣 Data Lake
  • Core purpose: Raw data repository for all data types
  • Data types: Structured, semi-structured, unstructured
  • Schema: Flexible (schema-on-read)
  • Storage format: Object storage (e.g., S3, GCS, Azure Blob)
  • Performance: Lower for direct queries
  • Cost: Low (commodity storage)
  • Scalability: High
  • Use cases: Data exploration, data science, ETL staging
  • Management risk: Data swamp (if ungoverned)
  • Examples: AWS S3, Azure Data Lake, Hadoop HDFS

Data Lakehouse
  • Core purpose: Unified platform combining data lake flexibility with warehouse reliability
  • Data types: Structured and semi-structured
  • Schema: Hybrid (supports both schema-on-write and schema-on-read)
  • Storage format: Object storage with metadata layers
  • Performance: Optimized through caching and metadata
  • Cost: Moderate (low storage cost with efficient compute)
  • Scalability: High (compute/storage decoupling)
  • Use cases: Unified analytics, ML, BI, real-time analytics
  • Management: Centralized governance, schema enforcement
  • Examples: Databricks Delta Lake, Apache Iceberg, Apache Hudi

Next-Generation Data Lake Architecture

  1. Landing Zone: Raw files (e.g., .csv, .json, .png, .mp3).

  2. Processing Zone: Data cleaning, validation, standardization, and PII removal.

  3. Cleaned/Transformed Zone: Optimized storage formats (.parquet, .avro, .orc); see the sketch after this list.

  4. Modeling/Business Zone: Business logic transformations and enrichment.

  5. Curated/Enriched Zone: Final structured data ready for analytics or ML.
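
To make the zone flow concrete, here is a minimal sketch that promotes a raw CSV from the landing zone to Parquet in the cleaned zone (bucket, paths, and column names are hypothetical; assumes pandas with s3fs and pyarrow installed):

import pandas as pd

# Landing zone: raw CSV as delivered by the source system (hypothetical path)
raw_df = pd.read_csv("s3://my-data-lake/landing/orders/2025-07-30.csv")

# Processing zone: basic cleaning, standardization, and PII removal
clean_df = (
    raw_df
    .dropna(subset=["order_id"])            # drop rows without a key
    .drop(columns=["customer_email"])       # strip PII before it spreads downstream
    .assign(order_date=lambda d: pd.to_datetime(d["order_date"]))
)

# Cleaned/Transformed zone: columnar format optimized for analytics
clean_df.to_parquet("s3://my-data-lake/cleaned/orders/2025-07-30.parquet", index=False)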

Open Table Formats

Provide transactional capabilities on top of data lakes.

Delta Lake
  • Core abstractions & strengths: Transaction log stored as JSON plus Parquet checkpoints; strong integration with Apache Spark; excellent ACID semantics and time travel
  • Ecosystem alignment: Deep integration with the Databricks ecosystem and Spark; growing native support in Flink, Trino, and Snowflake

Apache Iceberg

Shared Capabilities:

AWS Lake Formation

Purpose: Centralized governance and orchestration for AWS-based data lakes.

Components

  1. Data Sources: S3, Relational databases, NoSQL stores, etc.

  2. Ingestion tools: AWS Kinesis, Firehose, DataSync, Database Migration Service (DMS).

  3. Storage Layer:
    • S3: All data types (raw and processed).
    • Amazon Redshift: Structured and semi-structured data.
    • Redshift Spectrum: Integrates S3 and Redshift seamlessly (no ETL required).
  4. Processing: AWS EMR, AWS Glue, Apache Flink, SQL-based ELT.

  5. Catalog: Lake Formation + AWS Glue Data Catalog (includes metadata and IAM policies).

  6. Consumption Layer: AWS Athena, Redshift Spectrum, QuickSight, SageMaker (see the query sketch below).
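
As an illustration of the consumption layer, here is a minimal sketch that queries a Glue-cataloged table through Athena using the awswrangler library (database and table names are hypothetical):

import awswrangler as wr

# Run an Athena query against a Glue-cataloged table and get a pandas DataFrame back
df = wr.athena.read_sql_query(
    sql="SELECT order_date, SUM(amount) AS revenue FROM orders GROUP BY order_date",
    database="sales_curated",  # hypothetical Glue database
)
print(df.head())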

smart_open Library

Purpose: Efficient I/O streaming for very large files.
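
A minimal sketch of streaming objects from and to S3 with smart_open (bucket and key names are hypothetical; credentials are picked up from the environment via boto3):

from smart_open import open  # drop-in replacement for the built-in open()

# Stream a large gzipped CSV from S3 line by line, without downloading it first
with open("s3://my-data-lake/landing/events.csv.gz", "r") as fin:
    for i, line in enumerate(fin):
        if i == 5:
            break
        print(line.rstrip())  # peek at the first few records

# Stream results back out to S3 as they are produced
with open("s3://my-data-lake/processed/sample.txt", "w") as fout:
    fout.write("hello from smart_open\n")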

Highlights:


Summary

  • Data Warehouse: structured, analytical (e.g., AWS Redshift)
  • Data Lake: flexible, raw data (e.g., Hadoop, Spark, S3)
  • Data Lakehouse: unified storage + analytics (e.g., Databricks, Delta Lake)
  • Governance: metadata, access control (e.g., AWS Lake Formation, Glue)
  • ETL/Processing: transformation & ingestion (e.g., EMR, Glue, Flink, Wrangler)

🔹 Denormalized Form

Normal Forms: Eliminating Redundancy

Normalization progressively eliminates redundancy and dependency anomalies to ensure data integrity.


1NF – First Normal Form

Rule: Each column contains only atomic values (no arrays, no nested objects), and each row is uniquely identifiable.

Requirements:

  1. Table has a Primary Key
  2. Each column contains single values (not lists or sets)
  3. No repeating groups (e.g., phone1, phone2, phone3 columns)

Example Violation:

order_id | customer_name | products
1        | Alice         | ["Laptop", "Mouse"]

1NF Correction:

order_id | customer_name | product
1        | Alice         | Laptop
1        | Alice         | Mouse

Python Tools for 1NF:

import pandas as pd

# Hypothetical nested records (one row per order, with a list-valued "products" field)
json_data = [
    {"order_id": 1, "customer": {"name": "Alice"}, "category": "Electronics",
     "products": ["Laptop", "Mouse"]},
]

# Flatten nested JSON into columns (e.g., customer.name becomes its own column)
df = pd.json_normalize(json_data)

# Explode list values into separate rows (atomic values, 1NF)
df = df.explode('products')

# Encode categories as integers
df['category_id'], categories = pd.factorize(df['category'])

2NF – Second Normal Form

Rule: Already in 1NF + no partial dependencies (non-key columns must depend on the entire composite key).

When this matters: Tables with composite primary keys (e.g., (order_id, product_id)).

Example Violation:

order_id | product_id | product_name | customer_name | order_date
1        | 101        | Laptop       | Alice         | 2025-01-01
1        | 102        | Mouse        | Alice         | 2025-01-01

Problem: customer_name and order_date depend only on order_id (partial dependency).

2NF Correction:

Orders Table:

order_id | customer_name | order_date
1        | Alice         | 2025-01-01

Order_Items Table:

order_id | product_id | product_name
1        | 101        | Laptop
1        | 102        | Mouse

(Strictly, product_name depends only on product_id and would also move to its own Products table; it is kept here to keep the example small.)
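
A minimal pandas sketch of this decomposition (the flat DataFrame df_flat is hypothetical and mirrors the violating table above):

import pandas as pd

# Hypothetical flat order-lines table (the 2NF violation above)
df_flat = pd.DataFrame({
    "order_id": [1, 1],
    "product_id": [101, 102],
    "product_name": ["Laptop", "Mouse"],
    "customer_name": ["Alice", "Alice"],
    "order_date": ["2025-01-01", "2025-01-01"],
})

# Order-level attributes depend only on order_id -> their own Orders table
orders = df_flat[["order_id", "customer_name", "order_date"]].drop_duplicates()

# Line-level attributes keep the composite key (order_id, product_id)
order_items = df_flat[["order_id", "product_id", "product_name"]]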

3NF – Third Normal Form

Rule: Already in 2NF + no transitive dependencies (non-key columns must not depend on other non-key columns).

Example Violation:

order_id | customer_name | city      | state | country
1        | Alice         | Boston    | MA    | USA
2        | Bob           | Cambridge | MA    | USA

Problem: state → country (transitive dependency: order_id → city → state → country).

3NF Correction:

Orders Table:

order_id | customer_name | city_id
1        | Alice         | 101
2        | Bob           | 102

Cities Table:

city_id | city      | state | country
101     | Boston    | MA    | USA
102     | Cambridge | MA    | USA

Benefits:


When to Normalize vs. Denormalize:


🔹 Star Schema (OLAP Modeling)

Steps to Build:

  1. Choose business process: e.g., Sales Transactions.
    • Questions:
      • Which products sell in which stores?
      • How do sales vary by store or brand?
  2. Declare the grain: e.g., Individual item in an order.
  3. Identify dimensions:
    • dim_store (surrogate key, store info)
    • dim_item (product details)
    • dim_date (calendar attributes: day, month, quarter, weekday)
  4. Define facts (order line):
    • item_quantity, item_price
    • Foreign Keys: store_id, item_id, date_id
    • Natural keys: (order_id, line_number)
    • Surrogate PK: e.g., MD5(order_id + line_number)
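
A minimal PySpark sketch of the fact table at this grain (order_lines_df and its columns are assumed to exist; the surrogate key follows the MD5 idea from step 4):

from pyspark.sql.functions import col, concat_ws, md5

# One row per order line (the declared grain)
fact_order_lines_df = (
    order_lines_df
    .withColumn("order_line_key", md5(concat_ws("||", col("order_id"), col("line_number"))))
    .select(
        "order_line_key",                   # surrogate PK
        "store_id", "item_id", "date_id",   # foreign keys to the dimensions
        "item_quantity", "item_price"       # facts
    )
)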

🔹 Data Vault Modeling

🔥 Apache Spark Overview

Apache Spark is a distributed computing framework for large-scale data processing. It generalizes MapReduce by performing operations in-memory, drastically reducing I/O overhead.

Key Concepts

🧩 Typed Data and Schemas

Defining an explicit schema (rather than relying on schema inference) improves load performance and makes column types predictable:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema: column names, types, and nullability
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Hypothetical sample rows matching the schema
list_of_tuples = [("Alice", 34), ("Bob", 29)]

test_df = spark.createDataFrame(list_of_tuples, schema)
test_df.show()

💾 Reading/Writing Data to Relational Databases (JDBC)

# Write a DataFrame to a relational table over JDBC
# (jdbc_url and jdbc_properties hold the connection settings; a sketch follows below)
test_df.write.jdbc(
    url=jdbc_url,
    table="test_schema.test_table",
    mode="overwrite",            # replace the table if it already exists
    properties=jdbc_properties
)

# Read a table back into a DataFrame
customers_df = spark.read.jdbc(
    url=jdbc_url,
    table="classicmodels.customers",
    properties=jdbc_properties
)

# Register the DataFrame as a temporary SQL-queryable view
# within the current Spark session
customers_df.createOrReplaceTempView("customers")
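
The jdbc_url and jdbc_properties used above are assumed to be defined elsewhere; a hypothetical MySQL setup could look like this (host, credentials, and driver are placeholders):

jdbc_url = "jdbc:mysql://localhost:3306/classicmodels"

jdbc_properties = {
    "user": "spark_user",                    # hypothetical credentials
    "password": "spark_password",
    "driver": "com.mysql.cj.jdbc.Driver"     # driver JAR must be on the Spark classpath
}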

Custom SQL Functions (UDFs)

from pyspark.sql.types import StringType

def titleCase(text):
    # Guard against NULLs, which arrive in the UDF as None
    if text is None:
        return None
    return ' '.join(word.capitalize() for word in text.split())

# Register the function so it can be called from Spark SQL
spark.udf.register("titleUDF", titleCase, StringType())
spark.sql("SELECT book_id, titleUDF(book_name) AS title FROM books")

Query

dim_customers_df = spark.sql("""
    SELECT
        CAST(customerNumber AS STRING) AS customer_number,
        ...
    FROM customers
""")

# Add a surrogate key column
# (array() is from pyspark.sql.functions; surrogateUDF is sketched below)
from pyspark.sql.functions import array

dim_customers_df = dim_customers_df.withColumn(
    "customer_key",
    surrogateUDF(array("customer_number"))
)
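
surrogateUDF is not defined in these notes; a minimal sketch of an MD5-based surrogate-key UDF (consistent with the MD5(order_id + line_number) idea from the star schema section) could be:

import hashlib

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(StringType())
def surrogateUDF(cols):
    # Concatenate the natural-key columns and hash them into a stable surrogate key
    joined = "||".join("" if c is None else str(c) for c in cols)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()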

📆 Date Dimension Generation

from pyspark.sql.functions import (
    col, explode, sequence, year, month, dayofweek,
    dayofmonth, dayofyear, weekofyear, date_format, lit
)
from pyspark.sql.types import DateType

start_date = "2003-01-01"
end_date = "2005-12-31"

date_range_df = spark.sql(f"""
    SELECT explode(sequence(to_date('{start_date}'), to_date('{end_date}'), interval 1 day)) AS date_day
""")

date_dim_df = date_range_df \
    .withColumn("day_of_week", dayofweek("date_day")) \
    .withColumn("day_of_month", dayofmonth("date_day")) \
    .withColumn("day_of_year", dayofyear("date_day")) \
    .withColumn("week_of_year", weekofyear("date_day")) \
    .withColumn("month_of_year", month("date_day")) \
    .withColumn("year_number", year("date_day")) \
    .withColumn("month_name", date_format("date_day", "MMMM")) \
    .withColumn("quarter_of_year", get_quarter_of_year_udf("date_day"))

date_dim_df.show()
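
get_quarter_of_year_udf is not defined in these notes; a minimal sketch is below (pyspark.sql.functions also ships a built-in quarter() that would do the same job):

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(IntegerType())
def get_quarter_of_year_udf(d):
    # d arrives as a datetime.date from the date_day column
    return None if d is None else (d.month - 1) // 3 + 1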

Part 4: Real-Time Data Integration

Change Data Capture (CDC) Pipelines

Change Data Capture (CDC) is a pattern for tracking database changes and propagating them to downstream systems in real time.

Why CDC?

Traditional Batch ETL Limitations:

CDC Advantages:


CDC Architecture Pattern

[OLTP Database] → [CDC Engine] → [Event Stream] → [Stream Processor] → [Target System]
      MySQL          Debezium        Kafka           Flink/Spark        PostgreSQL/S3

Components Explained

1. Source Database (MySQL, PostgreSQL, Oracle)

2. CDC Engine (Debezium)

Example Change Event (Debezium):

{
  "before": {"id": 101, "name": "Alice", "balance": 500},
  "after": {"id": 101, "name": "Alice", "balance": 750},
  "op": "u",  // Operation: c=create, u=update, d=delete
  "ts_ms": 1705392000000,
  "source": {"db": "customers", "table": "accounts"}
}

3. Event Stream (Apache Kafka)

4. Stream Processor (Apache Flink)

Flink CDC Processing Example:

-- Pseudocode: Flink SQL for CDC processing
CREATE TABLE customers_cdc (
    id INT,
    name STRING,
    balance DECIMAL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'mysql.customers.accounts',
    'format' = 'debezium-json'
);

-- Materialize current state (UPSERT semantics)
CREATE TABLE customer_snapshot (
    id INT PRIMARY KEY,
    name STRING,
    total_balance DECIMAL
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://warehouse/analytics'
);

INSERT INTO customer_snapshot
SELECT id, name, balance AS total_balance FROM customers_cdc;

5. Target System (Data Warehouse, Data Lake)


CDC Pipeline Flow (Step-by-Step)

Scenario: Customer updates account balance

  1. Application writes to MySQL:
    UPDATE accounts SET balance = 750 WHERE id = 101;
    
  2. MySQL writes to binlog:
    • Binlog entry contains full before/after row state
  3. Debezium reads binlog:
    • Parses binary log entry
    • Converts to JSON change event
    • Publishes to Kafka topic mysql.customers.accounts
  4. Kafka persists event:
    • Event stored across multiple brokers (replicated)
    • Available to multiple consumers
  5. Flink consumes event:
    • Reads from Kafka topic
    • Applies transformations (e.g., compute new metrics)
    • Maintains internal state (e.g., running balance)
  6. Flink writes to PostgreSQL:
    • UPSERT operation (update if exists, insert if new)
    • Analytics database now reflects latest state
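
The Flink job above covers steps 4-6 declaratively; for intuition, here is a hand-rolled Python sketch of the same consume-and-upsert loop, assuming kafka-python, psycopg2, the event shape shown earlier, and hypothetical connection details:

import json

import psycopg2
from kafka import KafkaConsumer

# Consume Debezium change events from the CDC topic (hypothetical broker address)
consumer = KafkaConsumer(
    "mysql.customers.accounts",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

# Hypothetical analytics database connection
conn = psycopg2.connect("dbname=analytics host=warehouse user=etl password=etl")

for message in consumer:
    event = message.value
    if event is None:
        continue  # tombstone record
    with conn.cursor() as cur:
        if event.get("op") == "d":
            # Delete: remove the row from the snapshot table
            cur.execute("DELETE FROM customer_snapshot WHERE id = %s",
                        (event["before"]["id"],))
        else:
            # Create/update: UPSERT the latest row state
            cur.execute(
                """
                INSERT INTO customer_snapshot (id, name, balance)
                VALUES (%(id)s, %(name)s, %(balance)s)
                ON CONFLICT (id) DO UPDATE
                SET name = EXCLUDED.name, balance = EXCLUDED.balance
                """,
                event["after"],
            )
    conn.commit()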

CDC Use Cases

  • Real-time analytics: Keep the data warehouse in sync with OLTP databases (sub-second latency)
  • Microservices sync: Propagate changes across service boundaries without tight coupling
  • Cache invalidation: Update Redis/Memcached when source data changes
  • Search index updates: Sync Elasticsearch with database changes for fresh search results
  • Audit logging: Complete change history for compliance (GDPR, SOX)
  • Data lake ingestion: Stream changes to S3/Delta Lake for long-term storage

CDC Implementation Example (Debezium + Kafka + PostgreSQL)

Technology Stack:

  • Source: MySQL 8.0 (transactional database, binlog enabled)
  • CDC: Debezium 2.x (reads the MySQL binlog, publishes change events)
  • Stream: Apache Kafka 3.x (durable, distributed event backbone)
  • Processing: Apache Flink 1.17 (real-time transformations and aggregations)
  • Target: PostgreSQL 15 (analytical serving layer, materialized views)