Case Study 03: Data Layer for Realtime Analytics
“Design analytics platform với 100K events/sec ingest, sub-second dashboard query, 5-year retention. Kafka + ClickHouse + Postgres.”
Tags: case-study analytics clickhouse kafka realtime
Related: Tuan-14-OLAP-Columnar-ClickHouse · Tuan-Bonus-CDC-Debezium
1. Requirements
Functional
- Ingest events from web/mobile apps
- Per-event: user_id, session_id, event_type, properties (jsonb), timestamp
- Real-time dashboard: metrics last 5 min, 1 hour, 1 day
- Custom queries (analyst SQL)
- Funnels, cohorts, retention
- Export to CSV/Parquet
Non-functional
- 100K events/sec sustained, 500K peak
- Dashboard P99 < 1s for last hour
- Custom query P99 < 30s for last month
- 5-year retention
- Cost-effective at PB scale
2. Architecture
graph LR
  Web[Web/Mobile clients] --> Collector[Event collector<br/>HTTP edge]
  Collector --> Kafka[(Kafka<br/>topic per event_type)]
  Kafka --> Enricher[Stream enricher<br/>Flink]
  Enricher --> CH[(ClickHouse<br/>hot data)]
  Kafka -.archive.-> S3[(S3 Iceberg<br/>cold)]
  CH -.materialized.-> CHHourly[(Hourly aggs)]
  CHHourly -.materialized.-> CHDaily[(Daily aggs)]
  BI[Grafana/Metabase] --> CH
  BI --> CHHourly
  BI --> CHDaily
  Lake[(S3 Iceberg<br/>cold storage 5y)] -.federated.-> Trino[Trino] --> BI
  PG[(Postgres<br/>user/session metadata)] -.dictionary.-> CH
  style Kafka fill:#fff9c4
  style CH fill:#c8e6c9
2.1 Why this combo
- Kafka: durable ingest buffer, decouples producer from consumer
- Flink (or kafka-clickhouse direct): enrichment, dedup, late-data handling
- ClickHouse: hot data, sub-second dashboards
- Iceberg/S3: cold cheap storage, query via Trino for old data
- Postgres: dimension data (users, sessions, products)
3. Event Schema
{
"event_id": "uuid-v7",
"event_type": "page_view",
"user_id": 42,
"session_id": "...",
"occurred_at": "2026-05-16T10:00:00.123Z",
"received_at": "2026-05-16T10:00:00.456Z",
"properties": {
"url": "/products/iphone",
"referrer": "/search",
"user_agent": "...",
"country": "VN",
"city": "HCMC"
},
"context": {
"app_version": "1.2.3",
"ip_hash": "..."
}
}
Key fields are top-level (filterable, indexed); everything else goes in the properties JSON.
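The split between indexed top-level keys and the free-form properties bag can be enforced at the edge. The following is a minimal sketch, assuming the required field names from the example event above; `REQUIRED_FIELDS` and `split_event` are hypothetical names, not part of any existing collector.

```python
# Hypothetical helper: separates indexed top-level fields from the free-form rest.
REQUIRED_FIELDS = ("event_type", "user_id", "session_id", "occurred_at")


def split_event(raw: dict) -> tuple:
    """Return (top_level, properties); raise ValueError on a missing required field."""
    missing = [f for f in REQUIRED_FIELDS if f not in raw]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    top_level = {f: raw[f] for f in REQUIRED_FIELDS}
    # Whatever the client put under "properties" stays an opaque JSON object.
    properties = dict(raw.get("properties", {}))
    return top_level, properties
```

Rejecting malformed events here keeps them out of Kafka; whether to reject or route them to events_dlq is a design choice this sketch does not settle.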
4. Ingestion
4.1 Collector
Lightweight HTTP receiver:
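from datetime import datetime, timezone

@app.post("/v1/events")
async def ingest(events: list[Event]):
    for event in events:
        # Keep a client-supplied event_id so retried batches can be deduplicated downstream;
        # uuid7() is assumed to come from a UUIDv7 library.
        if not event.event_id:
            event.event_id = str(uuid7())
        event.received_at = datetime.now(timezone.utc)
        await kafka_producer.send("events_raw", event.json())
    return {"accepted": len(events)}
Kafka producer with batching: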
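- linger.ms=10: batch for up to 10 ms
- compression.type=zstd
- acks=1 (leader ack only; favors speed over durability)
At 100K events/s, 10-20 collector replicas means 5-10K events/s per replica, which a lightweight receiver handles comfortably when clients batch events per request.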
4.2 Topics structure
events_raw (all events, partitioned by user_id hash)
events_page_view (filtered)
events_purchase
events_signup
...
events_dlq (failed messages)
Sharded by event type for consumer parallelism.
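Partitioning events_raw by a hash of user_id keeps each user's events on one partition, which preserves per-user ordering. A minimal sketch of the idea follows; `partition_for` is a hypothetical helper, and crc32 is only a stand-in, since Kafka's default partitioner applies murmur2 to the key bytes.

```python
import zlib


def partition_for(user_id: int, num_partitions: int) -> int:
    """Map a user_id to a partition; the same user always lands on the same partition."""
    # crc32 is a stand-in hash for illustration, not what Kafka itself uses.
    key = str(user_id).encode("utf-8")
    return zlib.crc32(key) % num_partitions
```

In practice the producer only needs to send the user_id as the message key and let the client library pick the partition.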
4.3 Schema evolution
Schema Registry (Confluent or Apicurio). Avro/Protobuf format. Backward-compatible changes only.
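syntax = "proto3";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

message Event {
  string event_id = 1;
  string event_type = 2;
  int64 user_id = 3;
  google.protobuf.Timestamp occurred_at = 4;
  map<string, google.protobuf.Value> properties = 5; // flexible
}
5. Stream Processing (Flink)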
5.1 Enrichment
DataStream<Event> events = env.fromSource(kafka, ...);
DataStream<EnrichedEvent> enriched = events
.keyBy(Event::userId)
.process(new EnrichmentFunction());
class EnrichmentFunction extends KeyedProcessFunction<...> {
private ValueState<User> userState;
public void processElement(Event e, ...) {
User user = userState.value();
if (user == null) {
user = lookupUser(e.userId()); // Postgres / cache
userState.update(user);
}
out.collect(new EnrichedEvent(e, user.country(), user.tier()));
}
}
5.2 Dedup
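Each event carries a UUID v7 event_id. Keep a Bloom filter of the IDs seen in the last 1h in Flink state and drop any event whose ID is already present. A Bloom filter can report false positives, so a small fraction of genuine events will be dropped as duplicates.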
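To make the dedup step concrete, here is a minimal Bloom filter sketch in plain Python rather than Flink state. The class, its sizing defaults, and `is_duplicate` are illustrative assumptions; the 1h window rotation is left out.

```python
import hashlib


class BloomFilter:
    """Fixed-size Bloom filter: no false negatives, a tunable false-positive rate."""

    def __init__(self, num_bits: int = 1 << 20, num_hashes: int = 4):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8 + 1)

    def _positions(self, key: str):
        # Derive k bit positions by salting one hash function with the index.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, key: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(key))


def is_duplicate(seen: BloomFilter, event_id: str) -> bool:
    """Return True if event_id was (probably) seen before; record it otherwise."""
    if event_id in seen:
        return True
    seen.add(event_id)
    return False
```

One common way to bound the window is to keep two filters and swap them every hour, so an ID is remembered for between one and two hours.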
5.3 Late data
Watermark: the highest occurred_at seen so far minus 30s. Events with occurred_at older than the watermark are late.
Drop them or send them to a side output for separate processing.
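The watermark rule can be sketched outside Flink as a small tracker. This is an illustration under the assumption of a single stream and a fixed 30s allowance; `WatermarkTracker` is a hypothetical name, and Flink's own watermark strategies handle idleness and parallel sources, which this does not.

```python
from datetime import datetime, timedelta
from typing import Optional


class WatermarkTracker:
    """Bounded out-of-orderness: watermark = max event time seen minus an allowed delay."""

    def __init__(self, max_delay: timedelta = timedelta(seconds=30)):
        self.max_delay = max_delay
        self.max_event_time: Optional[datetime] = None

    def observe(self, occurred_at: datetime) -> bool:
        """Return True if the event is behind the watermark, i.e. late."""
        late = (
            self.max_event_time is not None
            and occurred_at < self.max_event_time - self.max_delay
        )
        # The watermark only moves forward, so track the maximum event time.
        if self.max_event_time is None or occurred_at > self.max_event_time:
            self.max_event_time = occurred_at
        return late
```

A caller would route events for which `observe` returns True to the side output and pass the rest on to the sink.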
5.4 Output
Sink: ClickHouse via JDBC or HTTP.
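enriched.addSink(new ClickHouseSink(...));
Flush a batch every 1000 events or every 1s, whichever comes first. At 100K events/s the 1000-event limit always triggers first, which means roughly 100 inserts/s in total; ClickHouse prefers fewer, larger inserts, so a larger batch size is worth testing.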
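The size-or-time flush rule can be sketched as a small buffer. `MicroBatcher` and its parameters are hypothetical; the `flush` callback stands in for the actual ClickHouse insert, and the clock is injectable only to make the behavior easy to test.

```python
import time
from typing import Callable, List


class MicroBatcher:
    """Buffers rows and flushes when either the size or the age limit is reached."""

    def __init__(self, flush: Callable[[List[dict]], None], max_rows: int = 1000,
                 max_age_s: float = 1.0, clock: Callable[[], float] = time.monotonic):
        self.flush_fn = flush
        self.max_rows = max_rows
        self.max_age_s = max_age_s
        self.clock = clock
        self.rows: List[dict] = []
        self.first_row_at = 0.0

    def add(self, row: dict) -> None:
        if not self.rows:
            self.first_row_at = self.clock()  # age is measured from the first buffered row
        self.rows.append(row)
        too_big = len(self.rows) >= self.max_rows
        too_old = self.clock() - self.first_row_at >= self.max_age_s
        if too_big or too_old:
            self.flush()

    def flush(self) -> None:
        if self.rows:
            batch, self.rows = self.rows, []
            self.flush_fn(batch)
```

Note that the age check here only runs when a new row arrives; a production sink would also need a timer so a quiet stream still flushes.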
6. ClickHouse Schema
6.1 Raw events
CREATE TABLE events (
event_id UUID,
event_type LowCardinality(String),
occurred_at DateTime64(3),
received_at DateTime64(3),
user_id UInt64,
session_id String,
user_country LowCardinality(String),
user_tier LowCardinality(String),
properties String, -- JSON
INDEX bloom_user user_id TYPE bloom_filter GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(occurred_at)
ORDER BY (event_type, occurred_at, user_id)
TTL occurred_at + INTERVAL 90 DAY;
- Partition by month → easy drop old data
- ORDER BY (event_type, ts, user_id) → filter by type then time
- TTL 90 days for raw events (older → cold storage)
- Bloom filter for user_id quick lookups
6.2 Hourly aggregates
CREATE MATERIALIZED VIEW events_hourly
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, event_type, user_country)
AS
SELECT
toStartOfHour(occurred_at) AS hour,
event_type,
user_country,
count() AS event_count,
uniqState(user_id) AS unique_users,
uniqState(session_id) AS unique_sessions
FROM events
GROUP BY hour, event_type, user_country;
Dashboard query:
SELECT
event_type,
sum(event_count) AS total,
uniqMerge(unique_users) AS unique_users
FROM events_hourly
WHERE hour > now() - INTERVAL 24 HOUR
GROUP BY event_type
ORDER BY total DESC;
Sub-second on aggregated table.
6.3 Daily aggregates
Roll up from hourly to daily:
CREATE MATERIALIZED VIEW events_daily
ENGINE = SummingMergeTree() ORDER BY (day, event_type) AS
SELECT toDate(hour) AS day, event_type, sum(event_count) AS total
FROM events_hourly GROUP BY day, event_type;
5 years × 365 days × ~100 event types = ~180K rows. Fits in RAM.
6.4 User dictionary
CREATE DICTIONARY users_dict (
id UInt64,
country String,
tier String,
signup_date Date
)
PRIMARY KEY id
SOURCE(POSTGRESQL(host 'pg' port 5432 user 'reader' password 'x' db 'app' table 'users'))
LAYOUT(HASHED())
LIFETIME(MIN 60 MAX 120);
Fast joins via dictionary lookup:
SELECT
dictGet('users_dict', 'country', user_id) AS country,
count() FROM events GROUP BY country;
No JOIN: each lookup is an in-memory hash lookup.
7. Cold Storage (Iceberg)
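After 90 days, move from ClickHouse to Iceberg. The export must complete before the 90-day TTL on the events table deletes the rows, so in practice the TTL needs some slack beyond the archive cutoff.
# Daily job. Sketch: `clickhouse` is assumed to be a clickhouse_driver Client, and the
# export uses ClickHouse's s3 table function in place of the original DuckDB pseudocode.
# Idempotency (skipping months that are already archived) is left out.
from datetime import date, timedelta

def archive(clickhouse, today=None):
    cutoff = (today or date.today()) - timedelta(days=90)
    # Newest month that lies entirely before the cutoff, in toYYYYMM form
    month = (cutoff.replace(day=1) - timedelta(days=1)).strftime("%Y%m")
    # Export from CH (bucket URL is a placeholder; credentials come from server config)
    clickhouse.execute(
        f"INSERT INTO FUNCTION s3('https://lakehouse.s3.amazonaws.com/events/{month}.parquet', 'Parquet') "
        f"SELECT * FROM events WHERE toYYYYMM(occurred_at) = {month}"
    )
    # ...register with Iceberg catalog
    # Drop partition in CH once the export is verified
    clickhouse.execute(f"ALTER TABLE events DROP PARTITION {month}")
Now the data is in Iceberg, queryable via Trino, DuckDB, BigQuery.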
5-year retention:
- Hot in ClickHouse: 90 days
- Cold in Iceberg/S3: 5 years
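- 100K events/s × 90 days ≈ 7.8 × 10^11 events; at the ~100 bytes/event raw that the cold figure below implies, ≈ 78TB raw hot (~4TB at the same 0.05 compression)
- × 5 years ≈ 1.6 × 10^13 events ≈ 1.6PB raw; × 0.05 (compression) = 80TB cold (~$2K/month storage)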
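The retention arithmetic is easy to check directly. The sketch below counts events for each tier, backs out the compressed bytes per event that an 80TB cold tier implies, and applies the same footprint to the 90-day hot window. The constants come from the requirements; the per-event footprint is derived, not measured, and should be validated against real data.

```python
EVENTS_PER_SEC = 100_000
SECONDS_PER_DAY = 86_400


def events_in(days: int) -> int:
    """Number of events ingested over the given number of days at the sustained rate."""
    return EVENTS_PER_SEC * SECONDS_PER_DAY * days


def bytes_per_event(total_bytes: float, days: int) -> float:
    """Compressed bytes per event implied by a storage estimate."""
    return total_bytes / events_in(days)


hot_events = events_in(90)                 # 777_600_000_000
cold_events = events_in(5 * 365)           # 15_768_000_000_000
implied = bytes_per_event(80e12, 5 * 365)  # footprint the 80TB cold estimate assumes
hot_tb = hot_events * implied / 1e12       # hot tier at the same per-event footprint
```

Roughly 5 compressed bytes per event is optimistic for events that carry a JSON properties blob, so these totals are best read as a lower bound.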
8. Query Patterns
8.1 Real-time dashboard (last hour)
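-- From hourly aggregates
SELECT event_type, sum(event_count) FROM events_hourly
WHERE hour >= toStartOfHour(now() - INTERVAL 1 HOUR) GROUP BY event_type;
< 100ms. Because rows are hourly buckets, this covers the current partial hour plus the previous full hour.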
8.2 Funnel analysis
Last 7 days, percentage of users who completed: signup → view_product → purchase.
WITH funnel AS (
SELECT
user_id,
max(if(event_type='signup', 1, 0)) AS s1,
max(if(event_type='view_product', 1, 0)) AS s2,
max(if(event_type='purchase', 1, 0)) AS s3
FROM events
WHERE occurred_at >= now() - INTERVAL 7 DAY
GROUP BY user_id
)
-- Count a step only for users who also completed the earlier steps
SELECT sum(s1) AS step1, sum(s1 AND s2) AS step2, sum(s1 AND s2 AND s3) AS step3,
round(100.0 * step2 / nullif(step1, 0), 1) AS s1_to_s2_pct,
round(100.0 * step3 / nullif(step2, 0), 1) AS s2_to_s3_pct
FROM funnel;
Few seconds. Materialize as scheduled view if frequent.
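The query above checks that each step occurred inside the window, not that the steps happened in sequence. An ordered funnel can be sketched in a few lines of Python; `funnel_counts` and the tuple layout are assumptions for illustration. ClickHouse also ships a windowFunnel aggregate function for this purpose.

```python
from collections import defaultdict
from typing import Iterable, List, Tuple


def funnel_counts(events: Iterable[Tuple[int, str, float]], steps: List[str]) -> List[int]:
    """Count users reaching each step in order. events: (user_id, event_type, timestamp)."""
    progress = defaultdict(int)  # user_id -> number of steps completed so far
    for user_id, event_type, _ts in sorted(events, key=lambda e: e[2]):
        done = progress[user_id]
        # Only the next expected step advances a user; out-of-order events are ignored.
        if done < len(steps) and event_type == steps[done]:
            progress[user_id] = done + 1
    return [sum(1 for d in progress.values() if d > i) for i in range(len(steps))]
```

With this definition a user who purchases without a prior view_product counts toward signup only, so step-to-step conversion can never exceed 100%.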
8.3 Retention cohort
Users who signed up week N, how many active in week N+k?
WITH cohorts AS (
-- One row per user: the week of their first signup event
SELECT user_id, toMonday(min(occurred_at)) AS signup_week
FROM events WHERE event_type = 'signup'
GROUP BY user_id
),
active AS (
SELECT user_id, toMonday(occurred_at) AS active_week
FROM events
WHERE event_type IN ('login', 'view_product') AND occurred_at > now() - INTERVAL 90 DAY
GROUP BY user_id, active_week
)
SELECT
c.signup_week,
a.active_week,
dateDiff('week', c.signup_week, a.active_week) AS weeks_since_signup,
count(DISTINCT c.user_id) AS active_users
FROM cohorts c JOIN active a ON c.user_id = a.user_id
WHERE a.active_week >= c.signup_week
GROUP BY c.signup_week, a.active_week
ORDER BY c.signup_week, weeks_since_signup;
Run nightly, cache result.
8.4 Custom analyst query (cold data)
-- Via Trino on Iceberg
SELECT count(*) FROM iceberg.events.events
-- Half-open range so all of January 31 is included
WHERE occurred_at >= TIMESTAMP '2024-01-01 00:00:00' AND occurred_at < TIMESTAMP '2024-02-01 00:00:00'
AND properties['country'] = 'VN';
Slower (10-60s) but acceptable for analyst.
9. Cardinality Management
Cardinality issue: too many unique values explode storage/memory.
Watch:
- user_id (high cardinality is OK with the bloom filter)
- url (path): millions of unique values
- IP addresses: billions
Strategies:
- Hash high-cardinality strings
- Group rare values into “other”
- Use HyperLogLog for distinct counts (approximate)
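The second strategy, grouping rare values into an "other" bucket, can be sketched as follows. `bucket_rare` and the `min_count` threshold are hypothetical; in a streaming setting the counts would come from a periodically refreshed top-N list rather than from the batch itself.

```python
from collections import Counter
from typing import Iterable, List


def bucket_rare(values: Iterable[str], min_count: int, other: str = "other") -> List[str]:
    """Replace values seen fewer than min_count times with a single bucket label."""
    values = list(values)
    counts = Counter(values)
    return [v if counts[v] >= min_count else other for v in values]
```

For example, with a threshold of 2, a URL seen once in the batch is stored as "other" while frequently hit paths keep their own value.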
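-- properties is a JSON String column, so fields are read with JSONExtractString
-- Count distinct URLs (exact, expensive)
SELECT count(DISTINCT JSONExtractString(properties, 'url')) FROM events;
-- HLL approximation (fast)
SELECT uniqHLL12(JSONExtractString(properties, 'url')) FROM events;
10. Cost Analysis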
10.1 Compute
- Kafka cluster: 6 brokers × c6i.2xlarge = ~$2000/month
- Flink: 3 nodes × c6i.4xlarge = ~$1500/month
- ClickHouse: 3 nodes × r6i.4xlarge = ~$2500/month
- Postgres: 1 master + 2 replica r6i.large = ~$700/month
- Collector: 10 × m6i.large = ~$600/month
- Total compute: ~$7300/month
10.2 Storage
- ClickHouse disk: 3 × 2TB EBS = ~$500/month
- S3 (Iceberg cold): 80TB → ~$2000/month (Standard-IA)
- Kafka retention 7 days: 2TB → ~$200/month
- Total storage: ~$2700/month
10.3 Total
~$10K/month for 100K events/s system. Optimize over time.
Comparison: Segment + Snowflake equivalent: $50-100K/month at this scale.
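The totals in 10.1 to 10.3 can be reproduced, and turned into a unit cost, with a few lines of arithmetic. The figures are the estimates listed above; the Kafka retention value of 200 is inferred from the stated storage total, and a 30-day month is an assumption.

```python
# Monthly estimates in USD, taken from sections 10.1 and 10.2.
COMPUTE = {"kafka": 2000, "flink": 1500, "clickhouse": 2500, "postgres": 700, "collector": 600}
STORAGE = {"clickhouse_ebs": 500, "s3_cold": 2000, "kafka_retention": 200}  # 200 is inferred

compute_total = sum(COMPUTE.values())
storage_total = sum(STORAGE.values())
monthly_total = compute_total + storage_total

# Unit cost at the sustained ingest rate, assuming a 30-day month.
EVENTS_PER_MONTH = 100_000 * 86_400 * 30
cost_per_million_events = monthly_total / (EVENTS_PER_MONTH / 1_000_000)
```

At these estimates the platform costs a few cents per million events, which is a more portable number than the monthly total when comparing against usage-priced alternatives.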
11. Operational Concerns
11.1 Kafka lag
kafka-consumer-groups --bootstrap-server <broker:9092> --describe --group flink-enricher
# LAG > 10K → scale Flink
Alert if consumer lag > 1 minute of events (about 6M messages at 100K events/s).
11.2 ClickHouse parts count
Too many small parts → slow query. Monitor:
SELECT table, count() AS active_parts FROM system.parts WHERE database='default' AND active=1 GROUP BY table;
If > 1000 per table → OPTIMIZE TABLE.
11.3 Late data handling
Events arriving with occurred_at way in past:
- < 1h late: insert normally
- 1-24h late: insert but trigger re-aggregation of affected hours
- > 24h late: drop or side-output
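The three lateness tiers map naturally onto a small routing function. This is a sketch, assuming lateness is measured as received_at minus occurred_at; `late_action` and the returned labels are hypothetical names.

```python
from datetime import datetime, timedelta


def late_action(occurred_at: datetime, received_at: datetime) -> str:
    """Map an event's lateness to 'insert', 'insert_and_reaggregate', or 'side_output'."""
    lateness = received_at - occurred_at
    if lateness < timedelta(hours=1):
        return "insert"
    if lateness <= timedelta(hours=24):
        # The affected hourly bucket has to be recomputed after the insert.
        return "insert_and_reaggregate"
    return "side_output"
```

Client clocks can be wrong, so a real pipeline would probably also cap or reject events whose occurred_at lies in the future.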
11.4 Schema changes
New event type: just add to topic. Auto-discovered.
New property: update schema, but ClickHouse properties is JSON string → handles any.
12. Migration / Rebuild
ClickHouse table rebuild (e.g., new ORDER BY):
- Create new table
- Backfill via INSERT … SELECT
- Switch app to write new table
- Drop old
Can take hours for huge table. Plan windows.
13. Privacy
- IP hashed (not stored raw)
- PII in properties flagged, encrypted column-level if needed
- Per-user deletion (GDPR): mark + scheduled cleanup
- Anonymization for cold storage
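For the IP hashing item, a plain unsalted hash is weak because the IPv4 space is small enough to enumerate. A minimal sketch using a keyed hash follows; `hash_ip` and the key handling are assumptions, and how the key is stored and rotated is out of scope here.

```python
import hashlib
import hmac


def hash_ip(ip: str, secret_key: bytes) -> str:
    """Keyed hash of an IP address, so the raw IP never has to be stored."""
    # IPv4 has only about 4.3 billion values, so an unkeyed hash can be brute-forced;
    # HMAC-SHA256 with a secret key prevents that as long as the key stays private.
    return hmac.new(secret_key, ip.encode("utf-8"), hashlib.sha256).hexdigest()
```

Rotating the key on a schedule also limits how long the same visitor can be linked across events, at the cost of breaking long-range joins on ip_hash.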
14. Next Steps
Updated: 2026-05-16