Streams Fundamentals & Topologies
At a Glance
| Aspect | Details |
|---|---|
| Goal | Build stream processing applications with Kafka Streams |
| Architecture | Library, not framework - runs in your JVM |
| DSL vs Processor API | High-level declarative vs low-level imperative |
| Key Abstractions | KStream (event stream), KTable (changelog), GlobalKTable (broadcast) |
| Topology | Directed acyclic graph of stream processors |
| Spring Integration | Spring Kafka Streams with functional style |
| Prerequisites | Parts 1-14 (core Kafka concepts) |
What You'll Learn
- Kafka Streams architecture and how it differs from other frameworks
- Building topologies with the high-level DSL
- KStream vs KTable: when to use each
- Stream transformations: map, filter, flatMap, branch
- The Processor API for custom logic
- Testing Kafka Streams applications
- Spring Kafka Streams integration
Production Story: The Real-Time Fraud Detector
"We need to detect fraud in real-time," the product manager explained. "Right now, our batch job runs every hour. By the time we catch fraud, the money is gone."
The team evaluated options:
- Apache Flink: Powerful, but complex cluster management
- Apache Spark Streaming: Required Spark cluster, microbatching latency
- Custom consumers: Would need to build everything from scratch
Kafka Streams stood out as the fourth option:
- No separate cluster needed - just a Java library
- Exactly-once semantics built-in
- State management handled automatically
- Scales with Kafka partitions
Within a week, they had real-time fraud detection running. No new infrastructure, no cluster management—just a Java application that scaled automatically with their Kafka deployment.
Mental Model: Kafka Streams Architecture
KAFKA STREAMS ARCHITECTURE

OTHER FRAMEWORKS              KAFKA STREAMS
─────────────────             ─────────────────
┌─────────────────┐           ┌─────────────────┐
│    Your Code    │           │    Your Code    │
└────────┬────────┘           │    + Streams    │
         │                    │     Library     │
         ▼                    └────────┬────────┘
┌─────────────────┐                    │
│    Submit to    │                    │
│     Cluster     │          No separate cluster!
└────────┬────────┘                    │
         │                             │
         ▼                             ▼
┌─────────────────┐           ┌─────────────────┐
│    Separate     │           │    Your JVM     │
│   Processing    │           │  (App Server,   │
│     Cluster     │           │   Container,    │
│                 │           │     etc.)       │
│  • Flink        │           └────────┬────────┘
│  • Spark        │                    │
│  • etc.         │                    │
└────────┬────────┘                    │
         │                             │
         ▼                             ▼
┌─────────────────┐           ┌─────────────────┐
│      Kafka      │           │      Kafka      │
└─────────────────┘           └─────────────────┘
The Topology: Heart of Kafka Streams
STREAM TOPOLOGY

A topology is a directed acyclic graph (DAG) of processors:

              ┌───────────────┐
              │    SOURCE     │  ← Reads from Kafka topic
              │   (orders)    │
              └───────┬───────┘
                      │
                      ▼
              ┌───────────────┐
              │    FILTER     │  ← Removes cancelled orders
              │ status=VALID  │
              └───────┬───────┘
                      │
         ┌────────────┴────────────┐
         ▼                         ▼
 ┌───────────────┐         ┌───────────────┐
 │      MAP      │         │   AGGREGATE   │  ← State store
 │  enrich data  │         │  by customer  │
 └───────┬───────┘         └───────┬───────┘
         │                         │
         ▼                         ▼
 ┌───────────────┐         ┌───────────────┐
 │     SINK      │         │     SINK      │
 │  (enriched-   │         │  (customer-   │
 │   orders)     │         │   totals)     │
 └───────────────┘         └───────────────┘

Each processor:
- Receives records from upstream
- Transforms, filters, or aggregates
- Passes results downstream
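The flow through the graph above can be modelled in a few lines of plain Java. This is only a sketch of the idea and not the Kafka Streams API: `TopologySketch`, `Order`, and the two sink collections are hypothetical names, the "enrichment" is a simple string format, and the code assumes Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the DAG; this is NOT the Kafka Streams API. */
final class TopologySketch {

    /** Hypothetical order type, used only for this illustration. */
    record Order(String customer, String status, long amountCents) {}

    /** Stands in for the "enriched-orders" sink topic. */
    final List<String> enrichedOrders = new ArrayList<>();

    /** Stands in for the "customer-totals" sink and its state store. */
    final Map<String, Long> customerTotals = new LinkedHashMap<>();

    /** SOURCE: every record enters the graph here. */
    void source(Order order) {
        filter(order);
    }

    /** FILTER: only orders with status VALID continue downstream. */
    private void filter(Order order) {
        if (!"VALID".equals(order.status())) {
            return; // dropped: nothing is forwarded
        }
        // Fan-out: the same record goes to both child processors.
        map(order);
        aggregate(order);
    }

    /** MAP: stand-in enrichment that formats the order as text. */
    private void map(Order order) {
        enrichedOrders.add(order.customer() + ":" + order.amountCents());
    }

    /** AGGREGATE: running total per customer, kept in the "state store". */
    private void aggregate(Order order) {
        customerTotals.merge(order.customer(), order.amountCents(), Long::sum);
    }

    public static void main(String[] args) {
        TopologySketch topology = new TopologySketch();
        topology.source(new Order("alice", "VALID", 1000));
        topology.source(new Order("bob", "CANCELLED", 9900));
        topology.source(new Order("alice", "VALID", 550));
        System.out.println(topology.enrichedOrders);
        System.out.println(topology.customerTotals);
    }
}
```

The point of the sketch is the shape: one entry node, a stateless step that can drop records, and a fan-out where one branch is stateless and the other keeps state.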
Deep Dive: KStream vs KTable
The Fundamental Difference
| KStream | KTable |
|---|---|
| Event stream (all events) | Changelog (latest per key) |
| Records: immutable facts | Records: updates to state |
| "Alice bought coffee", "Alice bought tea", "Alice bought coffee" | "Alice's balance is $100", "Alice's balance is $85" |
| → All 3 records preserved | → Only latest balance kept |
| Use when: every event matters; processing transactions; log of changes | Use when: only latest value matters; reference data / lookups; aggregation results |
Visualizing the Difference
Topic: user-events (compacted)

Input sequence:

| Key | Value | Offset | Note |
|---|---|---|---|
| alice | {"age": 25} | 0 | |
| bob | {"age": 30} | 1 | |
| alice | {"age": 26} | 2 | Alice had birthday |
| alice | {"age": 26} | 3 | Duplicate event |

| KStream view (all records) | KTable view (latest per key) |
|---|---|
| alice → {"age": 25} | alice → {"age": 26} |
| bob → {"age": 30} | bob → {"age": 30} |
| alice → {"age": 26} | |
| alice → {"age": 26} | |
| 4 records processed | 2 entries in table |
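The two views of the same input can be reproduced with ordinary collections. The sketch below is a plain-Java model of the semantics, not Kafka Streams code: `StreamVsTableSketch` and `Event` are hypothetical names, and records require Java 16+.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the two views; this is NOT the Kafka Streams API. */
final class StreamVsTableSketch {

    /** Hypothetical record type: a key plus the "age" field from the example. */
    record Event(String key, int age) {}

    /** KStream-style view: every record is kept, in arrival order. */
    static List<Event> streamView(List<Event> input) {
        return new ArrayList<>(input);
    }

    /** KTable-style view: a later record for a key overwrites the earlier one. */
    static Map<String, Integer> tableView(List<Event> input) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Event event : input) {
            table.put(event.key(), event.age());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
                new Event("alice", 25),
                new Event("bob", 30),
                new Event("alice", 26),
                new Event("alice", 26));
        System.out.println("stream records: " + streamView(input).size());
        System.out.println("table entries:  " + tableView(input));
    }
}
```

Note that the duplicate at offset 3 still counts as a record in the stream view, while the table view is unaffected by it.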
Code Example: KStream vs KTable
Deep Dive: Stream Transformations
Stateless Operations
The Transformation Flow
| Operation | What happens | Output |
|---|---|---|
| FILTER | A, B, C → keep if predicate is true | Subset of the input |
| MAP | A → A', B → B', C → C' | Same count as the input |
| FLATMAP | A → A1, A2, A3; B → B1; C → (none) | 0..N records per input |
| BRANCH | Input stream splits into A (High Value), B (Priority), C (Standard) | One stream per branch |
| MERGE | Stream 1 + Stream 2 → merged stream | Interleaved by arrival time |
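Two of the operations above are easy to get wrong: flatMap may emit nothing for a record, and branch sends each record to the first branch whose predicate matches. The following plain-Java sketch models both on lists. It is not the Kafka Streams API; `TransformSketch` and its method signatures are hypothetical, and the trailing "default" branch for unmatched records is a choice made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Plain-Java model of flatMap and branch; this is NOT the Kafka Streams API. */
final class TransformSketch {

    /** flatMap: each input yields zero or more outputs. */
    static <T, R> List<R> flatMap(List<T> input, Function<T, List<R>> fn) {
        List<R> out = new ArrayList<>();
        for (T item : input) {
            out.addAll(fn.apply(item));
        }
        return out;
    }

    /**
     * branch: each record goes to the FIRST branch whose predicate matches.
     * The returned list has one extra, final entry that collects unmatched records.
     */
    static <T> List<List<T>> branch(List<T> input, List<Predicate<T>> predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i <= predicates.size(); i++) {
            branches.add(new ArrayList<>());
        }
        for (T item : input) {
            int target = predicates.size(); // default branch
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(item)) {
                    target = i;
                    break; // first match wins, later predicates are not consulted
                }
            }
            branches.get(target).add(item);
        }
        return branches;
    }

    public static void main(String[] args) {
        List<Predicate<Integer>> rules = List.of(a -> a >= 1000, a -> a >= 100);
        System.out.println(branch(List.of(1500, 200, 50, 5000), rules));
    }
}
```

Because the first match wins, predicate order matters: an amount of 1500 satisfies both rules in `main` but only lands in the first branch.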
Deep Dive: GlobalKTable for Broadcast Data
When to Use GlobalKTable
| KTable | GlobalKTable |
|---|---|
| Partitioned like input topic | Full copy on every instance |
| Instance sees subset of data | Instance sees ALL data |
| Join requires co-partitioning | Join works with any key |
| Scales with partitions | Memory usage × instance count |
| Use for: large tables; data already partitioned | Use for: small reference data; lookup tables; configuration |
GlobalKTable Example
Visualization: Partitioned vs Global
DATA DISTRIBUTION

KTable (partitioned). Topic: customers (6 partitions)

| Instance 1 | Instance 2 | Instance 3 |
|---|---|---|
| Partition 0, Partition 1 | Partition 2, Partition 3 | Partition 4, Partition 5 |
| Alice, Bob | Carol, Dan | Eve, Frank |

Each instance only sees SOME customers.

GlobalKTable (broadcast). Topic: products (3 partitions)

| Instance 1 | Instance 2 | Instance 3 |
|---|---|---|
| ALL partitions | ALL partitions | ALL partitions |
| All 1000 products | All 1000 products | All 1000 products |

Every instance sees ALL products.
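The difference in data distribution can be sketched in plain Java. This is a model, not Kafka Streams code: `DistributionSketch` is a hypothetical name, the partitioner is simplified to `hashCode()` modulo the partition count (Kafka's default partitioner hashes the serialized key bytes instead), and the contiguous partition-to-instance mapping simply mirrors the picture above.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of partitioned vs broadcast data; NOT the Kafka Streams API. */
final class DistributionSketch {

    /** Simplified partitioner (assumption): key hash modulo partition count. */
    static int partitionFor(String key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    /** Assumption: partitions are handed out in contiguous blocks, as in the picture. */
    static int ownerOf(int partition, int partitions, int instances) {
        return partition * instances / partitions;
    }

    /** KTable-style: an instance only holds keys whose partition it owns. */
    static List<String> partitionedView(List<String> keys, int partitions,
                                        int instances, int instance) {
        List<String> view = new ArrayList<>();
        for (String key : keys) {
            int partition = partitionFor(key, partitions);
            if (ownerOf(partition, partitions, instances) == instance) {
                view.add(key);
            }
        }
        return view;
    }

    /** GlobalKTable-style: every instance holds a full copy. */
    static List<String> globalView(List<String> keys) {
        return new ArrayList<>(keys);
    }

    public static void main(String[] args) {
        List<String> keys = List.of("alice", "bob", "carol", "dan", "eve", "frank");
        for (int instance = 0; instance < 3; instance++) {
            System.out.println("instance " + instance
                    + " partitioned=" + partitionedView(keys, 6, 3, instance)
                    + " global=" + globalView(keys).size());
        }
    }
}
```

Summed over all instances, the partitioned views hold each key once, while the global views hold each key once per instance. That multiplication is why the broadcast style suits small reference data.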
Deep Dive: Aggregations
Basic Aggregations
Aggregation with Subtraction (for Tombstones)
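When a table is regrouped and aggregated, an update or delete of a row has to remove that row's old contribution before the new one is added. The plain-Java sketch below models that adder/subtractor pairing. It is not the Kafka Streams API: `SubtractorSketch`, `Account`, and the region grouping are hypothetical, and a `null` account stands in for a tombstone.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of adder/subtractor aggregation; NOT the Kafka Streams API. */
final class SubtractorSketch {

    /** Hypothetical table value: the group it belongs to and an amount. */
    record Account(String region, long balance) {}

    /** Upstream table: customer -> latest account. */
    private final Map<String, Account> table = new HashMap<>();

    /** Aggregate table: region -> sum of balances. */
    private final Map<String, Long> totalByRegion = new HashMap<>();

    /** Upsert a row, or delete it when account is null (tombstone). */
    void update(String customer, Account account) {
        Account old = (account == null)
                ? table.remove(customer)
                : table.put(customer, account);
        if (old != null) {
            // Subtractor: take the old row's contribution out of its old group.
            totalByRegion.merge(old.region(), -old.balance(), Long::sum);
        }
        if (account != null) {
            // Adder: add the new row's contribution to its (possibly new) group.
            totalByRegion.merge(account.region(), account.balance(), Long::sum);
        }
    }

    long total(String region) {
        return totalByRegion.getOrDefault(region, 0L);
    }

    public static void main(String[] args) {
        SubtractorSketch agg = new SubtractorSketch();
        agg.update("alice", new Account("EU", 100));
        agg.update("bob", new Account("EU", 50));
        agg.update("alice", new Account("US", 100)); // alice moves regions
        agg.update("bob", null);                     // tombstone for bob
        System.out.println("EU=" + agg.total("EU") + " US=" + agg.total("US"));
    }
}
```

Without the subtract step, alice's balance would be counted in both regions after her move, and bob's balance would survive his deletion.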
Deep Dive: The Processor API
When to Use Processor API
| DSL | Processor API |
|---|---|
| Declarative, fluent: builder.stream().filter()... | Imperative, full control: process(), punctuate() |
| Handles state stores automatically | Manual state store access |
| Limited to predefined ops | Any custom logic |
| Use when: standard transformations; aggregations, joins; most use cases | Use when: custom windowing logic; periodic actions (punctuate); complex state machine; accessing record metadata |
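The process()/punctuate() split is the core of the imperative style: one callback runs per record and touches state, the other runs on a schedule and decides what to emit. The sketch below models that split in plain Java. It does not use the real Processor API interfaces; `ProcessorSketch`, its in-memory map "store", and the `forwarded()` list are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of process() + punctuate(); NOT the real Processor API. */
final class ProcessorSketch {

    /** Stands in for a key-value state store (sorted for stable output). */
    private final Map<String, Long> store = new TreeMap<>();

    /** Stands in for records forwarded to downstream processors. */
    private final List<String> downstream = new ArrayList<>();

    /** Called once per input record: update state, emit nothing yet. */
    void process(String key, String value) {
        store.merge(key, 1L, Long::sum);
    }

    /** Called periodically: flush the accumulated counts and reset the state. */
    void punctuate(long timestampMs) {
        store.forEach((key, count) ->
                downstream.add(key + "=" + count + "@" + timestampMs));
        store.clear();
    }

    List<String> forwarded() {
        return downstream;
    }

    public static void main(String[] args) {
        ProcessorSketch processor = new ProcessorSketch();
        processor.process("a", "x");
        processor.process("b", "y");
        processor.process("a", "z");
        processor.punctuate(1_000L);
        System.out.println(processor.forwarded());
    }
}
```

This "accumulate, then emit on a timer" shape is hard to express with the predefined DSL operations, which is the kind of case the right-hand column describes.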
Custom Processor Example
Registering the Processor
Deep Dive: Spring Kafka Streams
Functional Style Configuration
Spring Cloud Stream Functional Style
Deep Dive: Topology Visualization
Describing the Topology
Example Topology Output
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [orders])
      --> KSTREAM-FILTER-0000000001
    Processor: KSTREAM-FILTER-0000000001 (stores: [])
      --> KSTREAM-MAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-FILTER-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: enriched-orders)
      <-- KSTREAM-MAPVALUES-0000000002

   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000004 (topics: [enriched-orders])
      --> KSTREAM-AGGREGATE-0000000005
    Processor: KSTREAM-AGGREGATE-0000000005 (stores: [customer-stats])
      --> KSTREAM-TOSTREAM-0000000006
      <-- KSTREAM-SOURCE-0000000004
    Processor: KSTREAM-TOSTREAM-0000000006 (stores: [])
      --> KSTREAM-SINK-0000000007
      <-- KSTREAM-AGGREGATE-0000000005
    Sink: KSTREAM-SINK-0000000007 (topic: customer-statistics)
      <-- KSTREAM-TOSTREAM-0000000006
Deep Dive: Testing Kafka Streams
TopologyTestDriver
Testing with State Stores
Common Mistakes
1. Modifying Input Objects
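The risk with mutating an input object is that the same instance may also be handed to other downstream processors, which then see the modified value. The following plain-Java sketch shows the aliasing problem in isolation. It is an illustration under assumptions, not Kafka Streams code: `MutationSketch` and its deliberately mutable `Order` class are hypothetical.

```java
/** Plain-Java illustration of in-place mutation vs copying; NOT Kafka Streams code. */
final class MutationSketch {

    /** Deliberately mutable value type, hypothetical and for illustration only. */
    static final class Order {
        String status;

        Order(String status) {
            this.status = status;
        }
    }

    /** Anti-pattern: mutates the shared instance and returns it. */
    static Order markProcessedInPlace(Order input) {
        input.status = "PROCESSED";
        return input;
    }

    /** Safer: leaves the input untouched and returns a new instance. */
    static Order markProcessedCopy(Order input) {
        return new Order("PROCESSED");
    }

    public static void main(String[] args) {
        Order shared = new Order("NEW");
        markProcessedInPlace(shared);
        System.out.println("after in-place: " + shared.status);

        Order untouched = new Order("NEW");
        markProcessedCopy(untouched);
        System.out.println("after copy:     " + untouched.status);
    }
}
```

Any other consumer holding a reference to `shared` now observes the changed status, whereas the copying variant leaves the original as it was.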
2. Using map() When mapValues() Suffices
3. Not Naming Processors
4. GlobalKTable for Large Data
Debug This: The Duplicated Aggregates
An aggregation is producing duplicate or incorrect counts:
What's causing the duplicates? (Answer below)
Answer
Possible causes:
- Reprocessing on restart - If auto.offset.reset=earliest and the application restarts without committed offsets (for example after a change of application.id), it reprocesses old messages.
- Multiple instances with different application.id - Each application.id forms its own consumer group, so every group processes the same data.
- Upstream duplication - Producer retries creating duplicates.
- State store not persisted - Using in-memory stores that reset.
- Multiple topologies - Accidentally creating duplicate processors.
How to narrow it down:
- Check state store changelog topic for duplicates
- Verify application.id is consistent
- Check if producer is idempotent
- Enable Streams metrics for rebalance count
- Log every input record to trace source
Exercises
Exercise 1: Build an Alert Aggregator
Create a topology that:
- Reads from the alerts topic
- Groups by alertType
- Counts alerts per type in 5-minute tumbling windows
- Outputs counts exceeding threshold to alert-summaries
Exercise 2: Implement Deduplication
Build a processor that:
- Tracks seen message IDs in a state store
- Filters duplicates within a 1-hour window
- Cleans up old entries using punctuate
Exercise 3: Join Three Streams
Create a topology joining:
- orders (KStream)
- customers (KTable)
- products (GlobalKTable)
- Output: enriched orders with customer and product details
Exercise 4: Write Comprehensive Tests
Using TopologyTestDriver:
- Test stateless transformations
- Test windowed aggregations with time advancement
- Test state store queries
- Test error handling
Exercise 5: Topology Optimization
Given a topology, identify and fix:
- Unnecessary repartitions
- Missing named processors
- Inefficient GlobalKTable usage
- State store configuration issues
Interview Questions
Q1: "When would you use Kafka Streams vs Flink or Spark Streaming?"
Choose Kafka Streams when:
- Data already in Kafka (no connector overhead)
- Simple to medium complexity processing
- Want to avoid cluster management
- Need exactly-once with Kafka sink/source
- Team knows Java/Kotlin
- Deploying as microservices
Choose Flink when:
- Complex event processing (CEP)
- Need SQL interface
- Multi-source/sink (not just Kafka)
- Advanced windowing (event-time, sessions)
- Team has Flink expertise
- Need managed state with checkpointing across sources
Choose Spark Streaming when:
- Already have Spark infrastructure
- Batch + stream unified processing
- ML model serving in stream
- Need DataFrame API
Kafka Streams is "just a library": that is both its strength (simplicity) and its limitation (single runtime).
Q2: "Explain the difference between KStream and KTable with a concrete example."
KStream example (purchases):
alice buys coffee → record 1
alice buys sandwich → record 2
alice buys coffee → record 3
KStream sees all 3 records. Total: 3 purchases.
KTable example (account balance):
alice balance $100 → update 1
alice balance $85 → update 2 (replaces)
alice balance $70 → update 3 (replaces)
KTable has 1 entry: alice → $70
The key insight:
- KStream = facts that happened (immutable)
- KTable = current state (mutable by key)
When you call builder.stream() you get a KStream: all events.
When you call builder.table() you get a KTable: latest per key.
Aggregations convert a KStream to a KTable because the aggregation result is "current state", not an "event stream".
Q3: "What happens when a Kafka Streams application restarts?"
- Consumer group rebalance - Partitions are reassigned to the remaining or new instances.
- State restoration - Local state stores are rebuilt from changelog topics. The changelog topic (app-id-store-changelog) contains all state updates and is replayed to rebuild the RocksDB state.
- Offset management - Processing resumes from the last committed offset. With exactly-once this is clean; with at-least-once, records processed after the last commit may be reprocessed.
- Standby replicas (if configured via num.standby.replicas) - Can speed up failover.
For exactly-once, the transaction is atomic:
- Process records
- Update state
- Commit offsets
All happen together, so restart picks up cleanly.
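Why atomicity matters can be shown with a small plain-Java simulation of a crash between the state update and the offset commit. This is a model under simplifying assumptions, not how Kafka transactions are implemented: `CommitSketch`, the in-memory `counts` map, and the `crashAtOffset` parameter are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of commit behaviour on restart; NOT Kafka's implementation. */
final class CommitSketch {

    /** Stands in for a state store that survives the crash. */
    final Map<String, Long> counts = new HashMap<>();

    /** Next offset to read after a restart. */
    int committedOffset = 0;

    /**
     * Non-atomic: state is updated first, the offset is committed in a separate step.
     * Pass crashAtOffset = -1 for a run without a crash.
     */
    void runNonAtomic(List<String> log, int crashAtOffset) {
        for (int offset = committedOffset; offset < log.size(); offset++) {
            counts.merge(log.get(offset), 1L, Long::sum);
            if (offset == crashAtOffset) {
                return; // crash before the commit: this record will be replayed
            }
            committedOffset = offset + 1;
        }
    }

    /** Atomic: state update and offset commit happen together or not at all. */
    void runAtomic(List<String> log, int crashAtOffset) {
        for (int offset = committedOffset; offset < log.size(); offset++) {
            if (offset == crashAtOffset) {
                return; // crash: neither state nor offset changed for this record
            }
            counts.merge(log.get(offset), 1L, Long::sum);
            committedOffset = offset + 1;
        }
    }

    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "a");

        CommitSketch nonAtomic = new CommitSketch();
        nonAtomic.runNonAtomic(log, 1);  // crashes while handling "b"
        nonAtomic.runNonAtomic(log, -1); // restart
        System.out.println("non-atomic: " + nonAtomic.counts);

        CommitSketch atomic = new CommitSketch();
        atomic.runAtomic(log, 1);
        atomic.runAtomic(log, -1);
        System.out.println("atomic:     " + atomic.counts);
    }
}
```

In the non-atomic run the record at the crash point is counted twice after the restart, which is the duplicate-count symptom that at-least-once processing can produce.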
Q4: "How do you handle late-arriving data in Kafka Streams?"
- Grace period - Allow late data within the window.
- Suppress - Emit only final results.
- Custom timestamp extractor - Use event time.
- Handling out-of-order records.
For production, I typically:
- Use event timestamps, not processing time
- Set reasonable grace periods based on expected lateness
- Monitor suppression buffer sizes
- Have separate handling for very late data (dead letter)
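The grace-period rule can be sketched in plain Java: a late record is still accepted while its window's end plus the grace period lies ahead of the stream time, where stream time is the highest timestamp seen so far. This is a model, not Kafka Streams code. `GraceSketch` is a hypothetical name, and the exact boundary comparison is an assumption of this sketch.

```java
/** Plain-Java model of a tumbling window with a grace period; NOT the Kafka Streams API. */
final class GraceSketch {

    private final long windowSizeMs;
    private final long graceMs;

    /** Stream time: the highest record timestamp observed so far. */
    private long streamTimeMs = Long.MIN_VALUE;

    GraceSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Start of the tumbling window that contains the timestamp. */
    long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Returns true if the record is accepted, false if it is dropped as too late. */
    boolean accept(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowEnd = windowStart(timestampMs) + windowSizeMs;
        // Assumed rule: the window closes once stream time reaches windowEnd + grace.
        return windowEnd + graceMs > streamTimeMs;
    }

    public static void main(String[] args) {
        GraceSketch windows = new GraceSketch(300_000L, 60_000L); // 5 min window, 1 min grace
        System.out.println(windows.accept(10_000L));  // on time
        System.out.println(windows.accept(310_000L)); // advances stream time
        System.out.println(windows.accept(290_000L)); // late, but within grace
        System.out.println(windows.accept(400_000L)); // advances stream time past the grace
        System.out.println(windows.accept(295_000L)); // too late for the first window
    }
}
```

A longer grace period accepts more stragglers but delays the point at which a window's result can be considered final, which is the trade-off behind "reasonable grace periods based on expected lateness".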
Q5: "How do you scale a Kafka Streams application?"
- Horizontal - Add more instances:
  3 partitions, 1 instance → that instance processes all 3 partitions
  3 partitions, 3 instances → each instance gets 1 partition
  3 partitions, 6 instances → 3 idle (waste)
- Increase partitions - More parallelism possible. Note: this requires a rebalance and changes the key-to-partition mapping, which can affect per-key ordering.
- Thread count per instance (num.stream.threads) - One thread per partition is typical.
- Sub-topologies - Independent parts can scale separately.
Rules of thumb:
- If all threads busy → add instances or partitions
- If one partition lagged → check for hot key
- If state store slow → add standby replicas or better disk
The key constraint: you cannot have more active consumers than partitions.
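The partition limit is easy to see with a toy assignment. The sketch below spreads partitions over instances round-robin and counts the instances left without work. It is a simplification for illustration: `AssignmentSketch` is a hypothetical name, and the real Kafka Streams assignor also considers tasks, state, and stickiness.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy round-robin assignment; NOT the real Kafka Streams partition assignor. */
final class AssignmentSketch {

    /** Returns, for each instance, the list of partitions it would process. */
    static List<List<Integer>> assign(int partitions, int instances) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int partition = 0; partition < partitions; partition++) {
            assignment.get(partition % instances).add(partition);
        }
        return assignment;
    }

    /** Instances that received no partition and therefore sit idle. */
    static long idleInstances(int partitions, int instances) {
        return assign(partitions, instances).stream()
                .filter(List::isEmpty)
                .count();
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 1));
        System.out.println(assign(3, 3));
        System.out.println(assign(3, 6) + " idle=" + idleInstances(3, 6));
    }
}
```

Adding instances beyond the partition count only adds idle capacity, so the next step at that point is more partitions, not more instances.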
Summary & Key Takeaways
Kafka Streams Core Concepts
| Concept | Key Point |
|---|---|
| Architecture | Library, not cluster - runs in your JVM |
| KStream | Event stream - all records matter |
| KTable | Changelog - latest value per key |
| GlobalKTable | Full broadcast - every instance has all |
| Topology | DAG of processors - source→transform→sink |
| Parallelism | Tied to partitions - max = partition count |
| State | Local stores + changelog for recovery |
Essential Takeaways
- Library, not framework - Kafka Streams runs in your application, no separate cluster needed.
- KStream vs KTable - Events vs state. Use KStream for transactions, KTable for current state.
- Prefer mapValues over map - mapValues() preserves partitioning, map() may trigger repartition.
- Name your processors - Makes debugging and monitoring much easier.
- Test with TopologyTestDriver - Fast, deterministic unit testing without real Kafka.
- Parallelism = partitions - You cannot have more parallelism than input topic partitions.
Quick Reference
| Operation | Code |
|---|---|
| Read stream | builder.stream("topic") |
| Read table | builder.table("topic") |
| Read global | builder.globalTable("topic") |
| Filter | stream.filter((k,v) -> predicate) |
| Transform value | stream.mapValues(v -> newV) |
| Transform key+value | stream.map((k,v) -> KeyValue.pair(...)) |
| Split stream | stream.split().branch(...).branch(...) |
| Merge streams | stream1.merge(stream2) |
| Group by key | stream.groupByKey() |
| Group by new key | stream.groupBy((k,v) -> newKey) |
| Count | grouped.count() |
| Aggregate | grouped.aggregate(init, aggr) |
| Write to topic | stream.to("output-topic") |