State Stores & Interactive Queries

At a Glance

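┌─────────────────────┬─────────────────────────────────────────────────────────┐
│ Aspect              │ Details                                                 │
├─────────────────────┼─────────────────────────────────────────────────────────┤
│ Goal                │ Build and query stateful stream processing applications │
│ State Store Types   │ KeyValue, Window, Session - persistent or in-memory     │
│ Default Engine      │ RocksDB (embedded key-value store)                      │
│ Fault Tolerance     │ Changelog topics + standby replicas                     │
│ Interactive Queries │ Query state stores via REST API from any instance       │
│ Spring Integration  │ KafkaStreams bean + ReadOnlyKeyValueStore               │
│ Prerequisites       │ Part 15 (Streams fundamentals)                          │
└─────────────────────┴─────────────────────────────────────────────────────────┘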

What You'll Learn

  • How Kafka Streams manages state locally and recovers from failures
  • RocksDB configuration and tuning for production
  • Changelog topics and compaction for state backup
  • Standby replicas for fast failover
  • Building REST APIs to query stream state
  • Implementing the "query routing" pattern for distributed queries
  • State store monitoring and troubleshooting

Production Story: The Real-Time Leaderboard

Friday, 6:00 PM. A gaming company's backend team.

"We need a real-time leaderboard," the product manager announced. "Players want to see their rank update instantly after each game."

The first approach used a database:

Game ends → Update DB → Query for rank → Return to player

Problem: At peak times with 100,000 concurrent players, the database couldn't handle the query load. P99 latency hit 5 seconds.

The solution was Kafka Streams with stateful processing, keeping the per-player aggregates in a local state store.
Now each Kafka Streams instance held a local copy of its partition's player stats. Queries went directly to the local RocksDB store—no network hop, no database bottleneck.
Result: P99 dropped from 5 seconds to 8 milliseconds. The state was replicated via changelog topics, so even instance failures recovered in seconds.
The insight: Stream processing isn't just about transformations—it's about bringing the state close to the computation.

Mental Model: State in Kafka Streams

┌──────────────────────────────────────────────────────────────────────────┐
│                  STATE STORE ARCHITECTURE                                │
└──────────────────────────────────────────────────────────────────────────┘

  KAFKA STREAMS INSTANCE                              KAFKA CLUSTER
  ─────────────────────                              ─────────────────

  ┌─────────────────────────────────┐               ┌───────────────────┐
  │        Your Application         │               │   Input Topic     │
  │                                 │               │   (partitioned)   │
  │  ┌───────────────────────────┐  │    reads      │                   │
  │  │     Stream Processor      │◀─┼───────────────│  p0  p1  p2  p3   │
  │  │                           │  │               └───────────────────┘
  │  │  filter → map → aggregate │  │
  │  └─────────────┬─────────────┘  │
  │                │                │
  │       reads/writes              │
  │                │                │
  │  ┌─────────────▼─────────────┐  │               ┌───────────────────┐
  │  │      State Store          │  │   changelog   │  Changelog Topic  │
  │  │       (RocksDB)           │──┼──────────────▶│  (compacted)      │
  │  │                           │  │    writes     │                   │
  │  │  Key → Value (local disk) │  │               │  p0  p1  p2  p3   │
  │  └───────────────────────────┘  │               └───────────────────┘
  │                                 │
  │  ┌───────────────────────────┐  │                  On restart:
  │  │   Interactive Query API   │  │                  reads changelog
  │  │                           │  │                  to rebuild state
  │  │  GET /store/key → value   │  │
  │  └───────────────────────────┘  │
  │                                 │
  └─────────────────────────────────┘

Deep Dive: State Store Types

KeyValue Store

Window Store

Store Type Comparison

┌──────────────────────────────────────────────────────────────────────────┐
│                     STATE STORE TYPES                                    │
├─────────────────┬─────────────────┬─────────────────┬────────────────────┤
│                 │ KeyValueStore   │ WindowStore     │ SessionStore       │
├─────────────────┼─────────────────┼─────────────────┼────────────────────┤
│ Key             │ K               │ Windowed<K>     │ Windowed<K>        │
│                 │                 │ (K + timeWindow)│ (K + sessionWindow)│
├─────────────────┼─────────────────┼─────────────────┼────────────────────┤
│ Operations      │ get, put, range │ fetch by        │ fetch by key,      │
│                 │ delete, all     │ key+time range  │ findSessions       │
├─────────────────┼─────────────────┼─────────────────┼────────────────────┤
│ Use Case        │ Latest value    │ Time-bounded    │ Activity-bounded   │
│                 │ per key         │ aggregations    │ aggregations       │
├─────────────────┼─────────────────┼─────────────────┼────────────────────┤
│ Example         │ User profile    │ Hourly sales    │ User session       │
│                 │ Current balance │ Rolling average │ duration           │
└─────────────────┴─────────────────┴─────────────────┴────────────────────┘
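
The "K + timeWindow" key in the table can be pictured with a small plain-Java model. This is only a sketch, not the Kafka Streams API: the class name WindowStoreSketch and its methods are hypothetical, it assumes tumbling windows and non-negative timestamps, and it stores a simple count per (key, window start) cell.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of a tumbling-window count store; not the Kafka Streams API. */
final class WindowStoreSketch {
    private final long windowSizeMs;
    // Outer key: record key. Inner key: window start timestamp, sorted for range fetches.
    private final Map<String, TreeMap<Long, Long>> counts = new TreeMap<>();

    WindowStoreSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Start of the tumbling window that contains the given timestamp. */
    long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Adds one event to the (key, window) cell. */
    void increment(String key, long timestampMs) {
        counts.computeIfAbsent(key, k -> new TreeMap<>())
              .merge(windowStart(timestampMs), 1L, Long::sum);
    }

    /** Returns the counts of all windows for key whose start lies in [fromMs, toMs]. */
    List<Long> fetch(String key, long fromMs, long toMs) {
        TreeMap<Long, Long> windows = counts.get(key);
        if (windows == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(windows.subMap(fromMs, true, toMs, true).values());
    }
}
```

A KeyValueStore would collapse the inner map to a single latest value per key; the window store keeps one value per key and window, which is why its fetch takes a time range.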

Deep Dive: RocksDB Configuration

Production RocksDB Tuning

Streams Configuration for RocksDB

Deep Dive: Changelog Topics and Recovery

How Changelog Topics Work

┌──────────────────────────────────────────────────────────────────────────┐
│                    CHANGELOG TOPIC LIFECYCLE                             │
└──────────────────────────────────────────────────────────────────────────┘

  NORMAL OPERATION:

  State Store Update          Changelog Topic Write
  ──────────────────          ────────────────────
  put("alice", 100)    ──▶    [key=alice, value=100, ts=T1]
  put("bob", 200)      ──▶    [key=bob, value=200, ts=T2]
  put("alice", 150)    ──▶    [key=alice, value=150, ts=T3]  (update)
  delete("bob")        ──▶    [key=bob, value=null, ts=T4]   (tombstone)


  COMPACTION (background):

  Before compaction:          After compaction:
  ┌─────────────────────┐     ┌─────────────────────┐
  │ alice → 100 (T1)    │     │                     │
  │ bob   → 200 (T2)    │     │ alice → 150 (T3)    │  ← Latest alice only
  │ alice → 150 (T3)    │ ──▶ │                     │  ← bob deleted
  │ bob   → null (T4)   │     │                     │
  └─────────────────────┘     └─────────────────────┘


  RECOVERY (on restart):

  1. Application starts
  2. Discovers state store empty/outdated
  3. Reads changelog from beginning (or from the last checkpointed offset)
  4. Replays all records to rebuild state
  5. State restored, ready to process

  Timeline:
  ├──────────────────┼──────────────────┼──────────────────┤
  │  Read changelog  │  Restore state   │  Resume processing│
  │  (fast with      │  (local disk     │                   │
  │   compaction)    │   writes)        │                   │
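
The lifecycle above can be traced with a small plain-Java model. It is a sketch under simplifying assumptions, not Kafka code: ChangelogSketch and its methods are hypothetical names, the log is a single in-memory list, and compaction drops tombstones immediately (a real broker keeps them for a configurable retention period first).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a changelog: append, compact, replay. Not the Kafka API. */
final class ChangelogSketch {
    /** One changelog record; a null value is a tombstone (delete marker). */
    static final class Entry {
        final String key;
        final Integer value;

        Entry(String key, Integer value) {
            this.key = key;
            this.value = value;
        }
    }

    private final List<Entry> log = new ArrayList<>();

    void put(String key, int value) { log.add(new Entry(key, value)); }

    void delete(String key) { log.add(new Entry(key, null)); }

    int size() { return log.size(); }

    /** Keeps only the latest record per key and drops keys whose latest record is a tombstone. */
    void compact() {
        Map<String, Entry> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            latest.remove(e.key); // re-insert so iteration order follows the latest write
            latest.put(e.key, e);
        }
        log.clear();
        for (Entry e : latest.values()) {
            if (e.value != null) {
                log.add(e);
            }
        }
    }

    /** Rebuilds a fresh local store by replaying the log from the beginning. */
    Map<String, Integer> restore() {
        Map<String, Integer> store = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value == null) {
                store.remove(e.key);
            } else {
                store.put(e.key, e.value);
            }
        }
        return store;
    }
}
```

Replaying the four records from the diagram yields the same store before and after compaction; compaction only shortens the log that has to be read.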

Changelog Configuration

Disable Logging (When to Use)

Deep Dive: Standby Replicas

Why Standby Replicas?

┌──────────────────────────────────────────────────────────────────────────┐
│                    STANDBY REPLICA PATTERN                               │
└──────────────────────────────────────────────────────────────────────────┘

  WITHOUT STANDBY REPLICAS:

  Instance 1 (active)              Instance 2 (standby candidate)
  ┌─────────────────────┐          ┌─────────────────────┐
  │ Partition 0 state   │          │      (empty)        │
  │ ████████████████    │          │                     │
  │ (10GB RocksDB)      │          │                     │
  └─────────────────────┘          └─────────────────────┘

  Instance 1 fails...
  Instance 2 must restore 10GB from changelog = MINUTES of downtime


  WITH STANDBY REPLICAS (num.standby.replicas=1):

  Instance 1 (active)              Instance 2 (standby)
  ┌─────────────────────┐          ┌─────────────────────┐
  │ Partition 0 state   │          │ Partition 0 replica │
  │ ████████████████    │          │ ████████████████    │
  │ (10GB RocksDB)      │          │ (10GB RocksDB)      │
  │                     │          │ (kept in sync)      │
  └─────────────────────┘          └─────────────────────┘
           │                                 ▲
           └────────── changelog ────────────┘

  Instance 1 fails...
  Instance 2 already has state = SECONDS of downtime
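
The difference between the two cases comes down to how much of the changelog the new owner still has to replay. The sketch below makes that arithmetic explicit; StandbyFailoverSketch is a hypothetical helper, and the offsets and replay rate in the usage note are made-up illustration values, not measurements.

```java
/** Back-of-the-envelope model of failover cost; not part of Kafka Streams. */
final class StandbyFailoverSketch {
    /** Records the new owner must replay: the changelog tail it has not applied yet. */
    static long recordsToReplay(long changelogEndOffset, long locallyAppliedOffset) {
        if (locallyAppliedOffset < 0 || locallyAppliedOffset > changelogEndOffset) {
            throw new IllegalArgumentException("applied offset must be within [0, end offset]");
        }
        return changelogEndOffset - locallyAppliedOffset;
    }

    /** Replay time in whole seconds, rounded up, for an assumed replay rate. */
    static long estimatedRestoreSeconds(long recordsToReplay, long recordsPerSecond) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("recordsPerSecond must be positive");
        }
        return (recordsToReplay + recordsPerSecond - 1) / recordsPerSecond;
    }
}
```

With an assumed changelog of 50,000,000 records and a replay rate of 100,000 records per second, a cold instance (applied offset 0) needs about 500 seconds, while a standby that has already applied 49,990,000 records needs about one.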

Configuring Standby Replicas

Standby Assignment Strategy

Deep Dive: Interactive Queries

Basic Store Query

Windowed Store Query

REST Controller for Queries

Deep Dive: Query Routing Pattern

Finding the Right Instance

┌──────────────────────────────────────────────────────────────────────────┐
│                    QUERY ROUTING PATTERN                                 │
└──────────────────────────────────────────────────────────────────────────┘

  Client Query: GET /player/alice

  Step 1: Determine which partition owns "alice"
  ┌─────────────────────────────────────────────────────────────┐
  │  partition = hash(key) % numPartitions                      │
  │  partition = hash("alice") % 6 = 2                          │
  └─────────────────────────────────────────────────────────────┘

  Step 2: Find which instance has partition 2
  ┌─────────────────────────────────────────────────────────────┐
  │  KafkaStreams.queryMetadataForKey("store", "alice", ...)    │
  │  Returns: KeyQueryMetadata { activeHost: instance-2:8080 }  │
  └─────────────────────────────────────────────────────────────┘

  Step 3: Route query
  ┌─────────────────────────────────────────────────────────────┐
  │  If local: query store directly                             │
  │  If remote: HTTP call to instance-2:8080/player/alice       │
  └─────────────────────────────────────────────────────────────┘

  Instance Layout:
  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
  │ Instance 1  │    │ Instance 2  │    │ Instance 3  │
  │ P0, P1      │    │ P2, P3      │    │ P4, P5      │
  │             │    │             │    │             │
  │ alice? NO   │    │ alice? YES  │    │ alice? NO   │
  └─────────────┘    └──────┬──────┘    └─────────────┘
                            │
                       Query here!
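
The three routing steps can be modeled in plain Java. This is a sketch, not the Kafka Streams API: QueryRoutingSketch, its assignment map, and the URL shape are hypothetical, and the hash is a stand-in (Kafka's default partitioner applies murmur2 to the serialized key bytes, and in a real application queryMetadataForKey performs the lookup for you).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Plain-Java model of key-based query routing; not the Kafka Streams API. */
final class QueryRoutingSketch {
    private final int numPartitions;
    private final Map<String, List<Integer>> assignment; // host:port -> owned partitions
    private final String selfHost;

    QueryRoutingSketch(int numPartitions, Map<String, List<Integer>> assignment, String selfHost) {
        this.numPartitions = numPartitions;
        this.assignment = assignment;
        this.selfHost = selfHost;
    }

    /** Step 1: map the key to a partition (stand-in hash, not murmur2). */
    int partitionFor(String key) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions);
    }

    /** Step 2: find the instance that owns the key's partition. */
    String ownerOf(String key) {
        int partition = partitionFor(key);
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            if (e.getValue().contains(partition)) {
                return e.getKey();
            }
        }
        throw new IllegalStateException("No instance owns partition " + partition);
    }

    /** Step 3: answer locally if this instance owns the key, otherwise return the remote URL. */
    String route(String key) {
        String owner = ownerOf(key);
        return owner.equals(selfHost) ? "local:" + key : "http://" + owner + "/player/" + key;
    }
}
```

Math.floorMod keeps the partition non-negative even when the hash is negative, which a plain % would not.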

Query Routing Service Implementation

Application Host Configuration

Deep Dive: State Store Monitoring

Key Metrics to Track

RocksDB Metrics

Common Mistakes

1. Querying During Rebalance
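
While a rebalance is in progress a store may be migrating or restoring, and a query against it can fail. One common way to cope is a bounded retry with a short backoff. The sketch below shows only that retry shape in plain Java: StoreRetrySketch is a hypothetical helper, and StoreNotReadyException is a stand-in for Kafka Streams' InvalidStateStoreException so that the example compiles without the library.

```java
import java.util.function.Supplier;

/** Bounded retry around a store query; plain Java, not the Kafka Streams API. */
final class StoreRetrySketch {
    /** Hypothetical stand-in for Kafka Streams' InvalidStateStoreException. */
    static final class StoreNotReadyException extends RuntimeException {
        StoreNotReadyException(String message) {
            super(message);
        }
    }

    /** Runs the query, retrying up to maxAttempts times while the store is unavailable. */
    static <T> T queryWithRetry(Supplier<T> query, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        StoreNotReadyException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get();
            } catch (StoreNotReadyException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(backoffMs);
                }
            }
        }
        throw last; // every attempt failed; surface the last failure to the caller
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the store", ie);
        }
    }
}
```

Keeping the attempt count bounded matters for a REST endpoint: after the last attempt the caller can return an error such as 503 instead of blocking the request thread indefinitely.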

2. Large State Without Standby

3. Not Considering Query Locality

4. Iterator Resource Leaks
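
Iterators returned by range() and all() hold underlying store resources until they are closed; Kafka Streams' KeyValueIterator is Closeable for that reason. The sketch below demonstrates the try-with-resources pattern on a plain-Java stand-in: TrackedIterator and the OPEN_ITERATORS counter are hypothetical and exist only to make a leak visible.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Demonstrates closing store iterators; plain Java, not the Kafka Streams API. */
final class IteratorLeakSketch {
    /** Number of iterators that were opened but not yet closed. */
    static final AtomicInteger OPEN_ITERATORS = new AtomicInteger();

    /** Hypothetical closeable iterator, standing in for KeyValueIterator. */
    static final class TrackedIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> delegate;
        private boolean closed;

        TrackedIterator(List<String> keys) {
            this.delegate = keys.iterator();
            OPEN_ITERATORS.incrementAndGet();
        }

        @Override
        public boolean hasNext() {
            return !closed && delegate.hasNext();
        }

        @Override
        public String next() {
            return delegate.next();
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                OPEN_ITERATORS.decrementAndGet();
            }
        }
    }

    /** Counts keys; try-with-resources closes the iterator even if the loop throws. */
    static int countKeys(List<String> keys) {
        int count = 0;
        try (TrackedIterator it = new TrackedIterator(keys)) {
            while (it.hasNext()) {
                it.next();
                count++;
            }
        }
        return count;
    }
}
```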

Debug This: The Incomplete Aggregation

A count() aggregation grouped by category is showing fewer results than expected when its state store is queried.

The topology is running on 3 instances. What's happening?

Answer
The issue: Interactive queries only see local partition data.
Instance 1 (partitions 0,1): categories A, B
Instance 2 (partitions 2,3): categories C, D
Instance 3 (partitions 4,5): category E

Query on Instance 1: only sees A, B (2 categories)
Solutions:
  1. Query all instances (scatter-gather).
  2. Use GlobalKTable if data is small.
  3. External store for global queries.
Root cause: This is by design. KTable state is partitioned for scalability. If you need a global view, you must either aggregate across instances or use GlobalKTable (which doesn't scale as well).
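
The gather half of scatter-gather is a merge of per-instance partial results. The plain-Java sketch below shows only that merge and uses the instance layout from the answer above; ScatterGatherSketch is a hypothetical helper, the counts are invented, and the HTTP fan-out to the other instances is left out.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Merges partial query results from several instances; plain Java, not the Kafka Streams API. */
final class ScatterGatherSketch {
    /**
     * Combines per-instance results into one global view. In a partitioned store each key
     * lives on exactly one instance, so summing on a key collision is only a defensive choice.
     */
    static Map<String, Long> gather(List<Map<String, Long>> partialResults) {
        Map<String, Long> merged = new TreeMap<>();
        for (Map<String, Long> partial : partialResults) {
            for (Map.Entry<String, Long> e : partial.entrySet()) {
                merged.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return merged;
    }
}
```

Querying instance 1 alone returns two categories; merging the results of all three instances returns all five.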

Exercises

Exercise 1: Build a Session Store

Create a stateful processor that:

  • Tracks user sessions (events within 30min gaps)
  • Stores session start time, event count, last activity
  • Provides query API for active sessions

Exercise 2: Implement Query Routing

Build a REST service with:

  • Point queries routed to correct instance
  • Range queries scattered to all instances
  • Fallback to standby replicas on failure

Exercise 3: RocksDB Tuning

Profile and tune RocksDB for:

  • High write throughput workload
  • Read-heavy workload with large dataset
  • Memory-constrained environment

Exercise 4: State Store Migration

Implement a strategy to:

  • Add a new field to state store values
  • Migrate existing data without downtime
  • Handle both old and new formats during migration

Exercise 5: Monitoring Dashboard

Create Grafana dashboards showing:

  • State store sizes and growth
  • Restoration time after restarts
  • Cache hit ratios
  • Query latencies (local vs remote)

Interview Questions

Q1: "How does Kafka Streams achieve fault tolerance for stateful processing?"

What they're looking for: Understanding of state backup and recovery
Strong answer: "Kafka Streams uses a changelog topic pattern:
  1. Changelog Topics - Every state store write is also written to a compacted Kafka topic:
    State: put(key, value)
    Changelog: [key, value] appended to topic
    
  2. Compaction - Changelog topics use log compaction, keeping only the latest value per key. This bounds recovery time.
  3. Recovery Process - On restart:
    • Discover assigned partitions
    • Read changelog from beginning (or checkpoint)
    • Replay into local state store
    • Resume processing
  4. Standby Replicas - Optionally, other instances maintain hot copies:
    • Continuously consume changelog
    • Ready to take over immediately

The key insight is that Kafka itself becomes the source of truth. Local state stores are just materialized views that can be rebuilt from the changelog."

Q2: "When would you disable changelog logging for a state store?"

What they're looking for: Understanding of trade-offs
Strong answer: "I'd disable changelog when:
  1. Derived/Recomputable State - If state can be rebuilt from source topics.
  2. Caching Layer - Short-term cache where loss is acceptable.
  3. Very High Write Volume - When the changelog becomes a bottleneck.
  4. Testing - Unit tests don't need persistence.
The risk is longer recovery time (must reprocess from source) and potential data loss if source topics have limited retention. Always evaluate: Is the recovery time acceptable?"

Q3: "Explain the query routing pattern for interactive queries."

What they're looking for: Distributed systems understanding
Strong answer: "Kafka Streams partitions state across instances. For queries:
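Point Queries (single key): look up the owning instance with queryMetadataForKey, then read the local store if this instance owns the key, or forward the request over HTTP to the owner.
Range/All Queries (scatter-gather): send the request to every instance that hosts the store (streamsMetadataForStore, named allMetadataForStore before Kafka 3.0) and merge the partial results.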
Key considerations:
  • Configure application.server for discovery
  • Handle rebalancing gracefully
  • Consider standby replicas for failover
  • Load balancer needs to be aware of routing"

Q4: "How do you size memory for Kafka Streams with state stores?"

What they're looking for: Operational experience
Strong answer: "Memory in Kafka Streams has several components:
1. JVM Heap:
- Record cache: StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG (statestore.cache.max.bytes)
- In-flight records
- Application objects
Rule: Start with 1-2GB, monitor GC
2. Off-Heap (RocksDB):
- Block cache: Most important for reads
- Write buffers: For write performance
- Indexes and bloom filters

Rule: Block cache = 1/3 of available memory
3. OS Page Cache:
- RocksDB relies heavily on OS page cache
- Keep memory available for this

Rule: Leave 20-30% of memory for OS
Sizing formula:
Total memory needed per instance:
= JVM heap (2GB base + 100MB per partition)
+ RocksDB block cache (256MB - 1GB)
+ Write buffers (64MB × num_stores × 3)
+ OS page cache (20% of total)

Example: 8 partitions, 3 stores
= 2.8GB heap + 512MB cache + 576MB buffers + ~1GB OS
≈ 5GB per instance
Monitor the RocksDB block cache hit ratio (for example the block-cache-data-hit-ratio metric) - if low, increase block cache."
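
The sizing formula from this answer can be checked with a few lines of arithmetic. The sketch below encodes the rule-of-thumb numbers quoted above (2GB base heap, 100MB per partition, 64MB × 3 write buffers per store) and works in MB with 1GB = 1024MB; MemorySizingSketch is a hypothetical helper, and the constants are the answer's heuristics, not Kafka defaults.

```java
/** Worked version of the sizing rule of thumb; all constants are heuristics, not Kafka defaults. */
final class MemorySizingSketch {
    /** JVM heap: 2GB base plus 100MB per partition. */
    static long heapMb(int partitions) {
        return 2048L + 100L * partitions;
    }

    /** RocksDB write buffers: 64MB per buffer, 3 buffers per store. */
    static long writeBuffersMb(int stores) {
        return 64L * stores * 3;
    }

    /** Total per instance, where the OS page cache takes osPercent of the total. */
    static long totalMb(int partitions, int stores, long blockCacheMb, int osPercent) {
        long nonOs = heapMb(partitions) + blockCacheMb + writeBuffersMb(stores);
        // nonOs is the (100 - osPercent)% share of the total, so scale it up accordingly.
        return Math.round(nonOs * 100.0 / (100 - osPercent));
    }
}
```

For 8 partitions, 3 stores, a 512MB block cache, and 20% reserved for the OS, this gives 2848 + 512 + 576 = 3936MB before the page cache and 4920MB in total, which matches the "≈ 5GB per instance" figure.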

Q5: "How would you handle a state store that's growing unbounded?"

What they're looking for: Problem-solving approach
Strong answer: "Unbounded growth typically means data isn't being removed. Approaches:
1. Windowed Stores (automatic retention)
2. TTL with Processor API
3. Tombstones in Source Topic
4. External Cleanup Job
Monitoring: Track store.approximateNumEntries() and alert on growth rate exceeding expectations."
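
The TTL approach boils down to storing a timestamp with each entry and periodically deleting entries that are too old. The plain-Java sketch below models just that cleanup pass; TtlStoreSketch is a hypothetical class, and in a real topology the purge would typically run from a punctuator scheduled in a Processor, with each delete also reaching the changelog as a tombstone.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Plain-Java model of TTL-based cleanup; not the Kafka Streams Processor API. */
final class TtlStoreSketch {
    private final long ttlMs;
    private final Map<String, String> values = new HashMap<>();
    // key -> timestamp of the last update, used to decide expiry
    private final Map<String, Long> lastUpdated = new HashMap<>();

    TtlStoreSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    void put(String key, String value, long nowMs) {
        values.put(key, value);
        lastUpdated.put(key, nowMs);
    }

    String get(String key) {
        return values.get(key);
    }

    int size() {
        return values.size();
    }

    /** Removes entries not updated within ttlMs and returns how many were removed. */
    int purgeExpired(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUpdated.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() > ttlMs) {
                values.remove(e.getKey());
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```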

Summary & Key Takeaways

State Store Fundamentals

┌──────────────────────────────────────────────────────────────────────────┐
│                    STATE STORE SUMMARY                                   │
├────────────────────────────────┬─────────────────────────────────────────┤
│ Concept                        │ Key Point                               │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Default Storage                │ RocksDB (embedded, persistent)          │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Fault Tolerance                │ Changelog topics (compacted)            │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Fast Recovery                  │ Standby replicas (hot copies)           │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Query Access                   │ Interactive queries (local + routing)   │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Data Distribution              │ Partitioned (like Kafka topics)         │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Global Access                  │ GlobalKTable or scatter-gather          │
└────────────────────────────────┴─────────────────────────────────────────┘

Essential Takeaways

  1. State is local but recoverable - Each instance has its own state store, backed by changelog topics.
  2. RocksDB needs tuning - Block cache, write buffers, and compaction settings significantly impact performance.
  3. Standby replicas are essential - For large state stores, standby replicas reduce failover time from minutes to seconds.
  4. Interactive queries need routing - A single query only sees local partitions; implement scatter-gather for global views.
  5. Monitor state store health - Track entry counts, restoration times, and cache hit ratios.
  6. Clean up old data - Use windowed stores, TTL patterns, or tombstones to prevent unbounded growth.

Quick Reference

┌──────────────────────────────────────────────────────────────────────────┐
│                    STATE STORE QUICK REFERENCE                           │
├────────────────────────────────┬─────────────────────────────────────────┤
│ Configuration                  │ Code/Property                           │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Enable changelog               │ Materialized.withLoggingEnabled(Map)    │
│ Disable changelog              │ Materialized.withLoggingDisabled()      │
│ In-memory store                │ withStoreType(IN_MEMORY)                │
│ Standby replicas               │ num.standby.replicas=1                  │
│ State directory                │ state.dir=/var/streams                  │
│ Cache size                     │ statestore.cache.max.bytes=10485760     │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Query Operations               │                                         │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Get store handle               │ streams.store(StoreQueryParameters...)  │
│ Point lookup                   │ store.get(key)                          │
│ Range query                    │ store.range(from, to)                   │
│ All entries                    │ store.all() (iterator, must close!)     │
│ Approximate count              │ store.approximateNumEntries()           │
│ Find key owner                 │ streams.queryMetadataForKey(...)        │
│ All store hosts                │ streams.streamsMetadataForStore(...)    │
└────────────────────────────────┴─────────────────────────────────────────┘

Series Navigation

Series Overview: Kafka Compendium Series