Case Study 03: Data Layer for Realtime Analytics

“Design analytics platform với 100K events/sec ingest, sub-second dashboard query, 5-year retention. Kafka + ClickHouse + Postgres.”

Tags: case-study analytics clickhouse kafka realtime
Related: Tuan-14-OLAP-Columnar-ClickHouse · Tuan-Bonus-CDC-Debezium


1. Requirements

Functional

  • Ingest events from web/mobile apps
  • Per-event: user_id, session_id, event_type, properties (jsonb), timestamp
  • Real-time dashboard: metrics last 5 min, 1 hour, 1 day
  • Custom queries (analyst SQL)
  • Funnels, cohorts, retention
  • Export to CSV/Parquet

Non-functional

  • 100K events/sec sustained, 500K peak
  • Dashboard P99 < 1s for last hour
  • Custom query P99 < 30s for last month
  • 5-year retention
  • Cost-effective at PB scale

2. Architecture

graph LR
    Web[Web/Mobile clients] --> Collector[Event collector<br/>HTTP edge]
    Collector --> Kafka[(Kafka<br/>topic per event_type)]
    Kafka --> Enricher[Stream enricher<br/>Flink]
    Enricher --> CH[(ClickHouse<br/>hot data)]
    Kafka -.archive.-> S3[(S3 Iceberg<br/>cold)]

    CH -.materialized.-> CHHourly[(Hourly aggs)]
    CHHourly -.materialized.-> CHDaily[(Daily aggs)]

    BI[Grafana/Metabase] --> CH
    BI --> CHHourly
    BI --> CHDaily

    Lake[(S3 Iceberg<br/>cold storage 5y)] -.federated.-> Trino[Trino] --> BI

    PG[(Postgres<br/>user/session metadata)] -.dictionary.-> CH

    style Kafka fill:#fff9c4
    style CH fill:#c8e6c9

2.1 Why this combo

  • Kafka: durable ingest buffer, decouples producer from consumer
  • Flink (or kafka-clickhouse direct): enrichment, dedup, late-data handling
  • ClickHouse: hot data, sub-second dashboards
  • Iceberg/S3: cold cheap storage, query via Trino for old data
  • Postgres: dimension data (users, sessions, products)

3. Event Schema

{
    "event_id": "uuid-v7",
    "event_type": "page_view",
    "user_id": 42,
    "session_id": "...",
    "occurred_at": "2026-05-16T10:00:00.123Z",
    "received_at": "2026-05-16T10:00:00.456Z",
    "properties": {
        "url": "/products/iphone",
        "referrer": "/search",
        "user_agent": "...",
        "country": "VN",
        "city": "HCMC"
    },
    "context": {
        "app_version": "1.2.3",
        "ip_hash": "..."
    }
}

Key fields are top-level (filterable and indexable); everything else goes in the properties JSON.


4. Ingestion

4.1 Collector

Lightweight HTTP receiver:

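from datetime import datetime, timezone

@app.post("/v1/events")
async def ingest(events: list[Event]):
    for event in events:
        # Keep a client-supplied event_id so retried batches can be deduplicated (5.2).
        event.event_id = event.event_id or str(uuid7())  # uuid7: e.g. from the uuid6 package
        event.received_at = datetime.now(timezone.utc)
        # Key by user_id so a user's events land on one partition (4.2).
        await kafka_producer.send("events_raw", key=str(event.user_id).encode(), value=event.json().encode())
    return {"accepted": len(events)}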

Kafka producer with batching:

  • linger.ms=10 — batch up to 10ms
  • compression.type=zstd
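  • acks=1 (leader ack only: faster, but an acknowledged event can be lost if the leader fails before followers replicate it; use acks=all where loss is unacceptable)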

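100K events/s is easily handled by 10-20 collector replicas (5-10K events/s each). Because clients post events in batches, the HTTP request rate is lower than the event rate.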

4.2 Topics structure

events_raw                    (all events, partitioned by user_id hash)
events_page_view              (filtered)
events_purchase
events_signup
...
events_dlq                    (failed messages)

Sharded by event type for consumer parallelism.
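
To illustrate what "partitioned by user_id hash" buys: the same user always maps to the same partition, so that user's events keep their order. This is a minimal sketch, assuming 12 partitions and CRC32 as a stand-in hash; `NUM_PARTITIONS` and `partition_for` are hypothetical names, and real Kafka clients apply their own hash to the message key.

```python
import zlib

NUM_PARTITIONS = 12  # assumed partition count, not taken from the design above


def partition_for(user_id: int, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a user_id to a partition with a stable hash.

    CRC32 is only a stand-in here; Kafka clients hash the message key themselves.
    """
    key = str(user_id).encode("utf-8")
    return zlib.crc32(key) % num_partitions


if __name__ == "__main__":
    for uid in (42, 43, 42):
        # The two lines for user 42 always show the same partition.
        print(uid, "->", partition_for(uid))
```

A built-in `hash()` would not work for this purpose in Python, because string hashing is randomized per process.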

4.3 Schema evolution

Schema Registry (Confluent or Apicurio). Avro/Protobuf format. Backward-compatible changes only.

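syntax = "proto3";

import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

message Event {
    string event_id = 1;
    string event_type = 2;
    int64 user_id = 3;
    google.protobuf.Timestamp occurred_at = 4;
    map<string, google.protobuf.Value> properties = 5;  // flexible
}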

5. Stream Processing

5.1 Enrichment

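DataStream<Event> events = env.fromSource(kafka, ...);
DataStream<EnrichedEvent> enriched = events
    .keyBy(Event::userId)
    .process(new EnrichmentFunction());

class EnrichmentFunction extends KeyedProcessFunction<Long, Event, EnrichedEvent> {
    // Keyed state: one cached User per user_id.
    // Initialize it in open() with getRuntimeContext().getState(new ValueStateDescriptor<>("user", User.class)).
    private transient ValueState<User> userState;

    @Override
    public void processElement(Event e, Context ctx, Collector<EnrichedEvent> out) throws Exception {
        User user = userState.value();
        if (user == null) {
            user = lookupUser(e.userId());  // Postgres / cache
            userState.update(user);
        }
        out.collect(new EnrichedEvent(e, user.country(), user.tier()));
    }
}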

5.2 Dedup

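Dedup key: the UUID v7 event_id. Keep a Bloom filter in Flink state covering the last 1h window and drop an event if its event_id was already seen. A Bloom filter can return false positives, so a small fraction of unique events may be dropped; size it for an acceptable error rate.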
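
A minimal sketch of Bloom-filter dedup in plain Python, to make the "drop if seen" rule concrete. It is not Flink code: `BloomFilter` and `dedup` are hypothetical names, the sizes are arbitrary assumptions, and the 1h window is left out (it would need, for example, rotating filters).

```python
import hashlib


class BloomFilter:
    """Fixed-size Bloom filter: false positives possible, false negatives not."""

    def __init__(self, size_bits: int = 1 << 20, num_hashes: int = 5) -> None:
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits // 8 + 1)

    def _positions(self, item: str):
        # Derive k bit positions from one SHA-256 digest (double hashing).
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.size_bits

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))


def dedup(event_ids, seen: BloomFilter):
    """Yield only event_ids that were (probably) not seen before."""
    for event_id in event_ids:
        if event_id in seen:
            continue  # duplicate, or a rare false positive
        seen.add(event_id)
        yield event_id
```

Usage: `list(dedup(["a", "b", "a"], BloomFilter()))` keeps the first "a" and "b" and drops the repeated "a".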

5.3 Late data

Watermark: highest occurred_at seen so far minus 30s (bounded out-of-orderness). Events with occurred_at older than the watermark = late.

Drop or send to side output for separate processing.
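
The watermark rule can be sketched outside Flink as follows. This is an illustration under stated assumptions, not Flink's API: `WatermarkTracker` is a hypothetical name, and the watermark is taken as the highest occurred_at observed so far minus the 30s bound.

```python
from datetime import datetime, timedelta, timezone

MAX_OUT_OF_ORDER = timedelta(seconds=30)  # the 30s bound from the text


class WatermarkTracker:
    """Tracks watermark = max(occurred_at seen so far) - 30s."""

    def __init__(self) -> None:
        self.max_event_time = None

    def observe(self, occurred_at: datetime) -> bool:
        """Return True if the event is behind the current watermark, then advance it."""
        late = (
            self.max_event_time is not None
            and occurred_at < self.max_event_time - MAX_OUT_OF_ORDER
        )
        if self.max_event_time is None or occurred_at > self.max_event_time:
            self.max_event_time = occurred_at
        return late


if __name__ == "__main__":
    t0 = datetime(2026, 5, 16, 10, 0, 0, tzinfo=timezone.utc)
    tracker = WatermarkTracker()
    for offset in (0, 60, 40, 20):
        ts = t0 + timedelta(seconds=offset)
        print(offset, "late" if tracker.observe(ts) else "on time")
```

Events flagged as late are the ones that would go to the side output or be dropped.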

5.4 Output

Sink: ClickHouse via JDBC or HTTP.

enriched.addSink(new ClickHouseSink(...));

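Flush a batch on size or age, whichever comes first. At 100K events/s, 1000-row batches would mean ~100 inserts/s and many small parts (see 11.2), so size batches in the tens of thousands of rows, or flush about once per second per writer.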
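
A minimal sketch of the size-or-age flush rule, independent of any ClickHouse client. `Batcher`, `flush`, `max_rows` and `max_age_s` are hypothetical names; `flush` stands for whatever performs the actual INSERT.

```python
import time


class Batcher:
    """Buffer rows; flush when max_rows is reached or max_age_s has elapsed."""

    def __init__(self, flush, max_rows, max_age_s, clock=time.monotonic):
        self.flush = flush          # callable taking a list of rows (e.g. one INSERT)
        self.max_rows = max_rows
        self.max_age_s = max_age_s
        self.clock = clock          # injectable so the age rule can be tested
        self.rows = []
        self.first_row_at = None

    def add(self, row) -> None:
        if not self.rows:
            self.first_row_at = self.clock()
        self.rows.append(row)
        self._maybe_flush()

    def tick(self) -> None:
        """Call periodically so a quiet stream still flushes by age."""
        self._maybe_flush()

    def _maybe_flush(self) -> None:
        if not self.rows:
            return
        too_many = len(self.rows) >= self.max_rows
        too_old = self.clock() - self.first_row_at >= self.max_age_s
        if too_many or too_old:
            batch, self.rows = self.rows, []
            self.flush(batch)
```

The age is measured from the first buffered row, so no row waits longer than `max_age_s` plus one tick interval.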


6. ClickHouse Schema

6.1 Raw events

CREATE TABLE events (
    event_id UUID,
    event_type LowCardinality(String),
    occurred_at DateTime64(3),
    received_at DateTime64(3),
    user_id UInt64,
    session_id String,
    user_country LowCardinality(String),
    user_tier LowCardinality(String),
    properties String,  -- JSON
    INDEX bloom_user user_id TYPE bloom_filter GRANULARITY 4
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(occurred_at)
ORDER BY (event_type, occurred_at, user_id)
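TTL toDateTime(occurred_at) + INTERVAL 90 DAY;

  • Partition by month → easy drop old data
  • ORDER BY (event_type, occurred_at, user_id) → filter by type then time
  • TTL 90 days for raw events (older → cold storage); wrapped in toDateTime() because some ClickHouse versions reject a DateTime64 result in TTL expressions
  • Bloom filter for user_id quick lookups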

6.2 Hourly aggregates

CREATE MATERIALIZED VIEW events_hourly
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, event_type, user_country)
AS
SELECT
    toStartOfHour(occurred_at) AS hour,
    event_type,
    user_country,
    count() AS event_count,
    uniqState(user_id) AS unique_users,
    uniqState(session_id) AS unique_sessions
FROM events
GROUP BY hour, event_type, user_country;

Dashboard query:

SELECT
    event_type,
    sum(event_count) AS total,
    uniqMerge(unique_users) AS unique_users
FROM events_hourly
WHERE hour > now() - INTERVAL 24 HOUR
GROUP BY event_type
ORDER BY total DESC;

Sub-second on aggregated table.

6.3 Daily aggregates

Roll up from hourly to daily:

CREATE MATERIALIZED VIEW events_daily
ENGINE = SummingMergeTree() ORDER BY (day, event_type) AS
SELECT toDate(hour) AS day, event_type, sum(event_count) AS total
FROM events_hourly GROUP BY day, event_type;

5 years × 365 days × ~100 event types = ~180K rows. Fits in RAM.

6.4 User dictionary

CREATE DICTIONARY users_dict (
    id UInt64,
    country String,
    tier String,
    signup_date Date
)
PRIMARY KEY id
SOURCE(POSTGRESQL(host 'pg' port 5432 user 'reader' password 'x' db 'app' table 'users'))
LAYOUT(HASHED())
LIFETIME(MIN 60 MAX 120);

Fast joins via dictionary lookup:

SELECT
    dictGet('users_dict', 'country', user_id) AS country,
    count() FROM events GROUP BY country;

No JOIN — in-memory hash lookup.


7. Cold Storage (Iceberg)

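After 90 days, move from ClickHouse to Iceberg. The export must finish before the 90-day TTL on events (6.1) deletes the rows, so either run the job ahead of the TTL or set the TTL somewhat longer than the archive cutoff.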

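# Daily job
from datetime import date, timedelta

def archive(clickhouse, today: date) -> None:
    # `clickhouse` is assumed to be a client with execute(sql), e.g. clickhouse-driver's Client.
    cutoff = today - timedelta(days=90)
    # Newest month that ends before the cutoff, in toYYYYMM form (e.g. 202601).
    month = (cutoff.replace(day=1) - timedelta(days=1)).strftime("%Y%m")
    # Export from CH with its s3() table function; URL is a placeholder, credentials omitted.
    # A real job would also skip months that were already archived.
    clickhouse.execute(f"""
        INSERT INTO FUNCTION s3('https://lakehouse.s3.amazonaws.com/events/{month}.parquet', 'Parquet')
        SELECT * FROM events WHERE toYYYYMM(occurred_at) = {month}
    """)
    # ...register with Iceberg catalog
    # Drop partition in CH (only reached if the export did not raise)
    clickhouse.execute(f"ALTER TABLE events DROP PARTITION {month}")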

The data is now in Iceberg and queryable via Trino, DuckDB, or BigQuery.

5-year retention:

  • Hot in ClickHouse: 90 days
  • Cold in Iceberg/S3: 5 years
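  • 100K events/s × 86,400 s × 90 days ≈ 7.8 × 10^11 events hot; at an assumed ~100 bytes per raw event that is ~78TB raw, ~3.9TB at 0.05 compression
  • 5 years ≈ 1.6 × 10^13 events ≈ 1.6PB raw × 0.05 (compression) ≈ 80TB cold (~$2K/month storage, see 10.2)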
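
Storage estimates like these depend heavily on the assumed event size and compression ratio, so it helps to recompute them under your own assumptions. A minimal sketch: `stored_terabytes` is a hypothetical helper, and the 100 bytes per raw event used in the usage note is an assumption, not a figure from the design.

```python
SECONDS_PER_DAY = 86_400


def stored_terabytes(events_per_sec, days, bytes_per_event, compression_ratio):
    """Compressed size in decimal TB for a steady event rate."""
    raw_bytes = events_per_sec * SECONDS_PER_DAY * days * bytes_per_event
    return raw_bytes * compression_ratio / 1e12


if __name__ == "__main__":
    # Five years at 100K events/s, assuming 100 bytes/event and a 0.05 ratio.
    print(round(stored_terabytes(100_000, 5 * 365, 100, 0.05), 1), "TB")
```

With 100 bytes per event and a 0.05 ratio, five years come to roughly 79TB, in line with the 80TB cold figure.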

8. Query Patterns

8.1 Real-time dashboard (last hour)

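-- From hourly aggregates
-- Buckets are whole hours, so this covers the previous and the current hour.
SELECT event_type, sum(event_count) FROM events_hourly
WHERE hour >= toStartOfHour(now() - INTERVAL 1 HOUR) GROUP BY event_type;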

< 100ms.

8.2 Funnel analysis

Last 7 days, percentage of users who completed: signup → view_product → purchase.

WITH funnel AS (
    SELECT
        user_id,
        max(event_type = 'signup') AS s1,
        max(event_type = 'view_product') AS s2,
        max(event_type = 'purchase') AS s3
    FROM events
    WHERE occurred_at >= now() - INTERVAL 7 DAY
      AND event_type IN ('signup', 'view_product', 'purchase')
    GROUP BY user_id
)
-- Each step counts only users who also reached the previous steps.
-- Step order is not enforced here; windowFunnel() can do that.
SELECT
    countIf(s1) AS signups,
    countIf(s1 AND s2) AS viewed,
    countIf(s1 AND s2 AND s3) AS purchased,
    round(100.0 * viewed / nullif(signups, 0), 1) AS s1_to_s2_pct,
    round(100.0 * purchased / nullif(viewed, 0), 1) AS s2_to_s3_pct
FROM funnel;

Few seconds. Materialize as scheduled view if frequent.
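
The query above counts steps without checking their order. For comparison, a strict ordered funnel can be sketched in a few lines of Python over an in-memory list of events. `funnel_counts` and the tuple layout are assumptions made for illustration only.

```python
from collections import defaultdict

STEPS = ("signup", "view_product", "purchase")


def funnel_counts(events, steps=STEPS):
    """events: iterable of (user_id, event_type, timestamp).

    Returns how many users reached each step, with steps taken in order.
    """
    by_user = defaultdict(list)
    for user_id, event_type, ts in events:
        by_user[user_id].append((ts, event_type))

    reached = [0] * len(steps)
    for user_events in by_user.values():
        level = 0
        # Walk the user's events in time order and advance one step at a time.
        for _, event_type in sorted(user_events):
            if level < len(steps) and event_type == steps[level]:
                level += 1
        for i in range(level):
            reached[i] += 1
    return reached
```

A user who views a product before signing up is counted at step 1 only, which is the behavior an ordered funnel is meant to have.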

8.3 Retention cohort

Users who signed up week N, how many active in week N+k?

WITH cohorts AS (
    SELECT user_id, toMonday(occurred_at) AS signup_week
    FROM events WHERE event_type = 'signup'
),
active AS (
    SELECT user_id, toMonday(occurred_at) AS active_week
    FROM events
    WHERE event_type IN ('login', 'view_product') AND occurred_at > now() - INTERVAL 90 DAY
    GROUP BY user_id, active_week
)
SELECT
    c.signup_week,
    a.active_week,
    dateDiff('week', c.signup_week, a.active_week) AS weeks_since_signup,
    count(DISTINCT c.user_id) AS active_users
FROM cohorts c JOIN active a ON c.user_id = a.user_id AND a.active_week >= c.signup_week
GROUP BY c.signup_week, a.active_week
ORDER BY c.signup_week, weeks_since_signup;

Run nightly, cache result.
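
The shape of the cohort result is easier to see on a tiny in-memory example. A minimal sketch, assuming signups arrive as a dict and activity as (user_id, date) pairs; `week_start` and `retention` are hypothetical helpers, with `week_start` playing the role of toMonday.

```python
from collections import defaultdict
from datetime import date, timedelta


def week_start(d: date) -> date:
    """Monday of the week containing d (mirrors toMonday)."""
    return d - timedelta(days=d.weekday())


def retention(signups, activity):
    """signups: {user_id: signup date}; activity: iterable of (user_id, date).

    Returns {(signup_week, weeks_since_signup): number of distinct active users}.
    """
    users = defaultdict(set)
    for user_id, day in activity:
        if user_id not in signups:
            continue  # no signup event known for this user
        cohort = week_start(signups[user_id])
        offset = (week_start(day) - cohort).days // 7
        if offset >= 0:
            users[(cohort, offset)].add(user_id)
    return {key: len(ids) for key, ids in users.items()}
```

Each key is one cell of the retention matrix: the cohort week and how many weeks later the users were active.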

8.4 Custom analyst query (cold data)

-- Via Trino on Iceberg
SELECT count(*) FROM iceberg.events.events
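WHERE occurred_at >= TIMESTAMP '2024-01-01 00:00:00'
  AND occurred_at < TIMESTAMP '2024-02-01 00:00:00'  -- half-open range keeps all of Jan 31
  AND json_extract_scalar(properties, '$.country') = 'VN';  -- assumes properties was exported as a JSON string (6.1)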

Slower (10-60s) but acceptable for analyst.


9. Cardinality Management

Cardinality issue: too many unique values explode storage/memory.

Watch:

  • user_id (high cardinality OK with bloom filter)
  • url (path) — millions unique
  • IP addresses — billions

Strategies:

  • Hash high-cardinality strings
  • Group rare values into “other”
  • Use HyperLogLog for distinct counts (approximate)

-- Count distinct URLs (exact, expensive)
SELECT count(DISTINCT JSONExtractString(properties, 'url')) FROM events;

-- HLL approximation (fast)
SELECT uniqHLL12(JSONExtractString(properties, 'url')) FROM events;
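
To show why HyperLogLog is cheap, here is a minimal textbook-style sketch in Python: it keeps 4096 small registers no matter how many values it sees. This is not ClickHouse's implementation; `hll_estimate` is a hypothetical name, and SHA-1 is used only as a convenient 64-bit hash source.

```python
import hashlib
import math


def hll_estimate(values, p: int = 12) -> float:
    """Approximate the number of distinct values with 2**p registers."""
    m = 1 << p
    registers = [0] * m
    for value in values:
        digest = hashlib.sha1(str(value).encode("utf-8")).digest()
        h = int.from_bytes(digest[:8], "big")      # 64-bit hash
        index = h >> (64 - p)                      # first p bits pick a register
        rest = h & ((1 << (64 - p)) - 1)           # remaining 64 - p bits
        rank = (64 - p) - rest.bit_length() + 1    # position of the first 1-bit
        registers[index] = max(registers[index], rank)

    alpha = 0.7213 / (1 + 1.079 / m)
    raw = alpha * m * m / sum(2.0 ** -r for r in registers)
    zeros = registers.count(0)
    if raw <= 2.5 * m and zeros:
        # Small-range correction (linear counting).
        return m * math.log(m / zeros)
    return raw


if __name__ == "__main__":
    urls = [f"/products/{i}" for i in range(50_000)]
    print(round(hll_estimate(urls)))
```

With 4096 registers the typical relative error is around 1-2%, which is the trade-off behind the approximate count.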

10. Cost Analysis

10.1 Compute

  • Kafka cluster: 6 brokers × c6i.2xlarge = ~$2000/month
  • Flink: 3 nodes × c6i.4xlarge = ~$1500/month
  • ClickHouse: 3 nodes × r6i.4xlarge = ~$2500/month
  • Postgres: 1 primary + 2 replicas × r6i.large = ~$700/month
  • Collector: 10 × m6i.large = ~$600/month
  • Total compute: ~$7300/month

10.2 Storage

  • ClickHouse disk: 3 × 2TB EBS = ~$500/month
  • S3 (Iceberg cold): 80TB → ~$2000/month (close to the S3 Standard list price of about $0.023/GB-month; Standard-IA would be roughly half)
  • Kafka retention 7 days: 2TB → ~$200/month
  • Total storage: ~$2700/month

10.3 Total

~$10K/month for 100K events/s system. Optimize over time.
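
The totals can be checked, and turned into a unit cost, with a few lines of Python. The line items are the monthly figures from 10.1 and 10.2 (with Kafka retention taken as $200/month); the 30-day month and the helper names are assumptions for illustration.

```python
SECONDS_PER_MONTH = 86_400 * 30  # assumes a 30-day month

COMPUTE_USD = {"kafka": 2000, "flink": 1500, "clickhouse": 2500, "postgres": 700, "collector": 600}
STORAGE_USD = {"clickhouse_ebs": 500, "s3_cold": 2000, "kafka_retention": 200}


def monthly_total() -> int:
    """Sum of all compute and storage line items, in USD per month."""
    return sum(COMPUTE_USD.values()) + sum(STORAGE_USD.values())


def usd_per_million_events(events_per_sec: int = 100_000) -> float:
    """Monthly cost divided by millions of events ingested per month."""
    events_per_month = events_per_sec * SECONDS_PER_MONTH
    return monthly_total() / (events_per_month / 1_000_000)


if __name__ == "__main__":
    print(monthly_total(), "USD/month")
    print(round(usd_per_million_events(), 4), "USD per million events")
```

At a sustained 100K events/s this works out to about $0.04 per million events.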

Comparison: Segment + Snowflake equivalent: $50-100K/month at this scale.


11. Operational Concerns

11.1 Kafka lag

kafka-consumer-groups --bootstrap-server <broker:9092> --describe --group flink-enricher
# LAG in the millions and still growing → scale Flink

Alert if consumer lag > 1 minute of events (about 6M messages at 100K events/s).
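
Since the alert is phrased in time while the CLI reports messages, a small conversion helps. A minimal sketch; `lag_seconds` and `should_alert` are hypothetical helpers, and the 100K events/s default is the sustained rate from the requirements.

```python
def lag_seconds(lag_messages: int, events_per_sec: float) -> float:
    """How many seconds of traffic the consumer is behind."""
    return lag_messages / events_per_sec


def should_alert(lag_messages: int, events_per_sec: float = 100_000, threshold_s: float = 60.0) -> bool:
    """True when the lag exceeds the threshold, expressed in seconds of events."""
    return lag_seconds(lag_messages, events_per_sec) > threshold_s


if __name__ == "__main__":
    for lag in (10_000, 6_000_000, 7_000_000):
        print(lag, lag_seconds(lag, 100_000), should_alert(lag))
```

A lag of 10K messages is only 0.1s of traffic at this rate, so message-count thresholds need to scale with throughput.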

11.2 ClickHouse parts count

Too many small parts → slow query. Monitor:

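SELECT table, count() AS active_parts FROM system.parts
WHERE database='default' AND active=1 GROUP BY table ORDER BY active_parts DESC;

If > 1000 per table → first increase the insert batch size (5.4), since small frequent inserts are the usual cause; OPTIMIZE TABLE forces a merge but is expensive on large tables.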

11.3 Late data handling

Events arriving with occurred_at far in the past:

  • < 1h late: insert normally
  • 1-24h late: insert but trigger re-aggregation of affected hours
  • > 24h late: drop or side-output
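
The three tiers above can be expressed as a small routing function. A minimal sketch: `LateAction` and `route` are hypothetical names, and lateness is measured here as received_at minus occurred_at, which is an assumption.

```python
from datetime import datetime, timedelta, timezone
from enum import Enum


class LateAction(Enum):
    INSERT = "insert"
    INSERT_AND_REAGGREGATE = "insert_and_reaggregate"
    SIDE_OUTPUT = "side_output"


def route(occurred_at: datetime, received_at: datetime) -> LateAction:
    """Pick a handling tier from how late the event arrived."""
    lateness = received_at - occurred_at
    if lateness < timedelta(hours=1):
        return LateAction.INSERT
    if lateness <= timedelta(hours=24):
        return LateAction.INSERT_AND_REAGGREGATE
    return LateAction.SIDE_OUTPUT  # or drop, depending on policy


if __name__ == "__main__":
    now = datetime(2026, 5, 16, 10, 0, tzinfo=timezone.utc)
    for hours in (0.5, 3, 30):
        print(hours, route(now - timedelta(hours=hours), now).value)
```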

11.4 Schema changes

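New event type: publish it to a new events_<type> topic; it is auto-discovered (assuming consumers subscribe by topic pattern).

New property: update the registered schema. The ClickHouse properties column is a JSON string, so it accepts any new key without a table change.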


12. Migration / Rebuild

ClickHouse table rebuild (e.g., new ORDER BY):

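  1. Create new table
  2. Switch writers to the new table (or dual-write), so rows arriving during the backfill are not missed
  3. Backfill older rows via INSERT … SELECT from the old table
  4. Switch reads to the new table, then drop old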

Can take hours for huge table. Plan windows.


13. Privacy

  • IP hashed (not stored raw)
  • PII in properties flagged, encrypted column-level if needed
  • Per-user deletion (GDPR): mark + scheduled cleanup
  • Anonymization for cold storage
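
For the hashed-IP item, a plain unsalted hash is weak because the IPv4 space is small enough to enumerate. A minimal sketch of a keyed hash instead, using only the standard library; `hash_ip` is a hypothetical helper, and how the secret is stored or rotated is out of scope here.

```python
import hashlib
import hmac


def hash_ip(ip: str, secret: bytes) -> str:
    """Keyed SHA-256 (HMAC) of an IP address, hex-encoded."""
    return hmac.new(secret, ip.encode("utf-8"), hashlib.sha256).hexdigest()


if __name__ == "__main__":
    # The secret below is a placeholder; load a real one from a secret store.
    print(hash_ip("203.0.113.7", b"example-secret"))
```

The same IP always maps to the same value under one secret, so counts and joins on ip_hash still work, while the raw address is not stored.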

14. Next

Case-Design-Data-AI-RAG

Updated: 2026-05-16