My notes on “DeepLearning.AI Data Engineering” (still a work in progress; I’m dumping them here for easy access later).
File Storage:
🧠 Used for: operating systems, shared drives, and application-level file systems.
Block Storage:
Concept: Files are divided into small, fixed-size blocks; each block is tracked in a lookup table.
⚙️ Typical use cases: databases, application servers, and transactional systems.
Object Storage:
Concept: Stores immutable data objects, each addressed by a unique key instead of a file path.
⚙️ Examples: AWS S3, Google Cloud Storage, Azure Blob Storage.
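A minimal boto3 sketch of key-addressed object storage; the bucket and key names are made up for illustration:
import boto3
s3 = boto3.client("s3")
# Objects are written and read by key, not by hierarchical file paths
s3.put_object(Bucket="example-bucket", Key="raw/2025/orders.json", Body=b'{"id": 1}')
obj = s3.get_object(Bucket="example-bucket", Key="raw/2025/orders.json")
print(obj["Body"].read())
# "Updating" an object means rewriting the whole object under the same key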
A distributed system can only fully guarantee two of the following three:
| Property | Description |
|---|---|
| Consistency (C) | Every read reflects the most recent write (linearizability; not the same "C" as in ACID). |
| Availability (A) | Every request receives a response (even if outdated). |
| Partition Tolerance (P) | The system remains operational despite network failures. |
Row-oriented storage: reads whole rows even when a single column is needed (e.g., a SELECT to compute the sum of price). Use Case: Low-latency reads/writes for transactional systems.
Column-oriented storage: Use Case: Analytical workloads with large aggregations; poor fit for SELECT * FROM table or frequent updates/inserts.
Hybrid formats (Parquet, ORC): Definition: Row–columnar hybrid storage formats optimized for analytics; they store per-column statistics (min, max, count, etc.) and support compression with SNAPPY (recommended) or gzip (smaller files, slower).
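A small pandas sketch of writing Parquet with SNAPPY compression and reading back a single column; the file name and data are illustrative:
import pandas as pd
df = pd.DataFrame({"order_id": [1, 2, 3], "price": [9.99, 24.50, 5.00]})
# Columnar format with per-column stats; snappy is the common default codec
df.to_parquet("orders.parquet", compression="snappy")
# Columnar read: only the price column is scanned for the aggregation
total = pd.read_parquet("orders.parquet", columns=["price"])["price"].sum()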
Graph Databases: Use Case: Model and query complex data relationships.
Applications: recommendations (collaborative filtering), social/friend graphs, fraud detection, knowledge graphs (see the example queries below).
Common Implementations: Neo4j (whose Cypher query language is used below), Amazon Neptune, ArangoDB.
Data Model:
Nodes: ( ); Relationships: [ ]; Pattern: (source_node)-[relation]->(target_node)
Basic Queries:
-- Return all nodes in the graph
MATCH (n) RETURN n;
-- Count total nodes
MATCH (n) RETURN COUNT(n) AS total_nodes;
-- Return distinct node labels (types)
MATCH (n) RETURN DISTINCT labels(n);
-- Count nodes of specific type
MATCH (n:Order) RETURN COUNT(n) AS order_count;
-- Show properties of one Order node
MATCH (n:Order) RETURN properties(n) LIMIT 1;
Aggregation Queries:
-- Compute average order value across all relationships
MATCH ()-[r:ORDERS]->()
RETURN AVG(r.quantity * r.unitPrice) AS avg_order_value;
-- Average order line value (quantity * unitPrice) per product category
MATCH ()-[r:ORDERS]->()-[:PART_OF]->(c:Category)
RETURN c.CategoryName, AVG(r.quantity * r.unitPrice) AS avg_price
ORDER BY avg_price DESC;
-- Top 10 customers by total spending
MATCH (c:Customer)-[p:PURCHASED]->()
RETURN c.CustomerID, c.name, SUM(p.amount) AS total_spent
ORDER BY total_spent DESC
LIMIT 10;
Filtering and Pattern Matching:
-- List products in "Meat" category
MATCH (p:Product)-[:PART_OF]->(c:Category {CategoryName: "Meat"})
RETURN p.ProductName, p.UnitPrice
ORDER BY p.UnitPrice DESC;
-- Find customers who bought the same product as "Carlos" (collaborative filtering)
MATCH (c1:Customer {CustomerID: "Carlos"})-[:PURCHASED]->()-[:ORDERS]->(p:Product)
<-[:ORDERS]-()<-[:PURCHASED]-(c2:Customer)
WHERE c1 <> c2
RETURN DISTINCT c2.CustomerID, c2.name;
-- Find friend recommendations (friend-of-friend not already connected)
MATCH (me:Person {name: "Alice"})-[:FRIENDS_WITH]->(friend)-[:FRIENDS_WITH]->(fof)
WHERE NOT (me)-[:FRIENDS_WITH]->(fof) AND me <> fof
RETURN fof.name, COUNT(friend) AS mutual_friends
ORDER BY mutual_friends DESC;
Write Operations:
-- Create new node with properties
CREATE (p:Product {
country: 'US',
description: "SmartTV",
code: 'BWC',
price: 599.99
}) RETURN p;
-- Create relationship between existing nodes
MATCH (a:Product {code: 'ABC'}), (b:Product {code: 'CDE'})
CREATE (a)-[r:RELATED_TO {similarity: 0.85, dist: 12}]->(b)
RETURN r;
-- Update node properties
MATCH (p:Product) WHERE p.code = 'BWC'
SET p.price = 199, p.discount = true
RETURN p;
-- Delete node and all its relationships
MATCH (p:Product)-[r]-() WHERE p.code = 'CLR'
DELETE r, p;
Advanced Queries (WITH clause for aggregation pipelines):
-- Find products with exactly 1 related product
MATCH (p:Product)-[f:RELATED_TO]->(related:Product)
WITH p, COUNT(f) AS relation_count
WHERE relation_count = 1 AND p.code = 'RAA'
RETURN p.code, p.description, relation_count
LIMIT 10;
-- Multi-hop fraud detection: flag accounts with shared payment methods
MATCH (account1:Account)-[:USES_CARD]->(card:CreditCard)<-[:USES_CARD]-(account2:Account)
WHERE account1 <> account2
WITH account1, account2, COUNT(card) AS shared_cards
WHERE shared_cards > 2
RETURN account1.id, account2.id, shared_cards AS fraud_score
ORDER BY fraud_score DESC;
Performance Optimization:
Create indexes on frequently filtered properties (e.g., CREATE INDEX ON :Customer(CustomerID)).
Bound variable-length path patterns (e.g., [:KNOWS*1..3] for 1-3 hops).
| Feature / Aspect | 🏛️ Data Warehouse | 🪣 Data Lake | ⚡ Data Lakehouse |
|---|---|---|---|
| Core Purpose | Centralized analytical store SQL/BI | Raw data repository for all data types | Unified platform combining data lake flexibility with warehouse reliability |
| Data Type | Structured | Structured, Semi-structured, Unstructured | Structured and Semi-structured |
| Schema | Predefined (schema-on-write) | Flexible (schema-on-read) | Hybrid: supports both |
| Storage Format | Columnar storage (e.g., Parquet, ORC) | Object storage (e.g., S3, GCS, Azure Blob) | Object storage with metadata layers |
| Performance | Optimized for analytical queries | Lower performance for direct queries | Optimized through caching and metadata |
| Cost | High (due to compute and management) | Low (commodity storage) | Moderate: low storage cost with efficient compute |
| Scalability | Moderate | High | High: scalable with compute/storage decoupling |
| Use Cases | BI, Reporting, Historical Analysis | Data exploration, Data science, ETL staging | Unified analytics, ML, BI, real-time analytics |
| Management Risk | Data Silos | Risk of Data Swamp (if ungoverned) | Centralized governance, schema enforcement |
| Examples / Tech | Snowflake, BigQuery, Redshift | AWS S3, Azure Data Lake, Hadoop HDFS | Databricks Delta Lake, Apache Iceberg, Apache Hudi |
Landing Zone: Raw files (e.g., .csv, .json, .png, .mp3).
Processing Zone: Data cleaning, validation, standardization, and PII removal.
Cleaned/Transformed Zone: Optimized storage formats (.parquet, .avro, .orc).
Modeling/Business Zone: Business logic transformations and enrichment.
Curated/Enriched Zone: Final structured data ready for analytics or ML.
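A hedged sketch of promoting a file from the landing zone to the cleaned/transformed zone with AWS SDK for pandas (awswrangler); the bucket and prefixes are hypothetical:
import awswrangler as wr
# Read a raw CSV from the landing zone
raw_df = wr.s3.read_csv("s3://example-lake/landing/orders/orders.csv")
# Cleaning/standardization would happen here (types, nulls, PII removal)
clean_df = raw_df.dropna()
# Write the cleaned data to the transformed zone as Parquet
wr.s3.to_parquet(df=clean_df, path="s3://example-lake/cleaned/orders/", dataset=True)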
Open table formats provide transactional capabilities on top of data lakes.
| Format | Core Abstractions & Strengths | Ecosystem Alignment |
|---|---|---|
| Delta Lake | Transaction log stored as JSON + Parquet checkpoints; strong integration with Apache Spark; excellent ACID semantics and time travel | Deep integration with Databricks ecosystem and Spark; growing native support in Flink, Trino, and Snowflake |
| Apache Iceberg | Snapshot-based table metadata with manifest files; hidden partitioning and schema evolution; engine-agnostic specification | Broad support across Spark, Flink, Trino, Snowflake, and AWS Athena |
Shared Capabilities: ACID transactions, schema evolution, and time travel over files in object storage.
Purpose: Centralized governance and orchestration for AWS-based data lakes.
Data Sources: S3, Relational databases, NoSQL stores, etc.
Ingestion: Tools: AWS Kinesis, Firehose, DataSync, Data Migration Service (DMS).
Processing: AWS EMR, AWS Glue, Apache Flink, SQL-based ELT.
Catalog: Lake Formation + AWS Glue Data Catalog (includes metadata and IAM policies).
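A small boto3 sketch of inspecting tables registered in the Glue Data Catalog; the database name is made up:
import boto3
glue = boto3.client("glue")
# List tables in a catalog database and where their data lives
for table in glue.get_tables(DatabaseName="example_lake_db")["TableList"]:
    print(table["Name"], table["StorageDescriptor"]["Location"])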
Purpose: Efficient I/O streaming for very large files.
Highlights:
Streams data through a file-like interface compatible with Python's built-in open().
| Layer | Focus | Example Tools |
|---|---|---|
| Data Warehouse | Structured, analytical | AWS Redshift |
| Data Lake | Flexible, raw data | Hadoop, Spark, S3 |
| Data Lakehouse | Unified storage + analytics | Databricks, Delta Lake |
| Governance | Metadata, access control | AWS Lake Formation, Glue |
| ETL/Processing | Transformation & ingestion | EMR, Glue, Flink, Wrangler |
Normalization progressively eliminates redundancy and dependency anomalies to ensure data integrity.
Rule: Each column contains only atomic values (no arrays, no nested objects), and each row is uniquely identifiable.
Requirements:
No repeating groups (e.g., no phone1, phone2, phone3 columns).
Example Violation:
| order_id | customer_name | products |
|---|---|---|
| 1 | Alice | [“Laptop”, “Mouse”] |
1NF Correction:
| order_id | customer_name | product |
|---|---|---|
| 1 | Alice | Laptop |
| 1 | Alice | Mouse |
Python Tools for 1NF:
import pandas as pd
# Flatten nested JSON
df = pd.json_normalize(json_data)
# Explode lists into separate rows
df = df.explode('products')
# Encode categories as integers
df['category_id'], categories = pd.factorize(df['category'])
Rule: Already in 1NF + no partial dependencies (non-key columns must depend on the entire composite key).
When this matters: Tables with composite primary keys (e.g., (order_id, product_id)).
Example Violation:
| order_id | product_id | product_name | customer_name | order_date |
|---|---|---|---|---|
| 1 | 101 | Laptop | Alice | 2025-01-01 |
| 1 | 102 | Mouse | Alice | 2025-01-01 |
Problem: customer_name and order_date depend only on order_id (partial dependency).
2NF Correction:
Orders Table:
| order_id | customer_name | order_date |
|---|---|---|
| 1 | Alice | 2025-01-01 |
Order_Items Table:
| order_id | product_id | product_name |
|---|---|---|
| 1 | 101 | Laptop |
| 1 | 102 | Mouse |
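A minimal pandas sketch of the 2NF split above, using the same columns:
import pandas as pd
order_lines = pd.DataFrame({
    "order_id": [1, 1],
    "product_id": [101, 102],
    "product_name": ["Laptop", "Mouse"],
    "customer_name": ["Alice", "Alice"],
    "order_date": ["2025-01-01", "2025-01-01"],
})
# Columns that depend only on order_id move to the Orders table
orders = order_lines[["order_id", "customer_name", "order_date"]].drop_duplicates()
# Columns that depend on the full (order_id, product_id) key stay in Order_Items
order_items = order_lines[["order_id", "product_id", "product_name"]]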
Rule: Already in 2NF + no transitive dependencies (non-key columns must not depend on other non-key columns).
Example Violation:
| order_id | customer_name | city | state | country |
|---|---|---|---|---|
| 1 | Alice | Boston | MA | USA |
| 2 | Bob | Cambridge | MA | USA |
Problem: state → country (transitive dependency: order_id → city → state → country)
3NF Correction:
Orders Table:
| order_id | customer_name | city_id |
|---|---|---|
| 1 | Alice | 101 |
| 2 | Bob | 102 |
Cities Table:
| city_id | city | state | country |
|---|---|---|---|
| 101 | Boston | MA | USA |
| 102 | Cambridge | MA | USA |
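The same idea in pandas for the 3NF split, with illustrative surrogate city_id values:
import pandas as pd
orders = pd.DataFrame({
    "order_id": [1, 2],
    "customer_name": ["Alice", "Bob"],
    "city": ["Boston", "Cambridge"],
    "state": ["MA", "MA"],
    "country": ["USA", "USA"],
})
# One row per distinct city, with a surrogate key
cities = orders[["city", "state", "country"]].drop_duplicates().reset_index(drop=True)
cities["city_id"] = cities.index + 101
# Orders keep only the foreign key to the Cities table
orders_3nf = orders.merge(cities, on=["city", "state", "country"])[["order_id", "customer_name", "city_id"]]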
Benefits: less redundancy, fewer insert/update/delete anomalies, stronger data integrity.
When to Normalize vs. Denormalize: normalize for write-heavy transactional (OLTP) workloads; denormalize (e.g., star schemas) for read-heavy analytical workloads where join cost matters more than redundancy.
Star schema example:
Dimension tables: dim_store (surrogate key, store info), dim_item (product details), dim_date (calendar attributes: day, month, quarter, weekday).
Fact table measures: item_quantity, item_price.
Fact table foreign keys: store_id, item_id, date_id.
Natural key: (order_id, line_number); surrogate key: MD5(order_id + line_number).
Apache Spark is a distributed computing framework for large-scale data processing. It generalizes MapReduce by performing operations in-memory, drastically reducing I/O overhead.
Lazy evaluation: transformations only build a plan; execution happens when an action (show(), collect(), write()) is triggered.
Explicit schema definition improves load performance (no inference pass) and type safety:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define the schema explicitly instead of relying on inference
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
# Sample rows as a list of tuples matching the schema
list_of_tuples = [("Alice", 34), ("Bob", 28)]
test_df = spark.createDataFrame(list_of_tuples, schema)
test_df.show()
test_df.write.jdbc(
url=jdbc_url,
table="test_schema.test_table",
mode="overwrite",
properties=jdbc_properties
)
customers_df = spark.read.jdbc(
url=jdbc_url,
table="classicmodels.customers",
properties=jdbc_properties
)
# used to register a DataFrame as a temporary
# SQL-queryable view within the current Spark session
customers_df.createOrReplaceTempView("customers")
from pyspark.sql.types import StringType
def titleCase(text: str):
return ' '.join(word.capitalize() for word in text.split())
spark.udf.register("titleUDF", titleCase, StringType())
spark.sql("SELECT book_id, titleUDF(book_name) AS title FROM books")
dim_customers_df = spark.sql("""
SELECT
CAST(customerNumber AS STRING) AS customer_number,
...
FROM customers
""")
# Add columns
dim_customers_df = dim_customers_df.withColumn(
"customer_key",
surrogateUDF(array("customer_number"))
)
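surrogateUDF isn't defined in these notes; a stand-in built from Spark's own md5 and concat_ws (no Python UDF required) could look like this:
from pyspark.sql.functions import concat_ws, md5
# Deterministic surrogate key: hash of the concatenated natural-key columns
def surrogate_key(*cols):
    return md5(concat_ws("||", *cols))
dim_customers_df = dim_customers_df.withColumn(
    "customer_key",
    surrogate_key("customer_number")
)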
# 📆 Date Dimension Generation
from pyspark.sql.functions import (
col, explode, sequence, year, month, dayofweek,
dayofmonth, dayofyear, weekofyear, date_format, lit
)
from pyspark.sql.types import DateType
start_date = "2003-01-01"
end_date = "2005-12-31"
date_range_df = spark.sql(f"""
SELECT explode(sequence(to_date('{start_date}'), to_date('{end_date}'), interval 1 day)) AS date_day
""")
date_dim_df = date_range_df \
.withColumn("day_of_week", dayofweek("date_day")) \
.withColumn("day_of_month", dayofmonth("date_day")) \
.withColumn("day_of_year", dayofyear("date_day")) \
.withColumn("week_of_year", weekofyear("date_day")) \
.withColumn("month_of_year", month("date_day")) \
.withColumn("year_number", year("date_day")) \
.withColumn("month_name", date_format("date_day", "MMMM")) \
.withColumn("quarter_of_year", get_quarter_of_year_udf("date_day"))
date_dim_df.show()
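get_quarter_of_year_udf is assumed to be defined earlier in the course; a minimal stand-in is below (the built-in pyspark.sql.functions.quarter would also do the job):
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def get_quarter_of_year_udf(d):
    # d arrives as a datetime.date; quarters are numbered 1-4
    return None if d is None else (d.month - 1) // 3 + 1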
Change Data Capture (CDC) is a pattern for tracking and propagating database changes in real-time to downstream systems.
Traditional Batch ETL Limitations:
CDC Advantages:
[OLTP Database] → [CDC Engine] → [Event Stream] → [Stream Processor] → [Target System]
e.g., MySQL → Debezium → Kafka → Flink/Spark → PostgreSQL/S3
Example Change Event (Debezium):
{
"before": {"id": 101, "name": "Alice", "balance": 500},
"after": {"id": 101, "name": "Alice", "balance": 750},
"op": "u", // Operation: c=create, u=update, d=delete
"ts_ms": 1705392000000,
"source": {"db": "customers", "table": "accounts"}
}
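A hedged Python sketch of consuming these Debezium events straight from Kafka with kafka-python; the topic name follows the notes, the broker address is an assumption:
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "mysql.customers.accounts",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for message in consumer:
    event = message.value
    if event is None:
        continue  # tombstone record
    if event["op"] == "u":
        print("apply upsert:", event["after"])   # new row state
    elif event["op"] == "d":
        print("apply delete:", event["before"])  # last known state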
Each captured table gets its own Kafka topic (e.g., mysql.customers.accounts).
Flink CDC Processing Example:
-- Pseudocode: Flink SQL for CDC processing
CREATE TABLE customers_cdc (
id INT,
name STRING,
balance DECIMAL,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'mysql.customers.accounts',
'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder broker address
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json'
);
-- Materialize current state (UPSERT semantics)
CREATE TABLE customer_snapshot (
id INT,
name STRING,
total_balance DECIMAL,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://warehouse/analytics',
'table-name' = 'customer_snapshot'
);
INSERT INTO customer_snapshot
SELECT id, name, balance FROM customers_cdc;
Scenario: Customer updates account balance
UPDATE accounts SET balance = 750 WHERE id = 101;
Debezium reads the update from the MySQL binlog and publishes a change event to the mysql.customers.accounts topic; Flink then upserts the new state into the target store.
| Use Case | Description |
|---|---|
| Real-time analytics | Keep data warehouse in sync with OLTP databases (sub-second latency) |
| Microservices sync | Propagate changes across service boundaries without tight coupling |
| Caching invalidation | Update Redis/Memcached when source data changes |
| Search index updates | Sync Elasticsearch with database changes for fresh search results |
| Audit logging | Complete change history for compliance (GDPR, SOX) |
| Data lake ingestion | Stream changes to S3/Delta Lake for long-term storage |
Technology Stack:
| Layer | Technology | Purpose |
|---|---|---|
| Source | MySQL 8.0 | Transactional database (binlog enabled) |
| CDC | Debezium 2.x | Reads MySQL binlog, publishes change events |
| Stream | Apache Kafka 3.x | Event backbone (durable, distributed queue) |
| Processing | Apache Flink 1.17 | Real-time transformations and aggregations |
| Target | PostgreSQL 15 | Analytical serving layer (materialized views) |