State Stores & Interactive Queries
At a Glance
| Aspect | Details |
|---|---|
| Goal | Build and query stateful stream processing applications |
| State Store Types | KeyValue, Window, Session - persistent or in-memory |
| Default Engine | RocksDB (embedded key-value store) |
| Fault Tolerance | Changelog topics + standby replicas |
| Interactive Queries | Query state stores via REST API from any instance |
| Spring Integration | KafkaStreams bean + ReadOnlyKeyValueStore |
| Prerequisites | Part 15 (Streams fundamentals) |
What You'll Learn
- How Kafka Streams manages state locally and recovers from failures
- RocksDB configuration and tuning for production
- Changelog topics and compaction for state backup
- Standby replicas for fast failover
- Building REST APIs to query stream state
- Implementing the "query routing" pattern for distributed queries
- State store monitoring and troubleshooting
Production Story: The Real-Time Leaderboard
Friday, 6:00 PM. A gaming company's backend team.
"We need a real-time leaderboard," the product manager announced. "Players want to see their rank update instantly after each game."
The first approach used a database:
Game ends → Update DB → Query for rank → Return to player
Problem: At peak times with 100,000 concurrent players, the database couldn't handle the query load. P99 latency hit 5 seconds.
The solution was Kafka Streams with stateful processing:
[Code listing: Java, 8 lines]
Now each Kafka Streams instance held a local copy of its partition's player stats. Queries went directly to the local RocksDB store—no network hop, no database bottleneck.
[Code listing: Java, 7 lines]
Result: P99 dropped from 5 seconds to 8 milliseconds. The state was replicated via changelog topics, so even instance failures recovered in seconds.
The insight: Stream processing isn't just about transformations—it's about bringing the state close to the computation.
Mental Model: State in Kafka Streams
State store architecture:
- Input topic (Kafka cluster, partitioned: p0, p1, p2, p3): read by the stream processor.
- Kafka Streams instance (your application):
  - Stream processor (filter → map → aggregate): reads and writes the state store.
  - State store (RocksDB, key → value on local disk): writes every change to the changelog topic.
  - Interactive Query API: GET /store/key → value.
- Changelog topic (Kafka cluster, compacted: p0, p1, p2, p3): on restart, the instance reads the changelog to rebuild state.
Deep Dive: State Store Types
KeyValue Store
[Code listing: Java, 48 lines]
Window Store
[Code listing: Java, 47 lines]
Store Type Comparison
| | KeyValueStore | WindowStore | SessionStore |
|---|---|---|---|
| Key | K | Windowed<K> (K + time window) | Windowed<K> (K + session window) |
| Operations | get, put, range, delete, all | fetch by key + time range | fetch by key, findSessions |
| Use Case | Latest value per key | Time-bounded aggregations | Activity-bounded aggregations |
| Example | User profile, current balance | Hourly sales, rolling average | User session duration |
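To make the Windowed<K> key concrete: a window store addresses each entry by the record key plus the window the record falls into. The sketch below is a plain-Java simulation, not the Kafka Streams WindowStore API; the class and method names are hypothetical. It assumes tumbling windows aligned to the epoch and non-negative timestamps, so the window start is the timestamp rounded down to a multiple of the window size.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java stand-in for a windowed count; not the Kafka Streams WindowStore API. */
final class TumblingWindowCounter {
    private final long windowSizeMs;
    // The composite key "key@windowStart" plays the role of Windowed<K>.
    private final Map<String, Long> counts = new HashMap<>();

    TumblingWindowCounter(long windowSizeMs) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.windowSizeMs = windowSizeMs;
    }

    /** Start of the epoch-aligned window containing the (non-negative) timestamp. */
    long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Adds one event for the key to the window that contains the timestamp. */
    void record(String key, long timestampMs) {
        counts.merge(key + "@" + windowStart(timestampMs), 1L, Long::sum);
    }

    /** Count for one key in the window containing the timestamp (0 if none). */
    long fetch(String key, long timestampMs) {
        return counts.getOrDefault(key + "@" + windowStart(timestampMs), 0L);
    }
}
```

With a one-hour window (3,600,000 ms), two events in the first hour and one in the second are stored under two different composite keys, which is why a windowed query needs both the key and a time range.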
Deep Dive: RocksDB Configuration
Production RocksDB Tuning
[Code listing: Java, 74 lines]
Streams Configuration for RocksDB
[Code listing: Java, 22 lines]
Deep Dive: Changelog Topics and Recovery
How Changelog Topics Work
Changelog topic lifecycle

Normal operation (each state store update produces one changelog record):

| State Store Update | Changelog Topic Write |
|---|---|
| put("alice", 100) | [key=alice, value=100, ts=T1] |
| put("bob", 200) | [key=bob, value=200, ts=T2] |
| put("alice", 150) | [key=alice, value=150, ts=T3] (update) |
| delete("bob") | [key=bob, value=null, ts=T4] (tombstone) |

Compaction (background):
- Before compaction: alice → 100 (T1), bob → 200 (T2), alice → 150 (T3), bob → null (T4)
- After compaction: alice → 150 (T3). Only the latest alice record remains; bob is deleted.

Recovery (on restart):
1. Application starts
2. Discovers state store empty/outdated
3. Reads changelog from beginning
4. Replays all records to rebuild state
5. State restored, ready to process

Timeline: read changelog (fast with compaction) → restore state (local disk writes) → resume processing.
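The lifecycle above can be checked with a small simulation. This is a plain-Java sketch, not the Kafka client API: the ChangelogSimulation class and its Entry type are hypothetical, the log is an in-memory list, and compaction is modeled as a single pass. A real broker compacts in the background and keeps tombstones for a retention period before dropping them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of a changelog topic; not the Kafka client API. */
final class ChangelogSimulation {
    /** One changelog record; a null value is a tombstone (delete marker). */
    static final class Entry {
        final String key;
        final Integer value;

        Entry(String key, Integer value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Keeps the latest record per key and drops keys whose latest record is a tombstone. */
    static List<Entry> compact(List<Entry> log) {
        Map<String, Entry> latest = new LinkedHashMap<>();
        for (Entry e : log) {
            latest.remove(e.key);   // re-insert so order follows the latest write
            latest.put(e.key, e);
        }
        List<Entry> compacted = new ArrayList<>();
        for (Entry e : latest.values()) {
            if (e.value != null) {
                compacted.add(e);
            }
        }
        return compacted;
    }

    /** Rebuilds the local store by replaying the log from the beginning. */
    static Map<String, Integer> restore(List<Entry> log) {
        Map<String, Integer> store = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value == null) {
                store.remove(e.key);
            } else {
                store.put(e.key, e.value);
            }
        }
        return store;
    }
}
```

Replaying the four records from the table and replaying the compacted log both yield the same store, {alice=150}. Compaction shortens the replay without changing the restored state.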
Changelog Configuration
[Code listing: Java, 17 lines]
Disable Logging (When to Use)
[Code listing: Java, 10 lines]
Deep Dive: Standby Replicas
Why Standby Replicas?
Standby replica pattern

Without standby replicas:
- Instance 1 (active): partition 0 state (10GB RocksDB)
- Instance 2 (standby candidate): empty
- Instance 1 fails: Instance 2 must restore 10GB from the changelog = minutes of downtime

With standby replicas (num.standby.replicas=1):
- Instance 1 (active): partition 0 state (10GB RocksDB)
- Instance 2 (standby): partition 0 replica (10GB RocksDB), kept in sync via the changelog
- Instance 1 fails: Instance 2 already has the state = seconds of downtime
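The "minutes versus seconds" difference is simple arithmetic once a restore rate is assumed. The sketch below is a back-of-the-envelope estimate only: the FailoverEstimate class is hypothetical, and the restore rate and standby lag are assumptions you would replace with measurements from your own cluster. It also ignores the time the group takes to detect the failure and rebalance.

```java
/** Back-of-the-envelope failover estimate; all throughput figures are assumptions. */
final class FailoverEstimate {
    /** Seconds to rebuild a store by replaying its changelog at the given restore rate. */
    static long restoreSeconds(long stateMegabytes, long restoreMegabytesPerSecond) {
        if (restoreMegabytesPerSecond <= 0) {
            throw new IllegalArgumentException("restore rate must be positive");
        }
        // Round up: a partial second still delays processing.
        return (stateMegabytes + restoreMegabytesPerSecond - 1) / restoreMegabytesPerSecond;
    }

    /** With a standby, only the records it has not yet applied (its lag) need replaying. */
    static long standbyCatchUpSeconds(long lagMegabytes, long restoreMegabytesPerSecond) {
        return restoreSeconds(lagMegabytes, restoreMegabytesPerSecond);
    }
}
```

With an assumed restore rate of 50 MB/s, the 10GB store (10,240 MB) takes 205 seconds, about 3.4 minutes, to rebuild from the changelog, while a standby lagging by an assumed 100 MB catches up in 2 seconds.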
Configuring Standby Replicas
[Code listing: Java, 18 lines]
Standby Assignment Strategy
[Code listing: Java, 24 lines]
Deep Dive: Interactive Queries
Basic Store Query
[Code listing: Java, 58 lines]
Windowed Store Query
[Code listing: Java, 25 lines]
REST Controller for Queries
[Code listing: Java, 45 lines]
Deep Dive: Query Routing Pattern
Finding the Right Instance
Query routing pattern

Client query: GET /player/alice

Step 1: Determine which partition owns "alice"
- partition = hash(key) % numPartitions
- Example with the six partitions shown below: hash("alice") % 6 = 2

Step 2: Find which instance has partition 2
- KafkaStreams.queryMetadataForKey("store", "alice", ...)
- Returns: KeyQueryMetadata { activeHost: "instance-2:8080", partition: 2 }

Step 3: Route the query
- If local: query the store directly
- If remote: HTTP call to instance-2:8080/player/alice

Instance layout:

| Instance | Partitions | Owns "alice"? |
|---|---|---|
| Instance 1 | P0, P1 | No |
| Instance 2 | P2, P3 | Yes (query here) |
| Instance 3 | P4, P5 | No |
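The three routing steps can be sketched without a running cluster. This is a plain-Java illustration, not the Kafka Streams metadata API: the QueryRouter class, the partition-to-host map, and the /player/ URL shape are assumptions for the example. The hash is a stand-in (Kafka's default partitioner applies murmur2 to the serialized key), so the partition it picks for a given key will differ from a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of the three routing steps; not the Kafka Streams metadata API. */
final class QueryRouter {
    private final int numPartitions;
    private final Map<Integer, String> partitionOwner; // partition -> "host:port"
    private final String localHost;

    QueryRouter(int numPartitions, Map<Integer, String> partitionOwner, String localHost) {
        this.numPartitions = numPartitions;
        this.partitionOwner = new TreeMap<>(partitionOwner);
        this.localHost = localHost;
    }

    /** Step 1: map a key to a partition (stand-in hash, not Kafka's murmur2). */
    int partitionFor(String key) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions);
    }

    /** Step 2: find the instance that hosts the key's partition. */
    String ownerOf(String key) {
        String owner = partitionOwner.get(partitionFor(key));
        if (owner == null) {
            throw new IllegalStateException("no owner known for key " + key);
        }
        return owner;
    }

    /** Step 3: answer locally, or return the remote URL to call. */
    String route(String key) {
        String owner = ownerOf(key);
        return owner.equals(localHost) ? "local" : "http://" + owner + "/player/" + key;
    }
}
```

In a real application, steps 1 and 2 collapse into the single queryMetadataForKey call, which also knows the current assignment after a rebalance; a static map like the one here would go stale.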
Query Routing Service Implementation
[Code listing: Java, 126 lines]
Application Host Configuration
[Code listing: YAML, 14 lines]
Deep Dive: State Store Monitoring
Key Metrics to Track
[Code listing: Java, 86 lines]
RocksDB Metrics
[Code listing: Java, 63 lines]
Common Mistakes
1. Querying During Rebalance
[Code listing: Java, 30 lines]
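One common way to cope with a store that is temporarily unavailable during a rebalance is a bounded retry with a short backoff. The sketch below is plain Java and is not taken from the original listing: StoreQueryRetry is a hypothetical helper, and IllegalStateException stands in for Kafka Streams' InvalidStateStoreException so that the example compiles without the library.

```java
import java.util.function.Supplier;

/** Plain-Java retry sketch; IllegalStateException stands in for InvalidStateStoreException. */
final class StoreQueryRetry {
    /**
     * Runs the query, retrying while the store is unavailable.
     * Rethrows the last failure if all maxAttempts attempts fail.
     */
    static <T> T queryWithRetry(Supplier<T> query, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IllegalStateException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return query.get();
            } catch (IllegalStateException storeNotReady) {
                last = storeNotReady;
                if (attempt < maxAttempts) {
                    sleepQuietly(backoffMs);
                }
            }
        }
        throw last;
    }

    /** Sleeps, converting an interrupt into an unchecked failure and restoring the flag. */
    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for store", e);
        }
    }
}
```

Keeping the attempt count small matters for a REST endpoint: it is usually better to return a "temporarily unavailable" response after a few tries than to hold the request open for the whole rebalance.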
2. Large State Without Standby
[Code listing: Java, 11 lines]
3. Not Considering Query Locality
[Code listing: Java, 14 lines]
4. Iterator Resource Leaks
[Code listing: Java, 20 lines]
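Store iterators hold resources until they are closed, so the usual safeguard is try-with-resources. The sketch below shows the pattern with a plain-Java stand-in: TrackedIterator and IteratorUsage are hypothetical classes for illustration, not Kafka Streams types. The same shape should apply to a real store iterator, since Kafka Streams' KeyValueIterator is both an Iterator and a Closeable.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

/** Stand-in for a store iterator that holds resources until it is closed. */
final class TrackedIterator implements Iterator<String>, Closeable {
    private final Iterator<String> delegate;
    private boolean closed;

    TrackedIterator(List<String> entries) {
        this.delegate = entries.iterator();
    }

    @Override
    public boolean hasNext() {
        return !closed && delegate.hasNext();
    }

    @Override
    public String next() {
        return delegate.next();
    }

    /** Narrows Closeable.close() so callers need no checked-exception handling. */
    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

final class IteratorUsage {
    /** Counts entries; try-with-resources closes the iterator even if the loop throws. */
    static int countAll(TrackedIterator iterator) {
        try (TrackedIterator it = iterator) {
            int count = 0;
            while (it.hasNext()) {
                it.next();
                count++;
            }
            return count;
        }
    }
}
```

The leak-prone version is the same loop without the try block: an exception or early return inside the loop skips the close call.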
Debug This: The Incomplete Aggregation
A `count()` aggregation is showing fewer results than expected:
[Code listing: Java, 6 lines]
The topology is running on 3 instances. What's happening?
Answer
The issue: Interactive queries only see local partition data.
- Instance 1 (partitions 0, 1): categories A, B
- Instance 2 (partitions 2, 3): categories C, D
- Instance 3 (partitions 4, 5): category E
- Query on Instance 1: only sees A, B (2 categories)
Solutions:
- Query all instances (scatter-gather):
  [Code listing: Java, 8 lines]
- Use GlobalKTable if data is small:
  [Code listing: Java, 3 lines]
- External store for global queries:
  [Code listing: Java, 4 lines]
Root cause: This is by design. KTable state is partitioned for scalability. If you need a global view, you must either aggregate across instances or use a GlobalKTable (which doesn't scale as well, because every instance holds a full copy of the data).
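The gather half of scatter-gather is a merge of per-instance results. The sketch below is plain Java with hypothetical names: each map stands in for the response one instance would return from its local store, and the HTTP calls that would fetch those responses are left out. It assumes keys are disjoint across instances, as they are for a partitioned store; if a key did appear twice, the later instance's value would win.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java gather step; each map stands in for one instance's local query result. */
final class ScatterGather {
    /** Merges per-instance results into one sorted, global view. */
    static Map<String, Long> gather(List<Map<String, Long>> perInstanceResults) {
        Map<String, Long> global = new TreeMap<>();
        for (Map<String, Long> local : perInstanceResults) {
            // Partitioned state means each key normally comes from exactly one instance.
            global.putAll(local);
        }
        return global;
    }
}
```

Merging the three instances from the example (A and B, C and D, E) yields all five categories, whereas the result from Instance 1 alone contains only two.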
Exercises
Exercise 1: Build a Session Store
Create a stateful processor that:
- Tracks user sessions (events separated by gaps of less than 30 minutes)
- Stores session start time, event count, last activity
- Provides query API for active sessions
Exercise 2: Implement Query Routing
Build a REST service with:
- Point queries routed to correct instance
- Range queries scattered to all instances
- Fallback to standby replicas on failure
Exercise 3: RocksDB Tuning
Profile and tune RocksDB for:
- High write throughput workload
- Read-heavy workload with large dataset
- Memory-constrained environment
Exercise 4: State Store Migration
Implement a strategy to:
- Add a new field to state store values
- Migrate existing data without downtime
- Handle both old and new formats during migration
Exercise 5: Monitoring Dashboard
Create Grafana dashboards showing:
- State store sizes and growth
- Restoration time after restarts
- Cache hit ratios
- Query latencies (local vs remote)
Interview Questions
Q1: "How does Kafka Streams achieve fault tolerance for stateful processing?"
What they're looking for: Understanding of state backup and recovery
Strong answer:
"Kafka Streams uses a changelog topic pattern:
- Changelog Topics - Every state store write is also written to a compacted Kafka topic:
  State: put(key, value) → Changelog: [key, value] appended to topic
- Compaction - Changelog topics use log compaction, keeping only the latest value per key. This bounds recovery time.
- Recovery Process - On restart:
  - Discover assigned partitions
  - Read changelog from beginning (or checkpoint)
  - Replay into local state store
  - Resume processing
- Standby Replicas - Optionally, other instances maintain hot copies:
  - Continuously consume changelog
  - Ready to take over immediately
The key insight is that Kafka itself becomes the source of truth. Local state stores are just materialized views that can be rebuilt from the changelog."
Q2: "When would you disable changelog logging for a state store?"
What they're looking for: Understanding of trade-offs
Strong answer:
"I'd disable changelog when:
- Derived/Recomputable State - If state can be rebuilt from source topics:
  [Code listing: Java, 2 lines]
- Caching Layer - Short-term cache where loss is acceptable:
  [Code listing: Java, 2 lines]
- Very High Write Volume - When the changelog becomes a bottleneck:
  [Code listing: Java, 2 lines]
- Testing - Unit tests don't need persistence:
  [Code listing: Java, 3 lines]
The risk is longer recovery time (must reprocess from source) and potential data loss if source topics have limited retention. Always evaluate: Is the recovery time acceptable?"
Q3: "Explain the query routing pattern for interactive queries."
What they're looking for: Distributed systems understanding
Strong answer:
"Kafka Streams partitions state across instances. For queries:
Point Queries (single key):
[Code listing: Java, 11 lines]
Range/All Queries (scatter-gather):
[Code listing: Java, 12 lines]
Key considerations:
- Configure `application.server` for discovery
- Handle rebalancing gracefully
- Consider standby replicas for failover
- Load balancer needs to be aware of routing"
Q4: "How do you size memory for Kafka Streams with state stores?"
What they're looking for: Operational experience
Strong answer:
"Memory in Kafka Streams has several components:
1. JVM Heap:
- Record cache: StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG (statestore.cache.max.bytes)
- In-flight records
- Application objects
- Rule: Start with 1-2GB, monitor GC

2. Off-Heap (RocksDB):
- Block cache: Most important for reads
- Write buffers: For write performance
- Indexes and bloom filters
- Rule: Block cache = 1/3 of available memory

3. OS Page Cache:
- RocksDB relies heavily on OS page cache
- Keep memory available for this
- Rule: Leave 20-30% of memory for OS
Sizing formula:
Total memory needed per instance =
- JVM heap (2GB base + 100MB per partition)
- + RocksDB block cache (256MB - 1GB)
- + Write buffers (64MB × num_stores × 3)
- + OS page cache (20% of total)

Example: 8 partitions, 3 stores = 2.8GB heap + 512MB cache + 576MB buffers + ~1GB OS ≈ 5GB per instance
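The sizing formula can be written out as code to make the arithmetic checkable. The sketch below only encodes the rule of thumb from the text: the MemorySizing class is hypothetical, it works in megabytes with 2GB taken as 2048 MB, and it interprets "20% of total" as the OS share of the final total, so the other three components make up the remaining 80%.

```java
/** Encodes the sizing rule of thumb from the text; constants are starting points only. */
final class MemorySizing {
    static final long HEAP_BASE_MB = 2048;          // "2GB base"
    static final long HEAP_PER_PARTITION_MB = 100;  // "100MB per partition"
    static final long WRITE_BUFFER_MB = 64;         // size of one write buffer
    static final long WRITE_BUFFERS_PER_STORE = 3;  // the "× 3" in the formula

    /** JVM heap estimate for the given number of partitions. */
    static long heapMb(int partitions) {
        return HEAP_BASE_MB + HEAP_PER_PARTITION_MB * partitions;
    }

    /** Write buffer estimate for the given number of stores. */
    static long writeBuffersMb(int stores) {
        return WRITE_BUFFER_MB * stores * WRITE_BUFFERS_PER_STORE;
    }

    /** Total memory such that the OS page cache share is 20% of the total. */
    static long totalMb(int partitions, int stores, long blockCacheMb) {
        long processMb = heapMb(partitions) + blockCacheMb + writeBuffersMb(stores);
        return processMb * 100 / 80;
    }

    /** The 20% of the total left to the OS page cache. */
    static long osPageCacheMb(int partitions, int stores, long blockCacheMb) {
        long processMb = heapMb(partitions) + blockCacheMb + writeBuffersMb(stores);
        return totalMb(partitions, stores, blockCacheMb) - processMb;
    }
}
```

For 8 partitions, 3 stores, and a 512 MB block cache this gives 2848 MB heap, 576 MB of write buffers, 984 MB for the OS, and a total of 4920 MB, in line with the ≈ 5GB in the example.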
Monitor `rocksdb.cache.hit.ratio`; if it is low, increase the block cache."
Q5: "How would you handle a state store that's growing unbounded?"
What they're looking for: Problem-solving approach
Strong answer:
"Unbounded growth typically means data isn't being removed. Approaches:
1. Windowed Stores (automatic retention):
[Code listing: Java, 3 lines]
2. TTL with Processor API:
[Code listing: Java, 12 lines]
3. Tombstones in Source Topic:
[Code listing: Java, 4 lines]
4. External Cleanup Job:
[Code listing: Java, 2 lines]
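Of the four approaches, the TTL pattern (approach 2) is the one that needs custom logic, so here is a minimal plain-Java sketch of the idea. It is not the original listing and does not use the Processor API: TtlStore is a hypothetical class that keeps a last-updated timestamp per key and removes entries older than the TTL when purgeExpired is called. In a Kafka Streams topology, a purge like this would typically run from a punctuator scheduled through the processor context.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Plain-Java TTL sketch; in a real topology the purge would run on a schedule. */
final class TtlStore {
    private final long ttlMs;
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> lastUpdatedMs = new HashMap<>();

    TtlStore(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Stores the value and refreshes the key's last-updated time. */
    void put(String key, String value, long nowMs) {
        values.put(key, value);
        lastUpdatedMs.put(key, nowMs);
    }

    String get(String key) {
        return values.get(key);
    }

    /** Removes entries not updated within the TTL; returns how many were removed. */
    int purgeExpired(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUpdatedMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > ttlMs) {
                values.remove(entry.getKey());
                it.remove();   // safe removal while iterating
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return values.size();
    }
}
```

A full scan per purge is fine for a sketch but costly on a large store; keeping a secondary index ordered by timestamp is one way to limit each purge to the expired range.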
Monitoring: Track `store.approximateNumEntries()` and alert on growth rate exceeding expectations."
Summary & Key Takeaways
State Store Fundamentals
| Concept | Key Point |
|---|---|
| Default Storage | RocksDB (embedded, persistent) |
| Fault Tolerance | Changelog topics (compacted) |
| Fast Recovery | Standby replicas (hot copies) |
| Query Access | Interactive queries (local + routing) |
| Data Distribution | Partitioned (like Kafka topics) |
| Global Access | GlobalKTable or scatter-gather |
Essential Takeaways
- State is local but recoverable - Each instance has its own state store, backed by changelog topics.
- RocksDB needs tuning - Block cache, write buffers, and compaction settings significantly impact performance.
- Standby replicas are essential - For large state stores, standby replicas reduce failover time from minutes to seconds.
- Interactive queries need routing - A single query only sees local partitions; implement scatter-gather for global views.
- Monitor state store health - Track entry counts, restoration times, and cache hit ratios.
- Clean up old data - Use windowed stores, TTL patterns, or tombstones to prevent unbounded growth.
Quick Reference
| Configuration | Code/Property |
|---|---|
| Enable changelog | Materialized.withLoggingEnabled(config) |
| Disable changelog | Materialized.withLoggingDisabled() |
| In-memory store | withStoreType(IN_MEMORY) |
| Standby replicas | num.standby.replicas=1 |
| State directory | state.dir=/var/streams |
| Cache size | statestore.cache.max.bytes=10485760 |

| Query Operations | Code |
|---|---|
| Get store handle | streams.store(StoreQueryParameters...) |
| Point lookup | store.get(key) |
| Range query | store.range(from, to) |
| All entries | store.all() (iterator, must close!) |
| Approximate count | store.approximateNumEntries() |
| Find key owner | streams.queryMetadataForKey(...) |
| All store hosts | streams.streamsMetadataForStore(...) (replaces the deprecated allMetadataForStore(...)) |
Series Navigation
Series Overview: Kafka Compendium Series