
Streams Fundamentals & Topologies

At a Glance

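┌──────────────────────┬──────────────────────────────────────────────────────────────────────┐
│ Aspect               │ Details                                                              │
├──────────────────────┼──────────────────────────────────────────────────────────────────────┤
│ Goal                 │ Build stream processing applications with Kafka Streams              │
│ Architecture         │ Library, not framework - runs in your JVM                            │
│ DSL vs Processor API │ High-level declarative vs low-level imperative                       │
│ Key Abstractions     │ KStream (event stream), KTable (changelog), GlobalKTable (broadcast) │
│ Topology             │ Directed acyclic graph of stream processors                          │
│ Spring Integration   │ Spring Kafka Streams with functional style                           │
│ Prerequisites        │ Parts 1-14 (core Kafka concepts)                                     │
└──────────────────────┴──────────────────────────────────────────────────────────────────────┘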

What You'll Learn

  • Kafka Streams architecture and how it differs from other frameworks
  • Building topologies with the high-level DSL
  • KStream vs KTable: when to use each
  • Stream transformations: map, filter, flatMap, branch
  • The Processor API for custom logic
  • Testing Kafka Streams applications
  • Spring Kafka Streams integration

Production Story: The Real-Time Fraud Detector

Tuesday, 10:00 AM. A fintech company's risk team.

"We need to detect fraud in real-time," the product manager explained. "Right now, our batch job runs every hour. By the time we catch fraud, the money is gone."

The team evaluated options:

  • Apache Flink: Powerful, but requires running and managing a separate cluster
  • Apache Spark Streaming: Requires a Spark cluster, and micro-batching adds latency
  • Custom consumers: Would need to build everything from scratch
Then someone suggested Kafka Streams:
  • No separate cluster needed - just a Java library
  • Exactly-once semantics built-in
  • State management handled automatically
  • Scales with Kafka partitions

Within a week, they had real-time fraud detection running. No new infrastructure, no cluster management: just a Java application that scaled with the partitions of their Kafka topics.

The key insight: Kafka Streams turns stream processing from an infrastructure problem into a library problem.

Mental Model: Kafka Streams Architecture

┌──────────────────────────────────────────────────────────────────────────┐
│                    KAFKA STREAMS ARCHITECTURE                            │
└──────────────────────────────────────────────────────────────────────────┘

  OTHER FRAMEWORKS                       KAFKA STREAMS
  ─────────────────                      ─────────────────

  ┌─────────────────┐                   ┌─────────────────┐
  │   Your Code     │                   │   Your Code     │
  └────────┬────────┘                   │   + Streams     │
           │                            │   Library       │
           ▼                            └────────┬────────┘
  ┌─────────────────┐                            │
  │ Submit to       │                            │
  │ Cluster         │                   No separate cluster!
  └────────┬────────┘                            │
           │                                     ▼
           ▼                            ┌─────────────────┐
  ┌─────────────────┐                   │  Your JVM       │
  │ Separate        │                   │  (App Server,   │
  │ Processing      │                   │   Container,    │
  │ Cluster         │                   │   etc.)         │
  │                 │                   └────────┬────────┘
  │ • Flink         │                            │
  │ • Spark         │                            │
  │ • etc.          │                            │
  └────────┬────────┘                            │
           │                                     │
           ▼                                     ▼
  ┌─────────────────┐                   ┌─────────────────┐
  │     Kafka       │                   │     Kafka       │
  └─────────────────┘                   └─────────────────┘

The Topology: Heart of Kafka Streams

┌──────────────────────────────────────────────────────────────────────────┐
│                         STREAM TOPOLOGY                                  │
└──────────────────────────────────────────────────────────────────────────┘

  A topology is a directed acyclic graph (DAG) of processors:

                        ┌───────────────┐
                        │    SOURCE     │  ← Reads from Kafka topic
                        │  (orders)     │
                        └───────┬───────┘
                                │
                                ▼
                        ┌───────────────┐
                        │    FILTER     │  ← Removes cancelled orders
                        │ status=VALID  │
                        └───────┬───────┘
                                │
                   ┌────────────┴────────────┐
                   ▼                         ▼
           ┌───────────────┐         ┌───────────────┐
           │     MAP       │         │   AGGREGATE   │  ← State store
           │ enrich data   │         │  by customer  │
           └───────┬───────┘         └───────┬───────┘
                   │                         │
                   ▼                         ▼
           ┌───────────────┐         ┌───────────────┐
           │     SINK      │         │     SINK      │
           │ (enriched-    │         │ (customer-    │
           │  orders)      │         │  totals)      │
           └───────────────┘         └───────────────┘

  Each processor:
  • Receives records from upstream
  • Transforms, filters, or aggregates
  • Passes results downstream

Deep Dive: KStream vs KTable

The Fundamental Difference

┌──────────────────────────────────────────────────────────────────────────┐
│                     KSTREAM VS KTABLE                                    │
├─────────────────────────────────┬────────────────────────────────────────┤
│           KStream               │             KTable                     │
├─────────────────────────────────┼────────────────────────────────────────┤
│ Event stream (all events)       │ Changelog (latest per key)             │
│                                 │                                        │
│ Records: Immutable facts        │ Records: Updates to state              │
│                                 │                                        │
│ "Alice bought coffee"           │ "Alice's balance is $100"              │
│ "Alice bought tea"              │ "Alice's balance is $85"               │
│ "Alice bought coffee"           │                                        │
│                                 │                                        │
│ → All 3 records preserved       │ → Only latest balance kept             │
├─────────────────────────────────┼────────────────────────────────────────┤
│ Use when:                       │ Use when:                              │
│ • Every event matters           │ • Only latest value matters            │
│ • Processing transactions       │ • Reference data / lookups             │
│ • Log of changes                │ • Aggregation results                  │
└─────────────────────────────────┴────────────────────────────────────────┘

Visualizing the Difference

  Topic: user-events (compacted)

  Input sequence:
  ┌─────────────────────────────────────────────────────────────────────┐
  │  Key     │  Value          │  Offset  │                             │
  ├──────────┼─────────────────┼──────────┤                             │
  │  alice   │  {"age": 25}    │    0     │                             │
  │  bob     │  {"age": 30}    │    1     │                             │
  │  alice   │  {"age": 26}    │    2     │  ← Alice had birthday       │
  │  alice   │  {"age": 26}    │    3     │  ← Duplicate event          │
  └─────────────────────────────────────────────────────────────────────┘

  KStream view (all records):             KTable view (latest per key):
  ┌────────────────────────────┐          ┌────────────────────────────┐
  │  alice → {"age": 25}       │          │                            │
  │  bob   → {"age": 30}       │          │  alice → {"age": 26}       │
  │  alice → {"age": 26}       │          │  bob   → {"age": 30}       │
  │  alice → {"age": 26}       │          │                            │
  └────────────────────────────┘          └────────────────────────────┘
       4 records processed                     2 entries in table

Code Example: KStream vs KTable

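The two views in the diagram above can be reproduced without a broker. The sketch below is a plain-Java model, not the Kafka Streams API. streamView mirrors what builder.stream("user-events") delivers (every record), and tableView mirrors builder.table("user-events") (one upsert per key). The class, record, and method names are hypothetical. The real library also handles serialization, partitioning, and state stores.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of KStream vs KTable semantics (no Kafka dependency).
// StreamVsTableModel, Event, streamView and tableView are hypothetical names.
final class StreamVsTableModel {

    // One record from the topic: key, value (the "age"), and offset.
    record Event(String key, int age, long offset) {}

    // The input sequence from the diagram above.
    static final List<Event> SAMPLE = List.of(
            new Event("alice", 25, 0),
            new Event("bob", 30, 1),
            new Event("alice", 26, 2),
            new Event("alice", 26, 3));

    // KStream view: every record is an independent fact, so all of them are kept.
    static List<Event> streamView(List<Event> input) {
        return new ArrayList<>(input);
    }

    // KTable view: each record is an upsert, so only the latest value per key survives.
    static Map<String, Integer> tableView(List<Event> input) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Event e : input) {
            table.put(e.key(), e.age()); // later offsets overwrite earlier ones
        }
        return table;
    }
}
```

Applied to the four sample records, streamView keeps 4 records and tableView holds 2 entries (alice=26, bob=30), matching the diagram.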

Deep Dive: Stream Transformations

Stateless Operations

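The output-count rules for the stateless operations can be checked with a small plain-Java model:

  • filter: a subset of the input
  • mapValues: one output per input
  • flatMapValues: zero or more outputs per input
  • split: the first matching branch wins

This is a hypothetical sketch, not the Kafka Streams DSL. The Order and KV types and the sample data are made up. In the real API, the branch example would be written with stream.split().branch(...).defaultBranch().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java model of stateless DSL operations (no Kafka dependency).
// Order, KV and all method names are hypothetical stand-ins.
final class StatelessOpsModel {

    record Order(String id, String status, double amount, List<String> items) {}

    record KV<K, V>(K key, V value) {}

    static final List<KV<String, Order>> ORDERS = List.of(
            new KV<>("o1", new Order("o1", "VALID", 1500.0, List.of("laptop", "mouse", "bag"))),
            new KV<>("o2", new Order("o2", "CANCELLED", 20.0, List.of("cable"))),
            new KV<>("o3", new Order("o3", "VALID", 250.0, List.of("monitor"))),
            new KV<>("o4", new Order("o4", "VALID", 40.0, List.of())));

    // filter: keep records whose value satisfies the predicate (output is a subset).
    static <K, V> List<KV<K, V>> filter(List<KV<K, V>> in, Predicate<V> keep) {
        List<KV<K, V>> out = new ArrayList<>();
        for (KV<K, V> kv : in) if (keep.test(kv.value())) out.add(kv);
        return out;
    }

    // mapValues: exactly one output per input, key untouched.
    static <K, V, R> List<KV<K, R>> mapValues(List<KV<K, V>> in, Function<V, R> f) {
        List<KV<K, R>> out = new ArrayList<>();
        for (KV<K, V> kv : in) out.add(new KV<>(kv.key(), f.apply(kv.value())));
        return out;
    }

    // flatMapValues: zero or more outputs per input, all carrying the input's key.
    static <K, V, R> List<KV<K, R>> flatMapValues(List<KV<K, V>> in, Function<V, List<R>> f) {
        List<KV<K, R>> out = new ArrayList<>();
        for (KV<K, V> kv : in) {
            for (R r : f.apply(kv.value())) out.add(new KV<>(kv.key(), r));
        }
        return out;
    }

    // split(): each record goes to the FIRST branch whose predicate matches;
    // records matching no branch go to the default branch.
    static <K, V> Map<String, List<KV<K, V>>> split(List<KV<K, V>> in,
            LinkedHashMap<String, Predicate<V>> branches, String defaultBranch) {
        Map<String, List<KV<K, V>>> out = new LinkedHashMap<>();
        for (String name : branches.keySet()) out.put(name, new ArrayList<>());
        out.put(defaultBranch, new ArrayList<>());
        for (KV<K, V> kv : in) {
            String target = defaultBranch;
            for (Map.Entry<String, Predicate<V>> b : branches.entrySet()) {
                if (b.getValue().test(kv.value())) { target = b.getKey(); break; }
            }
            out.get(target).add(kv);
        }
        return out;
    }

    // The BRANCH example from the diagram: drop cancelled orders, then route by amount.
    static Map<String, List<KV<String, Order>>> routeByValue(List<KV<String, Order>> orders) {
        LinkedHashMap<String, Predicate<Order>> branches = new LinkedHashMap<>();
        branches.put("high-value", o -> o.amount() >= 1000);
        branches.put("priority", o -> o.amount() >= 100);
        return split(filter(orders, o -> o.status().equals("VALID")), branches, "standard");
    }
}
```

Order o1 matches both the high-value and priority predicates, but it lands only in high-value because split stops at the first match.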

The Transformation Flow

┌──────────────────────────────────────────────────────────────────────────┐
│                  TRANSFORMATION OPERATIONS                               │
└──────────────────────────────────────────────────────────────────────────┘

  FILTER                          MAP                         FLATMAP
  ──────                          ───                         ───────
  ┌───┐                          ┌───┐                       ┌───┐
  │ A │───┐                      │ A │──▶ │ A'│              │ A │──▶ │ A1│
  └───┘   │    Keep if           └───┘                       └───┘    │ A2│
  ┌───┐   │    predicate         ┌───┐                       ┌───┐    │ A3│
  │ B │───┼──▶ true              │ B │──▶ │ B'│              │ B │──▶ │ B1│
  └───┘   │                      └───┘                       └───┘
  ┌───┐   │                      ┌───┐                       ┌───┐
  │ C │───┘                      │ C │──▶ │ C'│              │ C │──▶ (none)
  └───┘                          └───┘                       └───┘

  Output: subset                  Output: same count          Output: 0..N per input


  BRANCH                                    MERGE
  ──────                                    ─────
       ┌───────────────┐
       │    Input      │                Stream 1    Stream 2
       │    Stream     │                    │           │
       └───────┬───────┘                    ▼           ▼
               │                        ┌───────────────────┐
    ┌──────────┼──────────┐             │      Merged       │
    │          │          │             │      Stream       │
    ▼          ▼          ▼             └───────────────────┘
  ┌───┐      ┌───┐      ┌───┐
  │ A │      │ B │      │ C │          Interleaved by arrival
  └───┘      └───┘      └───┘          time
  High      Priority   Standard
  Value

Deep Dive: GlobalKTable for Broadcast Data

When to Use GlobalKTable

┌──────────────────────────────────────────────────────────────────────────┐
│                    KTABLE VS GLOBALKTABLE                                │
├─────────────────────────────────┬────────────────────────────────────────┤
│           KTable                │          GlobalKTable                  │
├─────────────────────────────────┼────────────────────────────────────────┤
│ Partitioned like input topic    │ Full copy on every instance            │
│                                 │                                        │
│ Instance sees subset of data    │ Instance sees ALL data                 │
│                                 │                                        │
│ Join requires co-partitioning   │ Join works with any key                │
│                                 │                                        │
│ Scales with partitions          │ Memory usage × instance count          │
├─────────────────────────────────┼────────────────────────────────────────┤
│ Use for:                        │ Use for:                               │
│ • Large tables                  │ • Small reference data                 │
│ • Data already partitioned      │ • Lookup tables                        │
│                                 │ • Configuration                        │
└─────────────────────────────────┴────────────────────────────────────────┘

GlobalKTable Example

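The comparison table above says a GlobalKTable join works without co-partitioning. That follows from each instance holding the whole table. The plain-Java sketch below models a KStream-GlobalKTable inner join under that assumption; the record types and sample data are hypothetical. In the real DSL the equivalent call is stream.join(globalTable, keySelector, valueJoiner), where the key selector maps each stream record to the table key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Plain-Java model of a KStream-GlobalKTable inner join (no Kafka dependency).
// Order, Product, EnrichedOrder and the sample data are hypothetical.
final class GlobalJoinModel {

    record Order(String orderId, String productId, int quantity) {}
    record Product(String productId, String name, double price) {}
    record EnrichedOrder(String orderId, String productName, double total) {}

    // Every instance holds the complete table, so any stream record can be joined locally.
    static final Map<String, Product> PRODUCTS = Map.of(
            "p1", new Product("p1", "Keyboard", 50.0),
            "p2", new Product("p2", "Mouse", 20.0));

    static final List<Order> ORDERS = List.of(
            new Order("o1", "p1", 2),
            new Order("o2", "p9", 1),   // unknown product: dropped by the inner join
            new Order("o3", "p2", 3));

    // keySelector derives the table key from the stream record, so the stream can stay
    // keyed by orderId. Records without a matching table entry are dropped.
    static <V, GK, GV, R> List<R> join(List<V> stream, Map<GK, GV> globalTable,
                                       Function<V, GK> keySelector, BiFunction<V, GV, R> joiner) {
        List<R> out = new ArrayList<>();
        for (V v : stream) {
            GV match = globalTable.get(keySelector.apply(v));
            if (match != null) out.add(joiner.apply(v, match));
        }
        return out;
    }

    static List<EnrichedOrder> enrich(List<Order> orders, Map<String, Product> products) {
        return join(orders, products, Order::productId,
                (o, p) -> new EnrichedOrder(o.orderId(), p.name(), p.price() * o.quantity()));
    }
}
```

In this model the selector sees only the value. The real KeyValueMapper receives both the record key and the value.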

Visualization: Partitioned vs Global

┌──────────────────────────────────────────────────────────────────────────┐
│                    DATA DISTRIBUTION                                     │
└──────────────────────────────────────────────────────────────────────────┘

  KTable (partitioned):

  Topic: customers (6 partitions)

  Instance 1              Instance 2              Instance 3
  ┌─────────────┐        ┌─────────────┐        ┌─────────────┐
  │ Partition 0 │        │ Partition 2 │        │ Partition 4 │
  │ Partition 1 │        │ Partition 3 │        │ Partition 5 │
  │             │        │             │        │             │
  │ Alice, Bob  │        │ Carol, Dan  │        │ Eve, Frank  │
  └─────────────┘        └─────────────┘        └─────────────┘

  Each instance only sees SOME customers


  GlobalKTable (broadcast):

  Topic: products (3 partitions)

  Instance 1              Instance 2              Instance 3
  ┌─────────────┐        ┌─────────────┐        ┌─────────────┐
  │ ALL         │        │ ALL         │        │ ALL         │
  │ Partitions  │        │ Partitions  │        │ Partitions  │
  │             │        │             │        │             │
  │ All 1000    │        │ All 1000    │        │ All 1000    │
  │ products    │        │ products    │        │ products    │
  └─────────────┘        └─────────────┘        └─────────────┘

  Every instance sees ALL products

Deep Dive: Aggregations

Basic Aggregations

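An aggregation is a fold over the records of each group, with the running result kept in a state store. The plain-Java sketch below models groupBy(customer) followed by aggregate(initializer, aggregator), with a HashMap standing in for the store. Order, CustomerStats, and Update are hypothetical types. The model emits one update per input record, which is why an aggregation's result is a KTable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of stream.groupBy(customer).aggregate(initializer, aggregator)
// (no Kafka dependency; all names are hypothetical).
final class AggregationModel {

    record Order(String customerId, double amount) {}

    // The aggregate value kept per key.
    record CustomerStats(long count, double total) {
        CustomerStats add(double amount) { return new CustomerStats(count + 1, total + amount); }
    }

    // One row of the resulting KTable's update stream.
    record Update(String customerId, CustomerStats stats) {}

    static List<Update> aggregate(List<Order> orders) {
        Map<String, CustomerStats> store = new HashMap<>();   // stands in for the state store
        List<Update> updates = new ArrayList<>();
        for (Order o : orders) {
            // initializer: the starting aggregate for a key seen for the first time
            CustomerStats current = store.getOrDefault(o.customerId(), new CustomerStats(0, 0.0));
            // aggregator: fold the new record into the running aggregate
            CustomerStats next = current.add(o.amount());
            store.put(o.customerId(), next);
            updates.add(new Update(o.customerId(), next));    // forwarded downstream
        }
        return updates;
    }
}
```

In Kafka Streams itself, record caching can merge consecutive updates for the same key before forwarding them. Downstream consumers may therefore see fewer updates than there were inputs.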

Aggregation with Subtraction (for Tombstones)

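When the input is a KTable rather than a KStream, an update replaces an earlier value. The aggregate must first retract the old value's contribution and then apply the new one; a tombstone only retracts. That is why KGroupedTable.aggregate takes both an adder and a subtractor. The sketch below is a plain-Java model of that bookkeeping for a hypothetical users-per-region count, not the library's implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of table.groupBy(...).aggregate(initializer, adder, subtractor)
// counting users per region (no Kafka dependency; names are hypothetical).
final class TableAggregationModel {

    // A change to the source KTable; region == null is a tombstone (delete).
    record TableUpdate(String userId, String region) {}

    static Map<String, Long> usersPerRegion(List<TableUpdate> updates) {
        Map<String, String> table = new HashMap<>();   // current contents of the source KTable
        Map<String, Long> counts = new TreeMap<>();    // the aggregate, keyed by region
        for (TableUpdate u : updates) {
            // Apply the update to the table and remember the value it replaced.
            String oldRegion = (u.region() == null)
                    ? table.remove(u.userId())
                    : table.put(u.userId(), u.region());
            if (oldRegion != null) {
                counts.merge(oldRegion, -1L, Long::sum); // subtractor: retract the old row
            }
            if (u.region() != null) {
                counts.merge(u.region(), 1L, Long::sum);  // adder: apply the new row
            }
        }
        return counts;
    }
}
```

Feeding it alice→EU, bob→EU, alice→US, and a tombstone for bob leaves EU at 0 and US at 1. Without the subtractor, EU would stay at 2.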

Deep Dive: The Processor API

When to Use Processor API

┌──────────────────────────────────────────────────────────────────────────┐
│                    DSL VS PROCESSOR API                                  │
├─────────────────────────────────┬────────────────────────────────────────┤
│           DSL                   │         Processor API                  │
├─────────────────────────────────┼────────────────────────────────────────┤
│ Declarative, fluent             │ Imperative, full control               │
│ builder.stream().filter()...    │ process(), punctuate()                 │
│                                 │                                        │
│ Handles state stores            │ Manual state store access              │
│ automatically                   │                                        │
│                                 │                                        │
│ Limited to predefined ops       │ Any custom logic                       │
├─────────────────────────────────┼────────────────────────────────────────┤
│ Use when:                       │ Use when:                              │
│ • Standard transformations      │ • Custom windowing logic               │
│ • Aggregations, joins           │ • Periodic actions (punctuate)         │
│ • Most use cases                │ • Complex state machine                │
│                                 │ • Accessing record metadata            │
└─────────────────────────────────┴────────────────────────────────────────┘

Custom Processor Example

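The table above names two Processor API entry points: per-record processing, and periodic punctuation over a state store. Both can be sketched in plain Java. The model below is hypothetical: it raises device inactivity alerts, with a TreeMap standing in for the store and a list standing in for forwarded records. In the real Processor API, the same logic would live in an implementation of org.apache.kafka.streams.processor.api.Processor. Its init(...) would fetch the store with context.getStateStore(...) and register the callback with context.schedule(Duration, PunctuationType, Punctuator).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a processor that alerts on devices that stop sending events
// (no Kafka dependency; all names are hypothetical).
final class InactivityProcessorModel {

    private final long timeoutMs;
    private final Map<String, Long> lastSeen = new TreeMap<>(); // stands in for a KeyValueStore
    private final List<String> forwarded = new ArrayList<>();    // stands in for context.forward(...)

    InactivityProcessorModel(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called once per input record: remember when each device was last seen.
    void process(String deviceId, long timestampMs) {
        lastSeen.put(deviceId, timestampMs);
    }

    // Called on a schedule rather than per record: scan the store, emit alerts,
    // and delete reported entries so each silent device is reported once.
    void punctuate(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() >= timeoutMs) {
                forwarded.add("INACTIVE:" + e.getKey());
                it.remove();
            }
        }
    }

    List<String> forwarded() {
        return forwarded;
    }
}
```

For an inactivity check like this one, PunctuationType.WALL_CLOCK_TIME is the natural choice. STREAM_TIME punctuations only fire as new records advance stream time, so they would never fire while the whole input is silent.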

Registering the Processor

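In the low-level API, the processor is wired in with Topology#addSource(...), addProcessor(...), addStateStore(...), and addSink(...). From the DSL, KStream#process(...) attaches it to an existing stream, given the names of the state stores it uses.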

Deep Dive: Spring Kafka Streams

Functional Style Configuration

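With Spring for Apache Kafka, @EnableKafkaStreams creates and manages the KafkaStreams instance, and a @Bean method that receives the StreamsBuilder defines the topology.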

Spring Cloud Stream Functional Style

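With the Spring Cloud Stream Kafka Streams binder, a @Bean of type java.util.function.Function<KStream<K, V>, KStream<K, V>> becomes a processor. Its input and output topics are bound in application.yml through the <bean-name>-in-0 and <bean-name>-out-0 bindings.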

Deep Dive: Topology Visualization

Describing the Topology

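Topology#describe() returns a TopologyDescription. Printing it, for example with System.out.println(topology.describe()), lists each sub-topology with its sources, processors, state stores, and sinks, as in the output below.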

Example Topology Output

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [orders])
      --> KSTREAM-FILTER-0000000001
    Processor: KSTREAM-FILTER-0000000001 (stores: [])
      --> KSTREAM-MAPVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-FILTER-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: enriched-orders)
      <-- KSTREAM-MAPVALUES-0000000002

   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000004 (topics: [enriched-orders])
      --> KSTREAM-AGGREGATE-0000000005
    Processor: KSTREAM-AGGREGATE-0000000005 (stores: [customer-stats])
      --> KSTREAM-TOSTREAM-0000000006
      <-- KSTREAM-SOURCE-0000000004
    Processor: KSTREAM-TOSTREAM-0000000006 (stores: [])
      --> KSTREAM-SINK-0000000007
      <-- KSTREAM-AGGREGATE-0000000005
    Sink: KSTREAM-SINK-0000000007 (topic: customer-statistics)
      <-- KSTREAM-TOSTREAM-0000000006

Deep Dive: Testing Kafka Streams

TopologyTestDriver

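TopologyTestDriver, from the kafka-streams-test-utils artifact, runs a topology in-process without a broker. TestInputTopic pipes records in, TestOutputTopic reads results out, and advanceWallClockTime(...) triggers wall-clock punctuations.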

Testing with State Stores

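TopologyTestDriver#getKeyValueStore(...) returns a state store by name, so a test can assert on the store's contents directly instead of only on output records.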

Common Mistakes

1. Modifying Input Objects

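Mutation is dangerous because of fan-out. When one processor node has several children (for example, two operations applied to the same KStream), Kafka Streams hands the same value object to each child in turn instead of copying it. The plain-Java sketch below models that fan-out with hypothetical MutableOrder and Order types. A branch that mutates in place leaks its change into the next branch; a branch that builds a new value does not.

```java
import java.util.function.UnaryOperator;

// Plain-Java model of one record fanned out to two downstream branches
// (no Kafka dependency; MutableOrder and Order are hypothetical types).
final class SharedValueDemo {

    // Mutable POJO, like many JSON-deserialized value classes.
    static final class MutableOrder {
        String status;
        MutableOrder(String status) { this.status = status; }
    }

    // Immutable alternative: "changing" a field returns a new instance.
    record Order(String status) {
        Order withStatus(String newStatus) { return new Order(newStatus); }
    }

    // Unsafe: branch A mutates the shared instance in place, so branch B,
    // which receives the same object afterwards, no longer sees the original value.
    static String branchBSeesAfterMutation() {
        MutableOrder shared = new MutableOrder("NEW");
        UnaryOperator<MutableOrder> branchA = v -> { v.status = "AUDITED"; return v; };
        branchA.apply(shared);  // first child processor
        return shared.status;   // second child processor reads the same instance
    }

    // Safe: branch A builds a copy, so branch B still sees the original value.
    static String branchBSeesWithCopy() {
        Order shared = new Order("NEW");
        UnaryOperator<Order> branchA = v -> v.withStatus("AUDITED");
        branchA.apply(shared);
        return shared.status();
    }
}
```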

2. Using map() When mapValues() Suffices

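The cost of map() comes from what the DSL can know. It cannot inspect the lambda, so after map(), selectKey(), or flatMap() it must assume the key changed. The next key-based operation, such as groupByKey() or a join, then gets a repartition topic in front of it, even if the lambda returned the original key. The toy model below tracks only that flag; it is a hypothetical class, not the real DSL.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the DSL's repartition bookkeeping (no Kafka dependency; the class is
// hypothetical). The DSL cannot inspect a map() lambda, so it assumes the key changed.
final class RepartitionModel {

    private boolean keyMayHaveChanged = false;
    private final List<String> plan = new ArrayList<>();

    RepartitionModel map() {            // the real DSL treats selectKey() and flatMap() the same way
        keyMayHaveChanged = true;
        plan.add("map");
        return this;
    }

    RepartitionModel mapValues() {      // key cannot change, so partitioning is preserved
        plan.add("mapValues");
        return this;
    }

    RepartitionModel groupByKey() {
        if (keyMayHaveChanged) {
            plan.add("repartition-topic"); // every record is written out and read back
            keyMayHaveChanged = false;
        }
        plan.add("groupByKey");
        return this;
    }

    List<String> plan() {
        return plan;
    }
}
```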

3. Not Naming Processors

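Unnamed operators get generated names such as KSTREAM-FILTER-0000000001, as in the topology output above. These names shift when operators are added or removed. That makes logs and metrics harder to follow, and it can rename internal topics and state stores between versions. Named.as(...), Grouped.as(...), and Materialized.as(...) assign stable names.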

4. GlobalKTable for Large Data

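The problem is arithmetic. A GlobalKTable stores every record on every instance, while a partitioned KTable spreads the same data across instances. The sketch below computes the raw footprint for hypothetical sizes: 100 million customers at 1 KiB each, on 10 instances. It assumes an even partition spread and ignores RocksDB overhead, compression, and indexes.

```java
// Back-of-the-envelope sizing (plain Java; the sizes used are hypothetical).
final class TableSizing {

    // A GlobalKTable is fully replicated: every instance stores every record.
    static long globalBytesPerInstance(long records, long avgRecordBytes) {
        return records * avgRecordBytes;
    }

    // A partitioned KTable spreads records across instances (assumes an even spread;
    // the division rounds up).
    static long partitionedBytesPerInstance(long records, long avgRecordBytes, int instances) {
        return (records * avgRecordBytes + instances - 1) / instances;
    }

    // Total storage across the cluster for the global variant.
    static long globalBytesCluster(long records, long avgRecordBytes, int instances) {
        return globalBytesPerInstance(records, avgRecordBytes) * instances;
    }

    public static void main(String[] args) {
        double gib = 1L << 30;
        long records = 100_000_000L, avg = 1_024;
        int instances = 10;
        System.out.printf("global:       %.1f GiB per instance%n", globalBytesPerInstance(records, avg) / gib);
        System.out.printf("partitioned:  %.1f GiB per instance%n", partitionedBytesPerInstance(records, avg, instances) / gib);
        System.out.printf("global total: %.1f GiB across the cluster%n", globalBytesCluster(records, avg, instances) / gib);
    }
}
```

With those numbers, each instance holds about 95 GiB as a GlobalKTable but about 9.5 GiB as a partitioned KTable. The cluster as a whole stores roughly 954 GiB instead of 95 GiB.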

Debug This: The Duplicated Aggregates

An aggregation is producing duplicate or incorrect counts.
What's causing the duplicates? (Answer below)
Answer
Possible causes:
  1. Reprocessing on restart - Under at-least-once processing (the default), records processed after the last offset commit are processed again after a crash. If no committed offsets exist (for example, after the application.id changes), auto.offset.reset=earliest replays the entire input topic.
  2. Instances with different application.id values - Instances that share an application.id split the partitions between them. Each distinct application.id is a separate consumer group that processes all of the data.
  3. Upstream duplication - Producer retries without idempotence (enable.idempotence=true) can write the same record twice.
  4. State store not restored - A store with changelog logging disabled comes back empty after a restart, so counts start over.
  5. Duplicate topology definitions - The same aggregation is defined twice (for example, in two applications or two bean definitions), and both write to one output topic.
Debugging steps:
  1. Check state store changelog topic for duplicates
  2. Verify application.id is consistent
  3. Check if producer is idempotent
  4. Enable Streams metrics for rebalance count
  5. Log every input record to trace source
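Cause 1 is easy to underestimate, so here is a toy simulation of it in plain Java. It assumes at-least-once processing where the count store already contains updates from records whose offsets were never committed. The class and parameters are hypothetical. A crash between commits makes the records since the last commit count twice. Setting processing.guarantee to exactly_once_v2 is the library-level fix.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy simulation of at-least-once reprocessing (plain Java; names are hypothetical).
// Assumes the count store keeps every update (as if restored from a changelog that
// already holds them), while input offsets are committed only every commitEvery records.
final class AtLeastOnceSimulation {

    static Map<String, Integer> run(List<String> keys, int commitEvery, int crashAfter) {
        Map<String, Integer> store = new HashMap<>();
        int committed = 0; // next offset to read after a restart

        // First run: process records until the crash.
        for (int offset = 0; offset < crashAfter; offset++) {
            store.merge(keys.get(offset), 1, Integer::sum);
            if ((offset + 1) % commitEvery == 0) {
                committed = offset + 1;
            }
        }

        // Restart: resume from the last committed offset, not from where processing stopped.
        for (int offset = committed; offset < keys.size(); offset++) {
            store.merge(keys.get(offset), 1, Integer::sum);
        }
        return store;
    }
}
```

With five records for key "a" and a commit every two records, a crash after the third record yields a count of 6 instead of 5. A crash right after a commit yields the correct 5.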

Exercises

Exercise 1: Build an Alert Aggregator

Create a topology that:

  • Reads from alerts topic
  • Groups by alertType
  • Counts alerts per type in 5-minute tumbling windows
  • Outputs counts exceeding threshold to alert-summaries

Exercise 2: Implement Deduplication

Build a processor that:

  • Tracks seen message IDs in a state store
  • Filters duplicates within a 1-hour window
  • Cleans up old entries using punctuate

Exercise 3: Join Three Streams

Create a topology joining:

  • orders (KStream)
  • customers (KTable)
  • products (GlobalKTable)
  • Output: enriched orders with customer and product details

Exercise 4: Write Comprehensive Tests

Using TopologyTestDriver:

  • Test stateless transformations
  • Test windowed aggregations with time advancement
  • Test state store queries
  • Test error handling

Exercise 5: Topology Optimization

Given a topology, identify and fix:

  • Unnecessary repartitions
  • Missing named processors
  • Inefficient GlobalKTable usage
  • State store configuration issues

Interview Questions

Q1: "When would you choose Kafka Streams over Apache Flink or Spark Streaming?"

What they're looking for: Architecture trade-off understanding
Strong answer: "The choice depends on several factors:
Choose Kafka Streams when:
  • Data already in Kafka (no connector overhead)
  • Simple to medium complexity processing
  • Want to avoid cluster management
  • Need exactly-once with Kafka sink/source
  • Team knows Java/Kotlin
  • Deploying as microservices
Choose Flink when:
  • Complex event processing (CEP)
  • Need SQL interface
  • Multi-source/sink (not just Kafka)
  • Advanced windowing beyond tumbling, hopping, sliding, and session windows (custom triggers, watermarks)
  • Team has Flink expertise
  • Need managed state with checkpointing across sources
Choose Spark Streaming when:
  • Already have Spark infrastructure
  • Batch + stream unified processing
  • ML model serving in stream
  • Need DataFrame API

Kafka Streams is 'just a library' - that's both its strength (simplicity) and its limitation (JVM-only, with Kafka as both source and sink)."

Q2: "Explain the difference between KStream and KTable with a concrete example."

What they're looking for: Core concept understanding
Strong answer: "Let me use a concrete example: tracking user purchases.
KStream for purchase events:
alice buys coffee    → record 1
alice buys sandwich  → record 2
alice buys coffee    → record 3

KStream sees all 3 records. Total: 3 purchases.
KTable for user balance:
alice balance $100  → update 1
alice balance $85   → update 2 (replaces)
alice balance $70   → update 3 (replaces)

KTable has 1 entry: alice → $70

The key insight:

  • KStream = facts that happened (immutable)
  • KTable = current state (mutable by key)
When you do builder.stream() you get KStream - all events. When you do builder.table() you get KTable - latest per key.

Aggregations convert a KStream into a KTable because an aggregation result is 'current state', not an 'event stream'."

Q3: "What happens when a Kafka Streams application restarts?"

What they're looking for: Understanding of fault tolerance
Strong answer: "Several things happen:
  1. Consumer group rebalance - Partitions reassigned to remaining/new instances
  2. State restoration - Local state stores are rebuilt from their changelog topics:
    Changelog topic: <application.id>-<store-name>-changelog
    Contains all state updates
    Replayed to rebuild RocksDB state

  3. Offset management - Processing resumes from the last committed offset. Under exactly-once, uncommitted work is rolled back, so results reflect each record once; under at-least-once, records processed after the last commit are processed again.
  4. Standby replicas (if configured with num.standby.replicas) - Other instances keep warm copies of the state stores, which shortens failover.

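For exactly-once (processing.guarantee=exactly_once_v2), these steps are committed in one Kafka transaction:

  • Write output records
  • Write state updates to the changelog topics
  • Commit consumer offsets

They succeed or fail together, so a restart picks up cleanly.

The key optimization is standby replicas: another instance keeps an up-to-date copy of each state store, so failover does not need a full restoration."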
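The restoration step in item 2 amounts to replaying the changelog in order. Each record is a put, and a record with a null value (a tombstone) is a delete. The plain-Java sketch below models that replay with a hypothetical ChangelogRecord type and a map standing in for the store. When a local checkpoint file exists, Kafka Streams replays only the changelog tail after the checkpointed offset instead of the whole topic.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of restoring a key-value store from its changelog topic
// (no Kafka dependency; names are hypothetical).
final class ChangelogRestore {

    // One changelog record; value == null is a tombstone (the key was deleted).
    record ChangelogRecord(String key, String value) {}

    static Map<String, String> restore(List<ChangelogRecord> changelog) {
        Map<String, String> store = new LinkedHashMap<>();
        for (ChangelogRecord r : changelog) {      // replay in offset order
            if (r.value() == null) {
                store.remove(r.key());
            } else {
                store.put(r.key(), r.value());      // later values overwrite earlier ones
            }
        }
        return store;
    }
}
```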

Q4: "How do you handle late-arriving data in Kafka Streams?"

What they're looking for: Windowing and time semantics knowledge
Strong answer: "Kafka Streams provides several mechanisms:
  1. Grace period - Keep each window open to late records for a configurable time after the window ends.
  2. Suppress - Emit only the final result for each window, once the window closes.
  3. Custom timestamp extractor - Use the event time carried in the payload rather than the timestamp on the Kafka record.
  4. Handling out-of-order records - Windowed operations assign each record to the window its timestamp falls in, as long as that window is still open. Records arriving after window end plus grace are dropped.

For production, I typically:

  • Use event timestamps, not processing time
  • Set reasonable grace periods based on expected lateness
  • Monitor suppression buffer sizes
  • Have separate handling for very late data (dead letter)"
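The grace-period rule can be made concrete with a small model. The sketch below is plain Java, not the Kafka Streams API. Stream time is the largest timestamp seen so far, and a record is dropped once stream time has reached its window's end plus the grace period. The class is hypothetical. In the real DSL, the window would be declared with TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java model of tumbling-window counting with a grace period (names are hypothetical).
final class GracePeriodModel {

    final long sizeMs;
    final long graceMs;
    long streamTime = Long.MIN_VALUE;                                   // max timestamp seen so far
    final TreeMap<Long, Integer> countsByWindowStart = new TreeMap<>();
    final List<Long> dropped = new ArrayList<>();

    GracePeriodModel(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    void process(long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);    // stream time never moves backwards
        long windowStart = timestampMs - Math.floorMod(timestampMs, sizeMs);
        long windowEnd = windowStart + sizeMs;
        if (windowEnd + graceMs <= streamTime) {            // window already closed: too late
            dropped.add(timestampMs);
            return;
        }
        countsByWindowStart.merge(windowStart, 1, Integer::sum);
    }
}
```

Consider timestamps of 1, 5, 3, 7, and 2 minutes, fed through 5-minute windows with 1 minute of grace. The 3-minute record is counted, because stream time 5 has not reached 5 + 1. The 2-minute record is dropped, because stream time 7 is past 5 + 1.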

Q5: "How do you scale a Kafka Streams application?"

What they're looking for: Parallelism and partition understanding
Strong answer: "Kafka Streams parallelism is tied to partitions:
Maximum parallelism = number of input topic partitions
Scaling approaches:
  1. Horizontal - Add more instances:
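    3 partitions, 1 instance (1 thread) → that thread processes all 3 partitions
    3 partitions, 3 instances → each instance gets 1 partition
    3 partitions, 6 instances → 3 instances sit idle
  2. Increase partitions - More input partitions allow more tasks. Note: this triggers a rebalance and changes which partition each key maps to. That breaks per-key ordering across the change and the key-to-state mapping of existing stores.
  3. Thread count per instance - num.stream.threads runs several stream threads in one instance; threads beyond the number of tasks sit idle.
  4. Sub-topologies - Each sub-topology gets its own tasks, so two sub-topologies reading 3-partition topics yield 6 tasks.
Bottleneck identification:
  • If all threads are busy → add instances or partitions
  • If one partition lags → check for a hot key
  • If state store access is slow → tune RocksDB or use faster disks (standby replicas shorten failover; they do not add throughput)

The key constraint: you cannot have more active tasks than input partitions (per sub-topology), so extra instances or threads stay idle."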
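The scenarios in item 1 can be reproduced with a simplified assignment model. It assumes a single input topic, so there is one task per partition, and deals tasks round-robin across all stream threads. Kafka's real assignor is sticky and also places standby tasks. Treat this hypothetical class as an illustration of the parallelism ceiling, not of the actual placement.

```java
import java.util.Arrays;

// Simplified model of task placement (no Kafka dependency; names are hypothetical).
// One task per partition, dealt round-robin over every stream thread of every instance.
final class TaskDistribution {

    // Returns how many tasks each stream thread gets, across all instances.
    static int[] assign(int partitions, int instances, int threadsPerInstance) {
        int[] tasksPerThread = new int[instances * threadsPerInstance];
        for (int task = 0; task < partitions; task++) {
            tasksPerThread[task % tasksPerThread.length]++;
        }
        return tasksPerThread;
    }

    // Threads that received no task at all.
    static long idleThreads(int partitions, int instances, int threadsPerInstance) {
        return Arrays.stream(assign(partitions, instances, threadsPerInstance))
                .filter(n -> n == 0)
                .count();
    }
}
```

assign(3, 1, 1) puts all three tasks on the single thread. idleThreads(3, 6, 1) reports the three idle instances from the last scenario.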

Summary & Key Takeaways

Kafka Streams Core Concepts

┌──────────────────────────────────────────────────────────────────────────┐
│                    KAFKA STREAMS SUMMARY                                 │
├────────────────────────────────┬─────────────────────────────────────────┤
│ Concept                        │ Key Point                               │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Architecture                   │ Library, not cluster - runs in your JVM │
├────────────────────────────────┼─────────────────────────────────────────┤
│ KStream                        │ Event stream - all records matter       │
├────────────────────────────────┼─────────────────────────────────────────┤
│ KTable                         │ Changelog - latest value per key        │
├────────────────────────────────┼─────────────────────────────────────────┤
│ GlobalKTable                   │ Full broadcast - every instance has all │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Topology                       │ DAG of processors: source→transform→sink│
├────────────────────────────────┼─────────────────────────────────────────┤
│ Parallelism                    │ Tied to partitions - max = partition #  │
├────────────────────────────────┼─────────────────────────────────────────┤
│ State                          │ Local stores + changelog for recovery   │
└────────────────────────────────┴─────────────────────────────────────────┘

Essential Takeaways

  1. Library, not framework - Kafka Streams runs in your application, no separate cluster needed.
  2. KStream vs KTable - Events vs state. Use KStream for transactions, KTable for current state.
  3. Prefer mapValues over map - mapValues() preserves partitioning, map() may trigger repartition.
  4. Name your processors - Makes debugging and monitoring much easier.
  5. Test with TopologyTestDriver - Fast, deterministic unit testing without real Kafka.
  6. Parallelism = partitions - You cannot have more parallelism than input topic partitions.

Quick Reference

┌──────────────────────────────────────────────────────────────────────────┐
│                    KAFKA STREAMS QUICK REFERENCE                         │
├────────────────────────────────┬─────────────────────────────────────────┤
│ Operation                      │ Code                                    │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Read stream                    │ builder.stream("topic")                 │
│ Read table                     │ builder.table("topic")                  │
│ Read global                    │ builder.globalTable("topic")            │
│ Filter                         │ stream.filter((k,v) -> predicate)       │
│ Transform value                │ stream.mapValues(v -> newV)             │
│ Transform key+value            │ stream.map((k,v) -> KeyValue.pair(...)) │
│ Split stream                   │ stream.split().branch(...).branch(...)  │
│ Merge streams                  │ stream1.merge(stream2)                  │
│ Group by key                   │ stream.groupByKey()                     │
│ Group by new key               │ stream.groupBy((k,v) -> newKey)         │
│ Count                          │ grouped.count()                         │
│ Aggregate                      │ grouped.aggregate(init, aggr)           │
│ Write to topic                 │ stream.to("output-topic")               │
└────────────────────────────────┴─────────────────────────────────────────┘
