Case Study: Design a Modern Data Lakehouse
“In 2015 you had to choose: data warehouse (expensive, structured, ACID) or data lake (cheap, flexible, no ACID). In 2024 you no longer have to choose — the Lakehouse gives you both. Iceberg + Trino + Flink on S3 at 1/10 the cost of Snowflake, with stronger schema evolution than Hive.”
Tags: system-design lakehouse iceberg delta-lake hudi kappa flink data-engineering case-study bonus Student: Hieu (Backend Dev → Architect) Prerequisite: Tuan-07-Database-Sharding-Replication · Tuan-08-Message-Queue · Case-Design-Ad-Click-Event-Aggregation Related: Tuan-Bonus-Outbox-Pattern · Tuan-Bonus-Consistency-Models-Isolation · Case-Design-Metrics-Monitoring-Alerting
Context & Why — Why Lakehouse?
Analogy: Traditional library vs Warehouse vs Logistics hub
Hieu, imagine three models for storing data:
Data Warehouse (Snowflake, Redshift, BigQuery) — a five-star library:
- Books carefully catalogued and indexed, with ACID guarantees
- Extremely fast lookups
- Expensive: $40-100/TB/month
- Rigid: schema must be predefined (ETL first)
- Vendor lock-in: proprietary data format
Data Lake (S3 + Parquet/JSON) — a general-purpose warehouse:
- Dump everything in, any format goes (CSV, JSON, Parquet, video)
- Cheap: $20-25/TB/month (S3 Standard)
- Flexible: store raw, transform later (ELT)
- Problem: no ACID → concurrent writes corrupt data, every file has its own schema, no efficient update/delete
Lakehouse (Iceberg + S3 + Trino/Spark/Flink) — a modern logistics hub:
- Storage as cheap as a Data Lake (Parquet on S3)
- ACID + transactions like a Warehouse
- Strong schema evolution
- Time travel (read data as of a past point in time)
- Multi-engine queries (Spark + Flink + Trino + DuckDB all reading the same table)
- Open: Apache Iceberg, Apache Hudi, Delta Lake — no vendor lock-in
Why does a backend dev need to understand the Lakehouse?
| Reason | Explanation |
|---|---|
| Every production app needs analytics | Data from the DB has to go somewhere for dashboards, ML, BI |
| CDC from the Outbox lands in the Lakehouse | Understanding Iceberg = understanding the data's destination |
| Cost matters at scale | At TB → PB scale, Snowflake can run $400K+/year |
| Real-time analytics | Kappa architecture with Flink → Iceberg = sub-minute analytics |
| ML feature store | Modern feature stores (Feast, Tecton) are built on Iceberg |
| Compliance & audit | Time travel + lineage = GDPR/SOX-ready |
Why doesn't Alex Xu cover the Lakehouse?
Alex Xu vol 1+2 covers Ad-Click Aggregation (Kappa architecture) but never mentions Iceberg/Delta Lake — this is a post-2020 evolution. In 2024-2026, virtually every new data infrastructure is built on the Lakehouse pattern. It is the critical gap to close to bring this vault up to the state of the art.
Primary references
- Apache Iceberg docs — https://iceberg.apache.org/docs/
- Delta Lake docs — https://docs.delta.io/
- Apache Hudi docs — https://hudi.apache.org/docs/overview/
- Lakehouse: A New Generation of Open Platforms (Databricks 2021 paper) — http://www.cidrdb.org/cidr2021/papers/cidr2021_paper17.pdf
- DDIA Chapter 10 (Batch) + Chapter 11 (Stream Processing) — Kleppmann
- Streaming Systems (Akidau, Chernyak, Lax, O’Reilly 2018)
Step 1 — Understand the Problem & Establish Design Scope
1.1 Clarifying Questions
| Question | Assumed answer | Implication |
|---|---|---|
| Volume? | 10 TB/day raw (1B events × 10KB) | Needs distributed processing |
| Latency requirements? | Real-time (< 1 min) + batch (hourly/daily) | Hybrid — Kappa with batch fallback |
| Source data? | Postgres CDC + Kafka events + S3 raw files + 3rd-party APIs | Multi-source ingestion |
| Query patterns? | Ad-hoc SQL (analysts) + ML training + dashboards (real-time) | Multi-engine: Trino + Spark + Flink |
| Update/delete needed? | Yes (GDPR right-to-be-forgotten, late-arriving data) | Needs ACID + UPSERT support |
| Schema stability? | Evolves over time (add columns, change types) | Needs schema evolution |
| Compliance? | GDPR, SOC 2, audit trail | Time travel, immutable history |
| Multi-region? | Yes, replicated across US + EU | Needs a geo-replication strategy |
1.2 Functional Requirements
- FR1 — Ingest: Multi-source data ingestion (CDC, streaming events, batch files, API pulls)
- FR2 — Store: Reliable, ACID-compliant storage at PB scale on cost-effective object storage
- FR3 — Query: Multi-engine SQL access (analytics, BI, ML)
- FR4 — Stream: Real-time queries with end-to-end latency < 1 minute
- FR5 — Update/Delete: Support row-level UPDATE/DELETE (GDPR compliance)
- FR6 — Time travel: Query data as-of any point in past N days
- FR7 — Schema evolution: Add/rename/drop columns without rewriting data
- FR8 — Lineage: Track data provenance (where each row came from, when transformed)
1.3 Non-functional Requirements
| Requirement | Target |
|---|---|
| Scale | 100 TB warm + 10 PB cold |
| Query latency | P95 < 10s for ad-hoc; sub-second for cached |
| Streaming freshness | < 1 min from event to queryable |
| Cost | < $0.01 per GB-month at warm tier |
| Availability | 99.9% (multi-AZ object storage) |
| Durability | 11 nines (S3 standard) |
| Concurrent writers | 50+ Flink jobs writing same table without conflicts |
1.4 Out of Scope
- Real-time OLTP (use Postgres/Cassandra for that)
- Sub-second analytics (use ClickHouse/Druid for those)
- Graph queries (use Neo4j/Neptune)
Step 2 — High-Level Design
2.1 Architecture Evolution
2.1.1 Lambda Architecture (Legacy, ~2010)
┌─────────────────────┐
│ Source events │
└──────────┬──────────┘
│
┌────────────────┴────────────────┐
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ Batch layer │ │ Speed layer │
│ (Hadoop, S3) │ │ (Storm, Flink) │
│ Hourly/daily │ │ Real-time │
│ Source of truth│ │ Approximate │
└──────────┬───────┘ └─────────┬────────┘
│ │
▼ ▼
Batch view Real-time view
│ │
└────────────────┬────────────────┘
▼
┌──────────────────┐
│ Serving layer │
│ (Cassandra, │
│ Druid, etc.) │
└──────────────────┘
Lambda's problems:
- Code duplication: computation logic written twice (batch + speed)
- Operational complexity: 2 pipelines, 2 systems, 2 teams
- Inconsistency: batch and speed views often disagree
- Debugging hell: where's the bug — batch or speed?
2.1.2 Kappa Architecture (2014, Jay Kreps)
“Why use 2 pipelines when 1 works?” — Jay Kreps, Questioning the Lambda Architecture (2014)
┌─────────────────────┐
│ Source events │
└──────────┬──────────┘
│
▼
┌──────────────────┐
│ Kafka (immutable │
│ event log) │
└────────┬──────────┘
│
▼
┌──────────────────┐
│ Stream processor│ ← Single source of truth
│ (Flink) │
└────────┬──────────┘
│
▼
┌──────────────────┐
│ Serving layer │
└──────────────────┘
Advantages:
- 1 pipeline, 1 codebase
- Reprocessing: replay Kafka from offset 0
- Consistency: only a single view
Problem: Kafka retention is short (7-30 days) → reprocessing data older than 30 days gets complicated.
2.1.3 Lakehouse Architecture (2020+)
┌─────────────────────┐
│ Source events │
└──────────┬──────────┘
│
┌───────────────┴───────────────┐
▼ ▼
┌──────────────┐ ┌──────────────┐
│ CDC │ │ Streaming │
│ (Debezium) │ │ (Kafka) │
└──────┬───────┘ └──────┬───────┘
│ │
└───────────┬────────────────────┘
▼
┌──────────────────────────────────┐
│ Bronze Layer (Raw) │
│ Iceberg/Delta on S3 │
│ Append-only, full fidelity │
└──────────────────┬───────────────┘
│
▼
┌──────────────────────────────────┐
│ Silver Layer (Cleansed) │
│ Iceberg, deduplicated, typed │
└──────────────────┬───────────────┘
│
▼
┌──────────────────────────────────┐
│ Gold Layer (Aggregated) │
│ Business metrics, ML features │
└─────────────────────────┬────────┘
│
┌────────────┬──────────┴───────┬──────────┐
▼ ▼ ▼ ▼
┌────────┐ ┌──────────┐ ┌─────────┐ ┌────────┐
│ Trino │ │ Spark │ │ Flink │ │ DuckDB │
│ (BI) │ │ (ML) │ │(Stream) │ │(Local) │
└────────┘ └──────────┘ └─────────┘ └────────┘
Advantages:
- Single storage layer: every query engine reads the same tables
- Bronze/Silver/Gold pattern (Databricks medallion): clear data-quality tiers
- ACID on S3: Iceberg/Delta make concurrent writers safe
- Streaming + batch unified: Flink writes the same Iceberg table that Spark reads in batch
- Cost: S3 + open formats ≈ 1/5 the cost of Snowflake
2.2 Components Overview
| Component | Role | Choice |
|---|---|---|
| Storage | Object store cho data files | AWS S3 / GCS / Azure Blob |
| File format | Columnar storage | Apache Parquet (default) |
| Table format | ACID + metadata layer | Apache Iceberg / Delta Lake / Hudi |
| Catalog | Table registry | AWS Glue / Hive Metastore / Polaris / Unity |
| Streaming ingest | Real-time CDC + events | Apache Flink / Spark Streaming |
| Batch processing | ETL jobs | Apache Spark |
| Query engine | SQL access | Trino / Presto / DuckDB |
| Orchestration | Workflow scheduling | Airflow / Dagster / Prefect |
| Lineage | Data provenance | OpenLineage |
| Quality | Data tests | Great Expectations / Soda |
2.3 Why Iceberg vs Delta vs Hudi?
| Feature | Apache Iceberg | Delta Lake | Apache Hudi |
|---|---|---|---|
| Origin | Netflix (2017) | Databricks (2019) | Uber (2017) |
| Governance | Apache, vendor-neutral | Linux Foundation (Databricks-led) | Apache |
| ACID | ✅ | ✅ | ✅ |
| Schema evolution | ✅ Strongest (full) | ✅ Good | ✅ Limited |
| Time travel | ✅ Branch + tag | ✅ Version | ✅ Commit timeline |
| Hidden partitioning | ✅ Best | ⚠️ Manual | ⚠️ Manual |
| Multi-engine | ✅ Best (Spark, Flink, Trino, Snowflake) | ⚠️ Spark-best | ⚠️ Spark-Hudi best |
| Streaming upsert | Good | Good | ✅ Best (Hudi MoR) |
| Vendor lock | None | Some (Databricks features) | None |
| Adoption 2024 | Rising fast (Snowflake, Apple, Netflix, LinkedIn) | Mature (Databricks) | Specialized |
Recommendations for 2024-2026:
- Iceberg for greenfield builds + multi-engine flexibility
- Delta if you are already in the Databricks ecosystem
- Hudi for heavy upsert workloads (CDC-heavy)
2.4 High-Level Architecture Diagram
flowchart TB
  subgraph Sources["Data Sources"]
    PG[(Postgres)]
    Kafka[(Kafka<br/>events)]
    S3RAW[(S3 raw files)]
    API[3rd-party APIs]
  end
  subgraph Ingest["Ingestion Layer"]
    Deb[Debezium CDC]
    FlinkCDC[Flink CDC]
    Airbyte[Airbyte]
  end
  subgraph Lake["Lakehouse Storage (S3 + Iceberg)"]
    Bronze[(Bronze<br/>raw, append-only)]
    Silver[(Silver<br/>cleaned, typed)]
    Gold[(Gold<br/>aggregated, ML features)]
  end
  subgraph Catalog["Catalog Layer"]
    Glue[AWS Glue<br/>Iceberg REST]
  end
  subgraph Compute["Compute Engines"]
    Spark[Apache Spark<br/>batch ETL + ML]
    Flink[Apache Flink<br/>streaming]
    Trino[Trino<br/>ad-hoc SQL]
    DuckDB[DuckDB<br/>local analyst]
  end
  subgraph Consumers["Consumers"]
    BI[BI Tools<br/>Tableau, Superset]
    ML[ML Platform<br/>SageMaker, Feast]
    APP[Applications]
  end
  PG --> Deb --> Bronze
  Kafka --> FlinkCDC --> Bronze
  S3RAW --> Spark --> Bronze
  API --> Airbyte --> Bronze
  Bronze --> Spark --> Silver
  Silver --> Spark --> Gold
  Silver --> Flink --> Gold
  Catalog -.metadata.-> Bronze
  Catalog -.metadata.-> Silver
  Catalog -.metadata.-> Gold
  Gold --> Trino --> BI
  Gold --> Spark --> ML
  Gold --> DuckDB --> APP
  style Bronze fill:#cd7f32,color:#fff
  style Silver fill:#c0c0c0,color:#000
  style Gold fill:#ffd700,color:#000
Step 3 — Deep Dive
3.1 Iceberg Internals
3.1.1 Table Layout on S3
s3://bucket/warehouse/db/table/
├── metadata/
│ ├── v1.metadata.json ← table metadata at version 1
│ ├── v2.metadata.json ← version 2 after first commit
│ ├── snap-12345.avro ← snapshot manifest list
│ └── 67890-m0.avro ← manifest file
└── data/
├── month=2026-01/
│ ├── 00000-1-abc.parquet
│ └── 00001-2-def.parquet
└── month=2026-02/
└── 00000-3-ghi.parquet
3 levels of metadata:
- Table metadata (v*.metadata.json): schema, partition spec, snapshot list
- Manifest list (snap-*.avro): list of manifests for one snapshot
- Manifest file (*-m0.avro): list of data files + statistics (min/max per column)
Why 3 levels (walked through in the sketch below):
- Snapshot isolation: a reader reads at snapshot N and never sees writes landing in N+1
- Pruning: manifests carry column statistics → queries can skip files that cannot match the WHERE clause
- Atomic commit: swapping the pointer in the table metadata = an atomic version switch
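A minimal sketch walking these three metadata levels with PyIceberg — the catalog endpoint and table name are assumptions borrowed from the local stack later in this doc:

```python
# Inspect Iceberg's three metadata levels with PyIceberg (illustrative names).
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "lake",
    **{"uri": "http://rest-catalog:8181", "warehouse": "s3://warehouse/"},
)
table = catalog.load_table("demo.orders")

# Level 1: table metadata (v*.metadata.json) — schema, partition spec, snapshots
print(table.schema())
print(table.spec())

# Level 2: each snapshot points at a manifest list (snap-*.avro)
for snap in table.metadata.snapshots:
    print(snap.snapshot_id, snap.timestamp_ms, snap.manifest_list)

# Level 3: manifests enumerate data files + per-column min/max statistics —
# that is what lets a query skip files before reading any Parquet.
current = table.current_snapshot()
if current:
    print("current snapshot:", current.snapshot_id)
```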
3.1.2 Schema Evolution
Iceberg's unique strength: add/rename/drop columns without rewriting data files.
-- Add column (default NULL for old rows)
ALTER TABLE events ADD COLUMN device_id STRING;
-- Rename column (just metadata change, files unchanged)
ALTER TABLE events RENAME COLUMN ts TO event_time;
-- Change column type (with promotion rules)
ALTER TABLE events ALTER COLUMN value TYPE BIGINT; -- int → bigint OK
-- Drop column (logical, doesn't delete data)
ALTER TABLE events DROP COLUMN deprecated_field;

How it works: Iceberg tracks columns by field ID (an integer), not by name. Schema evolution = updating the field ID ↔ name mapping.
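The same evolution is scriptable from Python. A sketch using PyIceberg's UpdateSchema API (catalog endpoint assumed from the local stack below):

```python
# Schema evolution as pure metadata commits (PyIceberg; names illustrative).
from pyiceberg.catalog import load_catalog
from pyiceberg.types import StringType

table = load_catalog("lake", uri="http://rest-catalog:8181").load_table("demo.events")

# Each change commits a new metadata version; data files are untouched.
with table.update_schema() as update:
    update.add_column("device_id", StringType())  # new field gets a fresh field ID
    update.rename_column("ts", "event_time")      # only the ID <-> name mapping changes
```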
3.1.3 Hidden Partitioning
The problem with Hive partitioning:
-- Hive: you must know the partition columns in the query
SELECT * FROM events
WHERE year=2026 AND month=05 AND day=01
AND event_time = '2026-05-01 10:30:00';
-- User FORGETS year/month/day → full table scan

Iceberg hidden partitioning:
-- Define a partition transform derived from event_time
ALTER TABLE events
ADD PARTITION FIELD day(event_time);
-- Users only filter on event_time → Iceberg AUTOMATICALLY prunes partitions
SELECT * FROM events
WHERE event_time = '2026-05-01 10:30:00';
-- Iceberg applies the DAY() transform → prunes to 1 partition

Supported partition transforms (sketched in Python below):
- year(ts), month(ts), day(ts), hour(ts)
- bucket(N, col) — hash bucketing
- truncate(W, col) — string/numeric truncation
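A conceptual sketch of these transforms in plain Python — illustrative only (Iceberg's real bucket transform uses murmur3, not Python's hash()):

```python
# How partition transforms derive partition values from column values.
from datetime import date, datetime

def day_transform(ts: datetime) -> date:
    return ts.date()                  # day(event_time)

def bucket_transform(n: int, key: str) -> int:
    return hash(key) % n              # bucket(N, col) — real Iceberg uses murmur3

def truncate_transform(w: int, s: str) -> str:
    return s[:w]                      # truncate(W, col)

# A filter event_time = '2026-05-01 10:30:00' lets the planner compute
# day_transform(...) == date(2026, 5, 1) and skip every other partition.
print(day_transform(datetime(2026, 5, 1, 10, 30)))  # 2026-05-01
print(bucket_transform(1024, "user-42"))            # some bucket in [0, 1024)
print(truncate_transform(3, "vietnam"))             # 'vie'
```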
3.1.4 Time Travel
-- Query as of a specific snapshot ID
SELECT * FROM events VERSION AS OF 12345;
-- Query as of timestamp (most recent snapshot before)
SELECT * FROM events FOR TIMESTAMP AS OF '2026-04-01 00:00:00';
-- Query as of N versions ago
SELECT * FROM events FOR VERSION AS OF 100;

Use cases:
- GDPR audit: “Show me what user X’s record was on 2025-12-15”
- Debugging: “The pipeline produced wrong output. Compare yesterday's data with today's.”
- Reproducible ML: “Train the model on the exact training set from last week.”
- Rollback: bad ETL → revert to an earlier snapshot
3.1.5 Branching & Tagging (Iceberg 1.x)
Like Git for data tables:
-- Create branch for experiments
ALTER TABLE events CREATE BRANCH experiments;
-- Write to branch (other readers don't see)
INSERT INTO events.branch_experiments VALUES (...);
-- Tag a version
ALTER TABLE events CREATE TAG release_v1 AS OF VERSION 100;
-- Rollback main branch to a tag
ALTER TABLE events SET CURRENT SNAPSHOT TO TAG release_v1;

3.2 ACID Implementation
Problem: S3 has no transactions. Concurrent writers on the same table → corruption.
Iceberg's solution: Optimistic Concurrency Control:
1. Reader gets current snapshot (e.g., snapshot 100)
2. Writer A starts transaction, prepares new files
3. Writer B starts transaction, prepares different new files
4. Writer A commits: atomically swap table pointer 100 → 101
5. Writer B tries commit: detect conflict (pointer is now 101, not 100)
6. Writer B retries: re-read snapshot 101, append on top, swap 101 → 102
Atomic swap: Iceberg relies on an atomic CAS (compare-and-swap) on the catalog (see the toy sketch after this list). Catalog backends:
- Hive Metastore: CAS via DB transaction
- AWS Glue: CAS via DynamoDB conditional write
- REST catalog (Polaris): HTTP commit endpoint with ETags
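A toy sketch of this commit protocol in Python — not the real API, just the CAS-plus-retry shape that all Iceberg catalogs implement:

```python
# Optimistic concurrency: a commit succeeds only if the table pointer still
# equals the snapshot the writer started from (compare-and-swap).
import threading

class TinyCatalog:
    def __init__(self) -> None:
        self._pointer = 100            # current snapshot id
        self._lock = threading.Lock()  # stands in for the backend's CAS primitive

    def compare_and_swap(self, expected: int, new: int) -> bool:
        with self._lock:
            if self._pointer != expected:
                return False           # someone else committed first
            self._pointer = new
            return True

    @property
    def pointer(self) -> int:
        return self._pointer

def commit_with_retry(catalog: TinyCatalog, writer: str) -> None:
    while True:
        base = catalog.pointer                        # 1. read current snapshot
        # 2. data files would be written to S3 here (invisible to readers)
        if catalog.compare_and_swap(base, base + 1):  # 3. atomic pointer swap
            print(f"{writer} committed snapshot {base + 1}")
            return
        # 4. conflict detected: loop re-reads the new snapshot and retries

catalog = TinyCatalog()
commit_with_retry(catalog, "writer-A")  # 100 -> 101
commit_with_retry(catalog, "writer-B")  # lands on top: 101 -> 102
```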
Conflict types:
- Append-append: Compatible (both add files), rare conflict
- Append-overwrite: Conflict (overwrite changes existing files)
- Schema evolution + write: Conflict (schema mismatch)
3.3 Streaming Ingestion with Flink
Pattern: Flink consumes Kafka → writes to Iceberg with exactly-once semantics.
// Flink Iceberg sink
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // Checkpoint every 60s
DataStream<Event> events = env.addSource(kafkaSource);
FlinkSink.forRowData(events.map(event -> toRowData(event)))
.table(icebergTable)
.tableSchema(schema)
.writeParallelism(4)
.upsert(true) // Use UPSERT for CDC-like workload
.equalityFieldColumns(Arrays.asList("id")) // Primary key
.append();
env.execute("Stream to Iceberg");Exactly-once với Iceberg:
- Flink checkpoint → flush writes to S3 → commit the Iceberg snapshot atomically
- If Flink crashes in between: data files are on S3 but the commit never happened → orphan files (cleaned up later)
- Recovery: Flink resumes from the checkpoint and replays the write (idempotent: committed checkpoints are tracked in snapshot metadata, so replayed writes are deduplicated)
Latency: checkpoint interval = freshness. A 60s checkpoint → data is queryable within 60s + commit time (~5s).
3.4 Multi-engine Query
The magic of an open table format: many engines read the same table.
# Spark
spark.table("warehouse.events").filter("event_time > '2026-01-01'").show()
# Trino
trino> SELECT count(*) FROM warehouse.events WHERE event_time > DATE '2026-01-01';
# Flink SQL
SELECT count(*) FROM events WHERE event_time > '2026-01-01';
# DuckDB (local laptop)
SELECT count(*) FROM iceberg_scan('s3://bucket/warehouse/events')
WHERE event_time > '2026-01-01';
# Snowflake (Iceberg external table)
SELECT count(*) FROM iceberg_events WHERE event_time > '2026-01-01';

Why this matters: no data migration when you switch engines. Spark for heavy ETL, Trino for analyst BI, Flink for streaming, DuckDB for local dev.
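A sketch of the laptop path from Python with DuckDB's iceberg extension — the table path and credentials are illustrative, and s3:// access still needs httpfs configuration:

```python
# Query an Iceberg table straight from a laptop with DuckDB.
import duckdb

con = duckdb.connect()
con.execute("INSTALL iceberg")
con.execute("LOAD iceberg")
con.execute("INSTALL httpfs")  # needed for s3:// paths
con.execute("LOAD httpfs")

con.sql("""
    SELECT count(*)
    FROM iceberg_scan('s3://bucket/warehouse/db/events')
    WHERE event_time > TIMESTAMP '2026-01-01'
""").show()
```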
3.5 Compaction & Maintenance
Iceberg accumulates small files (each micro-batch creates a file). Compact periodically:
-- Spark procedure: compact small files into larger ones
CALL system.rewrite_data_files(
table => 'warehouse.events',
options => map('target-file-size-bytes', '536870912') -- 512 MB
);
-- Expire old snapshots (release storage)
CALL system.expire_snapshots(
table => 'warehouse.events',
older_than => DATE '2026-04-01'
);
-- Remove orphan files (data files not referenced)
CALL system.remove_orphan_files(
table => 'warehouse.events',
older_than => DATE '2026-04-01'
);
-- Rewrite manifests (consolidate)
CALL system.rewrite_manifests('warehouse.events');

Best practices:
- Compact daily (off-peak) — see the job sketch below
- Expire snapshots older than 7-30 days
- Clean up orphan files weekly
- Monitor file count: > 100K files = degraded performance
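A sketch of a nightly job driving these procedures from PySpark — the catalog name lake, the table list, and the 7-day retention are assumptions:

```python
# Nightly Iceberg maintenance: compact, expire snapshots, rewrite manifests.
from datetime import datetime, timedelta

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("iceberg-maintenance").getOrCreate()
cutoff = (datetime.utcnow() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")

for table in ["warehouse.events", "warehouse.orders"]:
    spark.sql(f"""
        CALL lake.system.rewrite_data_files(
            table => '{table}',
            options => map('target-file-size-bytes', '536870912'))
    """)
    spark.sql(
        f"CALL lake.system.expire_snapshots(table => '{table}', "
        f"older_than => TIMESTAMP '{cutoff}')"
    )
    spark.sql(f"CALL lake.system.rewrite_manifests('{table}')")
```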
3.6 CDC Integration with the Outbox Pattern
Linking to Tuan-Bonus-Outbox-Pattern:
Postgres (outbox table)
│ logical replication
▼
Debezium → Kafka topic "events.Order"
│
▼
Flink CDC → Iceberg Bronze layer
│ batch ETL
▼
Iceberg Silver (cleansed, deduplicated)
│
▼
Iceberg Gold (aggregated metrics, ML features)
Bronze layer schema (mirrors Postgres + CDC metadata):
CREATE TABLE bronze.orders (
-- Original columns
id STRING,
customer_id STRING,
total INT,
status STRING,
created_at TIMESTAMP,
-- CDC metadata
op CHAR(1), -- 'c'=create, 'u'=update, 'd'=delete
ts_ms BIGINT, -- Kafka timestamp
source STRUCT<...>, -- Debezium source info
txn_id STRING, -- Postgres transaction ID
lsn STRING -- WAL log sequence number
)
PARTITIONED BY (DAY(ts_ms))
TBLPROPERTIES (
'format-version' = '2',
'write.delete.mode' = 'merge-on-read'
);

3.7 Feature Store Integration
Modern ML feature stores (Feast, Tecton) are built on Iceberg:
# Feast feature definition
# (IcebergSource is assumed to be provided by the offline-store plugin in use)
from datetime import timedelta

from feast import Entity, FeatureView, Field
from feast.types import Int64, Float32

driver = Entity(name="driver_id", join_keys=["driver_id"])
driver_stats = FeatureView(
name="driver_stats",
entities=[driver],
schema=[
Field(name="trips_today", dtype=Int64),
Field(name="avg_rating", dtype=Float32),
],
source=IcebergSource(
table="warehouse.gold.driver_stats",
timestamp_field="event_timestamp",
),
ttl=timedelta(days=1),
)

Benefits:
- Training-serving consistency: model training reads Iceberg history, online serving reads Redis (materialized from the same source) — see the retrieval sketch below
- Time travel for backfill: “What features did driver X have on 2026-01-15?” — Iceberg time travel
- Lineage: track which features were used in which model
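A sketch of pulling a point-in-time-correct training set from this definition — Feast's standard retrieval API; the repo path and entity values are illustrative:

```python
# Point-in-time join against the Iceberg-backed offline store via Feast.
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Entity rows: which drivers, and as-of which timestamps
entity_df = pd.DataFrame({
    "driver_id": [1001, 1002],
    "event_timestamp": pd.to_datetime(["2026-01-15", "2026-01-15"]),
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_stats:trips_today", "driver_stats:avg_rating"],
).to_df()
print(training_df.head())
```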
Step 4 — Operations & Trade-offs
4.1 Cost Comparison
Scenario: 100 TB warm data + 10 PB cold, query 1 TB/day.
| Component | Snowflake | Lakehouse (S3 + Iceberg + Trino) |
|---|---|---|
| Storage warm 100 TB/month | $4,000 ($40/TB) | $2,300 ($23/TB S3 Standard) |
| Storage cold 10 PB/month | Not economically viable | $40,960 ($4/TB S3 Glacier IR) |
| Compute (queries) | $50K/month (warehouse credits) | $15K/month (EC2 spot for Trino) |
| Compute (streaming) | Snowpipe streaming N/A | $5K/month (Flink on EKS) |
| Total | $54,000+/month (warm only) | $63,260/month, including 10 PB of cold storage |
Caveats:
- Snowflake includes auto-tuning and support
- A Lakehouse needs an engineering team
- The break-even point depends on team size: small team (< 10 engineers) → Snowflake; larger team with dedicated data engineers → Lakehouse
4.2 Query Performance
| Query type | Snowflake | Lakehouse + Trino | Lakehouse + Spark |
|---|---|---|---|
| Point lookup (1 row) | 50ms | 200ms | 5s (Spark startup overhead) |
| Aggregation 1 GB scan | 1s | 2s | 8s |
| Full table scan 1 TB | 30s | 45s | 60s |
| ML training (full read) | Not optimal | OK | Best |
| Concurrent users | High (warehouse) | Medium | Low |
Trino best for: Interactive BI, ad-hoc. Spark best for: ETL, ML training. Flink best for: Streaming ingestion.
4.3 When NOT to use Lakehouse
- ❌ Sub-second analytics: use ClickHouse, Apache Druid, Pinot
- ❌ OLTP: use Postgres, Cassandra
- ❌ Small data (< 1 TB): PostgreSQL with TimescaleDB is sufficient
- ❌ Single team, no data engineer: Snowflake/BigQuery managed services
- ❌ Heavy mutation (> 50% of writes are UPDATEs): Lakehouse works but needs tuning (Hudi MoR)
Estimation
Storage capacity
Daily ingestion:
- 1B events/day × 5KB/event = 5 TB/day raw
- After Parquet compression (~5x): 1 TB/day
- Bronze + Silver + Gold (~3x amplification): 3 TB/day
Yearly: 365 × 3 TB = ~1.1 PB warm + cold
S3 cost (us-east-1):
- Standard ($23/TB): ~$25K/month for 1.1 PB
- Glacier IR ($4/TB): ~$4.4K/month for 1.1 PB (sanity-checked below)
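A quick sanity check of this arithmetic in Python (the per-TB rates are the assumptions above):

```python
# Back-of-envelope lakehouse storage cost.
events_per_day = 1e9
raw_tb_per_day = events_per_day * 5_000 / 1e12  # 5 KB/event -> 5 TB/day
compressed_tb = raw_tb_per_day / 5              # Parquet ~5x -> 1 TB/day
layered_tb = compressed_tb * 3                  # Bronze+Silver+Gold -> 3 TB/day
yearly_tb = layered_tb * 365                    # ~1,095 TB ≈ 1.1 PB/year

s3_standard = yearly_tb * 23                    # $23/TB-month
glacier_ir = yearly_tb * 4                      # $4/TB-month
print(f"{yearly_tb / 1000:.2f} PB/yr | Standard ${s3_standard:,.0f}/mo "
      f"| Glacier IR ${glacier_ir:,.0f}/mo")
# -> 1.10 PB/yr | Standard $25,185/mo | Glacier IR $4,380/mo
```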
Streaming throughput
Flink Iceberg sink benchmark (4 parallelism, m5.xlarge):
- ~50K records/s sustained
- ~5MB/s output to S3 per task
- Checkpoint interval 60s → ~3M records per checkpoint
Query throughput
Trino cluster (10 worker nodes, m5.4xlarge):
- ~50 concurrent queries
- P95 latency 5-10s for 100GB scan
- Cost: ~$400/day
Compaction overhead
- 1 TB/day ingestion = ~10K small files/day
- Daily compaction merge to ~1K target files (1GB each)
- Compute cost: ~2 hours of m5.4xlarge × 5 nodes = $20/day
Security First
Encryption
- At rest: S3 SSE-KMS (AES-256), customer-managed keys
- In transit: TLS 1.3 for all connections (Trino, Spark, Flink)
Access Control
Multi-layer:
- AWS IAM: control bucket-level access
- Lake Formation / Glue: column-level + row-level policies
- Iceberg ACLs (via REST catalog like Polaris)
- Engine-level: Trino access controls, Spark SQL grants
-- Lake Formation column-level access
GRANT SELECT (id, total, created_at) ON warehouse.gold.orders TO 'analyst-role';
-- Note: customer_email NOT granted to analysts
-- Row-level filter
CREATE FILTER vn_only ON warehouse.gold.orders
USING country = 'VN';
GRANT SELECT WITH FILTER vn_only ON warehouse.gold.orders TO 'vn-team';

PII Handling
- Bronze layer: full PII (encrypted)
- Silver layer: pseudonymized (hash email, mask card) — see the sketch after this list
- Gold layer: aggregated only (no PII)
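A sketch of that Bronze → Silver pseudonymization step in PySpark — salt handling and column names are assumptions:

```python
# Pseudonymize PII on the way into Silver: hash email, mask card number.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
SALT = "rotate-me"  # in practice, fetched from a KMS-backed secret store

bronze = spark.table("lake.bronze.users")
silver = (
    bronze
    .withColumn("email_hash", F.sha2(F.concat(F.col("email"), F.lit(SALT)), 256))
    .withColumn("card_masked", F.concat(F.lit("****-"), F.substring("card_number", -4, 4)))
    .drop("email", "card_number")  # raw PII never leaves Bronze
)
silver.writeTo("lake.silver.users").overwritePartitions()
```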
GDPR Right-to-be-forgotten
-- Iceberg V2 supports row-level DELETE
DELETE FROM warehouse.silver.users
WHERE user_id = 'gdpr-request-user-123';
-- Required: rewrite affected files via compaction
CALL system.rewrite_data_files('warehouse.silver.users');
-- Or use copy-on-write mode:
-- write.delete.mode = 'copy-on-write'

Audit Trail
Iceberg snapshot history = audit log:
SELECT * FROM warehouse.events.snapshots ORDER BY committed_at DESC;
-- Shows every commit: who, when, which files, which operation

DevOps
Docker Compose: Local Lakehouse Stack
version: "3.8"
services:
minio:
image: minio/minio
command: server /data --console-address ":9001"
environment:
MINIO_ROOT_USER: admin
MINIO_ROOT_PASSWORD: password
ports:
- "9000:9000"
- "9001:9001"
volumes:
- minio-data:/data
rest-catalog:
image: tabulario/iceberg-rest:1.5.0
environment:
AWS_ACCESS_KEY_ID: admin
AWS_SECRET_ACCESS_KEY: password
AWS_REGION: us-east-1
CATALOG_WAREHOUSE: s3://warehouse/
CATALOG_IO__IMPL: org.apache.iceberg.aws.s3.S3FileIO
CATALOG_S3_ENDPOINT: http://minio:9000
ports:
- "8181:8181"
trino:
image: trinodb/trino:435
ports:
- "8080:8080"
volumes:
- ./trino-iceberg.properties:/etc/trino/catalog/iceberg.properties
spark:
image: tabulario/spark-iceberg:3.5.0
environment:
AWS_ACCESS_KEY_ID: admin
AWS_SECRET_ACCESS_KEY: password
ports:
- "8888:8888" # Jupyter
volumes:
minio-data:

# trino-iceberg.properties
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://rest-catalog:8181
iceberg.rest-catalog.warehouse=s3://warehouse/
fs.native-s3.enabled=true
s3.endpoint=http://minio:9000
s3.region=us-east-1
s3.path-style-access=true
s3.aws-access-key=admin
s3.aws-secret-key=password

Monitoring
Key metrics:
- Iceberg table size (bytes, file count)
- Snapshot age (oldest unexpired)
- Streaming lag (Flink checkpoint to Iceberg commit)
- Compaction backlog (files > target size)
- Query P95 latency
Prometheus alerts:
- alert: IcebergCompactionBacklog
expr: iceberg_small_files_count > 100000
for: 30m
annotations:
summary: "{{ $labels.table }} has {{ $value }} small files"
- alert: FlinkCheckpointFailing
expr: rate(flink_jobmanager_checkpoint_failed[5m]) > 0
for: 5m
annotations:
summary: "Flink checkpoint failing for {{ $labels.job }}"
- alert: IcebergSnapshotsTooMany
expr: iceberg_snapshot_count > 1000
annotations:
summary: "Need to expire snapshots"Disaster Recovery
RPO/RTO:
- S3 cross-region replication (RPO ~15 min)
- Catalog backup hourly (Glue → S3 backup bucket)
- RTO: ~30 min for catalog restore + cluster spin-up
Time travel as backup:
-- Bad ETL deployed at 10:00, found at 10:30
-- Restore table to 09:55 snapshot
CALL system.rollback_to_timestamp('events', TIMESTAMP '2026-05-01 09:55:00');

Code Implementation
Spark + Iceberg Read/Write
"""
Spark + Iceberg quickstart
Run inside the Spark container, e.g.: docker exec -it spark pyspark
"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from datetime import datetime
spark = SparkSession.builder \
.appName("LakehouseDemo") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.lake", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.lake.type", "rest") \
.config("spark.sql.catalog.lake.uri", "http://rest-catalog:8181") \
.getOrCreate()
# Create table
spark.sql("""
CREATE TABLE IF NOT EXISTS lake.demo.orders (
id STRING,
customer_id STRING,
total INT,
status STRING,
created_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(created_at))
TBLPROPERTIES (
'format-version' = '2',
'write.delete.mode' = 'merge-on-read'
)
""")
# Insert
spark.sql("""
INSERT INTO lake.demo.orders VALUES
('ord-1', 'cust-1', 500000, 'completed', TIMESTAMP '2026-05-01 10:00:00'),
('ord-2', 'cust-2', 300000, 'pending', TIMESTAMP '2026-05-01 10:05:00')
""")
# Query
spark.sql("SELECT * FROM lake.demo.orders").show()
# Update (merge-on-read)
spark.sql("""
UPDATE lake.demo.orders
SET status = 'completed'
WHERE id = 'ord-2'
""")
# Time travel
snapshot_id = spark.sql("SELECT snapshot_id FROM lake.demo.orders.snapshots ORDER BY committed_at LIMIT 1").first()[0]
spark.sql(f"SELECT * FROM lake.demo.orders VERSION AS OF {snapshot_id}").show()
# Schema evolution
spark.sql("ALTER TABLE lake.demo.orders ADD COLUMN payment_method STRING")
# Compaction
spark.sql("""
CALL lake.system.rewrite_data_files(
table => 'demo.orders',
options => map('target-file-size-bytes', '536870912')
)
""")Flink Streaming Insert
// Flink Iceberg streaming sink
public class StreamToIceberg {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);
env.getCheckpointConfig().setCheckpointStorage("s3://bucket/checkpoints");
// Kafka source
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("events.Order")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(source,
WatermarkStrategy.noWatermarks(), "Kafka");
// Parse JSON to Iceberg row
DataStream<RowData> rows = stream.map(json -> parseToRow(json));
// Iceberg sink
TableLoader tableLoader = TableLoader.fromCatalog(
CatalogLoader.rest("rest", new Configuration(), Map.of(
"uri", "http://rest-catalog:8181",
"warehouse", "s3://warehouse/"
)),
TableIdentifier.of("demo", "orders_streaming")
);
FlinkSink.forRowData(rows)
.tableLoader(tableLoader)
.upsert(true)
.equalityFieldColumns(Arrays.asList("id"))
.writeParallelism(4)
.append();
env.execute("Stream Orders to Iceberg");
}
}

Trino Query
-- Trino query Iceberg
USE iceberg.demo;
-- Show tables
SHOW TABLES;
-- Aggregation
SELECT
DATE_TRUNC('day', created_at) AS day,
COUNT(*) AS order_count,
SUM(total) AS total_revenue
FROM orders
WHERE created_at > DATE '2026-01-01'
GROUP BY 1
ORDER BY 1 DESC;
-- Time travel
SELECT * FROM orders FOR TIMESTAMP AS OF TIMESTAMP '2026-04-01 00:00:00';
-- Snapshot history
SELECT * FROM "orders$snapshots" ORDER BY committed_at DESC LIMIT 10;
-- File statistics
SELECT
file_path,
file_size_in_bytes,
record_count
FROM "orders$files"
ORDER BY file_size_in_bytes DESC
LIMIT 20;

System Design Diagrams
Bronze-Silver-Gold Medallion
flowchart LR
  subgraph Bronze["Bronze Layer"]
    B1["raw_orders<br/>append-only<br/>full CDC payload"]
    B2["raw_clickstream<br/>append-only<br/>schema-less"]
  end
  subgraph Silver["Silver Layer"]
    S1["orders<br/>deduplicated<br/>typed schema<br/>validated"]
    S2["sessions<br/>sessionized<br/>enriched with user"]
  end
  subgraph Gold["Gold Layer"]
    G1["daily_revenue<br/>aggregated by day"]
    G2["user_features<br/>ML training set"]
    G3["funnel_metrics<br/>BI dashboards"]
  end
  B1 -->|"clean + dedupe"| S1
  B2 -->|"sessionize"| S2
  S1 -->|"daily rollup"| G1
  S1 -->|"feature engineering"| G2
  S1 --> G3
  S2 --> G2
  S2 --> G3
  style Bronze fill:#cd7f32,color:#fff
  style Silver fill:#c0c0c0,color:#000
  style Gold fill:#ffd700,color:#000
Iceberg Snapshot Tree
flowchart TD
  Meta[("Table metadata<br/>v3.metadata.json")]
  Meta --> Snap1[("Snapshot 100<br/>commit: 2026-01-01")]
  Meta --> Snap2[("Snapshot 101<br/>commit: 2026-04-01")]
  Meta --> Snap3[("Snapshot 102<br/>commit: 2026-05-01<br/>**current**")]
  Snap1 --> M1[("Manifest list<br/>snap-100.avro")]
  Snap2 --> M2[("Manifest list<br/>snap-101.avro")]
  Snap3 --> M3[("Manifest list<br/>snap-102.avro")]
  M3 --> MF1[("Manifest file<br/>file-1-m0.avro")]
  M3 --> MF2[("Manifest file<br/>file-2-m1.avro")]
  MF1 --> D1[("data/2026-05-01/<br/>file-001.parquet")]
  MF1 --> D2[("data/2026-05-01/<br/>file-002.parquet")]
  MF2 --> D3[("data/2026-05-02/<br/>file-003.parquet")]
  style Snap3 fill:#4caf50,color:#fff
Streaming + Batch Unified
sequenceDiagram
  participant K as Kafka
  participant F as Flink
  participant Ice as Iceberg
  participant S as Spark Batch
  participant T as Trino
  Note over F: Streaming ingest
  K->>F: Event 1
  K->>F: Event 2
  K->>F: Event 3
  Note over F: Checkpoint at 60s
  F->>Ice: Write data files
  F->>Ice: Commit snapshot N
  Note over S: Hourly batch ETL (read snapshot N)
  S->>Ice: SELECT * FROM bronze<br/>(snapshot N)
  S->>Ice: WRITE TO silver<br/>(commit snapshot N+1)
  Note over T: Ad-hoc query (read latest snapshot N+1)
  T->>Ice: SELECT * FROM silver
  Ice-->>T: Data from snapshot N+1
Aha Moments & Pitfalls
Aha Moments
#1: Iceberg/Delta/Hudi are not databases — they are a metadata layer on top of S3. The data files are still plain Parquet. The magic is that Iceberg tracks ACID + snapshots + statistics through manifest files. That is also why you can convert from Iceberg to Delta (rebuild only the metadata, never move the data).
#2: Open table formats = the end of vendor lock-in. The same dataset can be queried with Spark, Flink, Trino, DuckDB, Snowflake (Iceberg external tables) — no conversion ETL.
#3: Schema evolution is free. Iceberg tracks columns by field ID (an integer), not by name. Renaming a column = updating a mapping, not rewriting TBs of data.
#4: Hidden partitioning > Hive partitioning. A user filters event_time = '2026-05-01' → Iceberg auto-prunes partitions; nobody needs to know the partitioning scheme.
#5: Time travel is free disaster recovery. Bad ETL deploy → roll the table back to the snapshot from an hour ago, zero data loss.
#6: Lambda architecture is the past. Kappa with Flink + Iceberg delivers the same results with half the codebase and operational overhead.
#7: The Bronze-Silver-Gold pattern is a major simplification. Each layer has a clear quality tier — easy to debug, easy to roll back.
#8: The Lakehouse is most cost-effective beyond ~100 TB. Below that, managed Snowflake/BigQuery can be cheaper (no dedicated data engineers needed).
Pitfalls
Pitfall 1: Forgetting to compact small files
Wrong: streaming ingest every 60s → 1 file per checkpoint → 1,440 files/day → 525K files/year → slow queries. Right: a daily compaction job; target file size 256 MB-1 GB.
Pitfall 2: Not expiring snapshots
Wrong: every commit creates a snapshot. After a year you have 100K+ snapshots → huge metadata files → slow queries. Right: expire snapshots older than 7-30 days.
Pitfall 3: Partitioning by user_id (high cardinality)
Wrong: 100M users → 100M partitions → metadata explosion. Right: partition by date, or bucket(N, user_id) with a sensible N (e.g., 1024).
Pitfall 4: Putting Postgres and the Lakehouse under the same SLA
Wrong: believing a Lakehouse can replace OLTP. Iceberg latency is seconds at best. Right: OLTP → Postgres/Cassandra. The Lakehouse is for analytics + ML, not a primary store.
Pitfall 5: GDPR deletes that never rewrite files
Wrong: DELETE in merge-on-read mode only writes delete markers → the original files still contain the PII. Right: periodically run rewrite_data_files() to materialize deletes + prove the PII is gone.
Pitfall 6: Not testing multi-engine compatibility
Wrong: Spark writes + Trino reads → version mismatch → errors. Right: test the integration matrix. Mind Iceberg spec versioning (v1 vs v2). Stick to recent stable versions.
Pitfall 7: Not monitoring catalog health
Wrong: the Glue catalog quota is 100K tables; the cluster grows to 200K → every commit fails. Right: monitor catalog metrics. Plan for a REST catalog (Polaris) at large scale.
Pitfall 8: Streaming + batch write conflicts
Wrong: Flink streaming writes + Spark batch writes to the same table → a conflict on every commit. Right: separate tables (Flink → bronze_streaming, Spark → bronze_batch), or use Hudi MoR for UPSERT-heavy workloads.
Pitfall 9: Underestimating metadata cost
Wrong: 1B small files / 100K manifests → a 100 MB manifest list → every query reads 100 MB of metadata first. Right: compact + rewrite manifests periodically. Target < 10K files per partition.
Pitfall 10: Not managing S3 cost
Wrong: Glacier IR for cold + S3 Standard for warm, but no lifecycle policy. Right: S3 Intelligent-Tiering for auto-tiering, or an explicit lifecycle — warm → IA after 30 days → Glacier after 90 days (see the sketch below).
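A sketch of that explicit lifecycle policy via boto3 (bucket name and prefix are illustrative):

```python
# Tier warm data down automatically: IA after 30 days, Glacier after 90.
import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="my-lakehouse-bucket",
    LifecycleConfiguration={
        "Rules": [{
            "ID": "warm-to-cold",
            "Status": "Enabled",
            "Filter": {"Prefix": "warehouse/"},
            "Transitions": [
                {"Days": 30, "StorageClass": "STANDARD_IA"},
                {"Days": 90, "StorageClass": "GLACIER"},
            ],
        }]
    },
)
```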
Internal Links
| Topic | Relationship |
|---|---|
| Tuan-08-Message-Queue | Kafka is the source for streaming ingestion |
| Tuan-Bonus-Outbox-Pattern | Outbox + Debezium → Iceberg Bronze layer |
| Tuan-Bonus-Consistency-Models-Isolation | Iceberg uses snapshot isolation; ACID on S3 |
| Case-Design-Ad-Click-Event-Aggregation | Kappa architecture with Flink → can migrate to an Iceberg sink |
| Case-Design-Metrics-Monitoring-Alerting | Specialized time-series stores vs the Lakehouse |
| Tuan-13-Monitoring-Observability | Monitoring the lakehouse: file count, snapshot age, compaction backlog |
| Tuan-15-Data-Security-Encryption | S3 SSE-KMS, column-level encryption |
References
Books:
- Streaming Systems (Akidau, Chernyak, Lax — O’Reilly 2018) — https://learning.oreilly.com/library/view/streaming-systems/9781491983867/
- DDIA Ch.10 (Batch) + Ch.11 (Stream) — Kleppmann
Papers:
- Lakehouse: A New Generation of Open Platforms (Databricks 2021) — http://www.cidrdb.org/cidr2021/papers/cidr2021_paper17.pdf
- Apache Iceberg: An Architectural Look Under the Covers — https://www.dremio.com/resources/guides/apache-iceberg-an-architectural-look-under-the-covers/
- The Dataflow Model (Google 2015) — https://research.google/pubs/the-dataflow-model-a-practical-approach-to-balancing-correctness-latency-and-cost-in-massive-scale-unbounded-out-of-order-data-processing/
Engineering blogs:
- Netflix: Iceberg at Netflix — https://netflixtechblog.com/iceberg-at-netflix-c6ddb3e3d68e
- Apple: Apache Iceberg adoption — https://iceberg.apache.org/blog/
- Apache: Hudi vs Iceberg vs Delta — https://www.onehouse.ai/blog/apache-hudi-vs-delta-lake-vs-apache-iceberg-lakehouse-feature-comparison
- Jay Kreps: Questioning the Lambda Architecture — https://www.oreilly.com/radar/questioning-the-lambda-architecture/
Docs:
- Apache Iceberg — https://iceberg.apache.org/docs/
- Delta Lake — https://docs.delta.io/
- Apache Hudi — https://hudi.apache.org/docs/
- Apache Polaris (Iceberg REST catalog) — https://github.com/apache/polaris
- AWS Glue Iceberg integration — https://docs.aws.amazon.com/glue/latest/dg/iceberg-tables.html
Tools:
- Trino — https://trino.io/
- Apache Flink — https://flink.apache.org/
- Apache Spark — https://spark.apache.org/
- DuckDB Iceberg extension — https://duckdb.org/docs/extensions/iceberg.html
- Feast Feature Store — https://feast.dev/
Roadmap reference: bonus case study after finishing Phase 4 and the 18 case studies from Alex Xu Vol 1+2.