Windowing, Joins & Time Semantics

At a Glance

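┌────────────────┬───────────────────────────────────────────────────────────┐
│ Aspect         │ Details                                                   │
├────────────────┼───────────────────────────────────────────────────────────┤
│ Goal           │ Time-based aggregations and joining streams correctly     │
│ Window Types   │ Tumbling (fixed), Hopping (overlapping), Sliding, Session │
│ Time Semantics │ Event time (preferred), Processing time, Ingestion time   │
│ Join Types     │ KStream-KStream, KStream-KTable, KStream-GlobalKTable     │
│ Grace Period   │ How long to wait for late-arriving data                   │
│ Suppression    │ Emit only final results vs. continuous updates            │
│ Prerequisites  │ Parts 15-16 (Streams fundamentals, state stores)          │
└────────────────┴───────────────────────────────────────────────────────────┘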

What You'll Learn

  • Choosing the right window type for your use case
  • Event time vs processing time trade-offs
  • Handling late-arriving and out-of-order data
  • Stream-stream joins with time windows
  • Stream-table joins for enrichment
  • Suppression for emitting one final result per window
  • Testing windowed operations

Production Story: The Billing System That Double-Charged

Thursday, 4:00 PM. Finance escalation.

"Customers are being billed multiple times for the same subscription period," the support lead reported. The billing system aggregated usage events into hourly windows and charged accordingly.

The problem: incomplete windows were being emitted.
JAVA(9 lines)

Every time a new event arrived, the aggregation emitted an updated count. A customer with 10 events got billed 10 times, once per intermediate count.

The fix: Use suppress() to emit only final results:
JAVA(11 lines)
The lesson: by default, a Kafka Streams aggregation emits an updated result as new records arrive, not one result per window. For billing, deduplication, or any operation requiring complete results, you must explicitly suppress intermediate outputs.
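
To make the difference concrete, here is a plain-Java sketch of the two emission behaviours. It models the semantics only and does not use the Kafka Streams API; `EmissionModel` and its methods are hypothetical names, and record caching (which can coalesce some intermediate updates in a real application) is ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model: per-update emission vs. one final result per window. */
final class EmissionModel {

    /** Start of the tumbling window containing a non-negative timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Default behaviour: every input record emits the updated count for its window. */
    static List<long[]> emitEveryUpdate(long[] timestampsMs, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        List<long[]> emitted = new ArrayList<>();
        for (long ts : timestampsMs) {
            long start = windowStart(ts, sizeMs);
            long updated = counts.merge(start, 1L, Long::sum);
            emitted.add(new long[] {start, updated}); // downstream sees each intermediate count
        }
        return emitted;
    }

    /** Suppressed behaviour: only the last count per window reaches downstream. */
    static Map<Long, Long> emitFinalOnly(long[] timestampsMs, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, sizeMs), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long[] usage = new long[10];
        for (int i = 0; i < usage.length; i++) {
            usage[i] = i * 60_000L; // ten events, one per minute, all in the first hour
        }
        System.out.println("updates emitted: " + emitEveryUpdate(usage, hour).size());
        System.out.println("final results:   " + emitFinalOnly(usage, hour));
    }
}
```

A billing consumer that charges on every emitted record sees ten records in the first case and one in the second.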

Mental Model: Windows in Time

┌──────────────────────────────────────────────────────────────────────────┐
│                      WINDOW TYPES VISUALIZED                             │
└──────────────────────────────────────────────────────────────────────────┘

Time: 0    5    10   15   20   25   30   35   40   45   50   55   60 (min)
Events: *  *  *    *     *  * *     *  *    *     *  *    *  *  *  *

═══════════════════════════════════════════════════════════════════════════
TUMBLING WINDOW (size=15min)
Non-overlapping, fixed-size

├──────────────────┤├──────────────────┤├──────────────────┤├─────────────
│    Window 1      ││    Window 2      ││    Window 3      ││   Window 4
│   0-15 min       ││   15-30 min      ││   30-45 min      ││   45-60 min
│   4 events       ││   4 events       ││   4 events       ││   4 events
└──────────────────┘└──────────────────┘└──────────────────┘└─────────────

═══════════════════════════════════════════════════════════════════════════
HOPPING WINDOW (size=15min, advance=5min)
Overlapping windows

├──────────────────┤                                        Window 1: 0-15
    ├──────────────────┤                                    Window 2: 5-20
        ├──────────────────┤                                Window 3: 10-25
            ├──────────────────┤                            Window 4: 15-30
                                ...

Each event belongs to multiple windows (3 in this case)

═══════════════════════════════════════════════════════════════════════════
SLIDING WINDOW (size=10min, no fixed boundaries)
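Windows are anchored to event timestamps, not to the clock

Event at T=12 defines the window [2, 12]: all events in the 10 min up to and including it
Event at T=25 defines the window [15, 25]
A window starting just after an event exists only once a later event lands within 10 min of it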

Used for: "all events within 10 minutes of each other"

═══════════════════════════════════════════════════════════════════════════
SESSION WINDOW (inactivity gap=10min)
Dynamic windows based on activity

*  *  *         *     *  * *              *  *    *
├───────┤       ├──────────┤             ├─────────┤
Session 1       Session 2                 Session 3
(gap > 10min)   (gap > 10min)

Windows merge when events arrive within gap

Deep Dive: Window Operations

Tumbling Windows

JAVA(67 lines)
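
As a minimal sketch of the arithmetic behind tumbling windows: they are aligned to the epoch, with an inclusive start and an exclusive end. The helper below is plain Java with hypothetical names (`TumblingMath`, `windowStart`, `windowEnd`), not Kafka Streams API, and it uses minutes as the unit to mirror the diagram above.

```java
/** Hypothetical helper: which tumbling window does a timestamp fall into? */
final class TumblingMath {

    /** Inclusive start of the window; assumes timestamp >= 0 and size > 0. */
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    /** Exclusive end of the window. */
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    public static void main(String[] args) {
        long sizeMin = 15; // window size in minutes, as in the diagram
        for (long t : new long[] {0, 14, 15, 29, 44, 59}) {
            System.out.printf("t=%d -> [%d, %d)%n", t, windowStart(t, sizeMin), windowEnd(t, sizeMin));
        }
    }
}
```

Because the end is exclusive, an event at exactly T=15 belongs to the 15-30 window, not the 0-15 window.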

Hopping Windows

JAVA(18 lines)
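
The claim that each event lands in several hopping windows can be checked with a small calculation. This is a plain-Java sketch with hypothetical names (`HoppingMath`, `windowStartsFor`), assuming epoch-aligned windows with an inclusive start and exclusive end; it is not Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: start times of every hopping window containing a timestamp. */
final class HoppingMath {

    static List<Long> windowStartsFor(long timestamp, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Earliest epoch-aligned window that can still contain the timestamp.
        long start = (Math.max(0L, timestamp - size + advance) / advance) * advance;
        while (start <= timestamp) {
            starts.add(start);
            start += advance;
        }
        return starts;
    }

    public static void main(String[] args) {
        // size=15, advance=5 (minutes), as in the diagram
        System.out.println(windowStartsFor(12, 15, 5)); // [0, 5, 10]
        System.out.println(windowStartsFor(3, 15, 5));  // [0]
    }
}
```

With size=15 and advance=5, an event normally falls into size/advance = 3 windows; events near time zero fall into fewer because no window starts before 0.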

Sliding Windows

JAVA(14 lines)
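
One way to picture a sliding window is to look at the window that ends at each record and count what it contains. The sketch below does only that, in plain Java with hypothetical names (`SlidingModel`); it assumes sorted input and inclusive bounds, and it leaves out the additional windows that start just after a record.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the sliding window that ends at each record. */
final class SlidingModel {

    /** For each record at time t, counts the records inside [t - timeDifference, t]. */
    static List<Integer> countsInWindowEndingAtEachRecord(long[] sortedTimestamps, long timeDifference) {
        List<Integer> counts = new ArrayList<>();
        int left = 0; // index of the oldest record still inside the window
        for (int right = 0; right < sortedTimestamps.length; right++) {
            while (sortedTimestamps[right] - sortedTimestamps[left] > timeDifference) {
                left++; // drop records that have fallen out of the window
            }
            counts.add(right - left + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] minutes = {2, 8, 12, 25};
        System.out.println(countsInWindowEndingAtEachRecord(minutes, 10));
    }
}
```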

Session Windows

JAVA(24 lines)
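
Session boundaries follow from the inactivity gap alone. The sketch below is a plain-Java model with hypothetical names (`SessionModel`, `sessions`); it assumes timestamps arrive in order and treats a gap exactly equal to the limit as still belonging to the same session, which is an assumption of this sketch. Merging of sessions caused by out-of-order records is not modelled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: split ordered timestamps into sessions by inactivity gap. */
final class SessionModel {

    /** Returns one {start, end, count} triple per session. */
    static List<long[]> sessions(long[] sortedTimestamps, long inactivityGap) {
        List<long[]> out = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            long[] last = out.isEmpty() ? null : out.get(out.size() - 1);
            if (last != null && ts - last[1] <= inactivityGap) {
                last[1] = ts; // extend the current session
                last[2]++;
            } else {
                out.add(new long[] {ts, ts, 1}); // gap exceeded: start a new session
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] minutes = {0, 3, 6, 20, 26, 29, 50, 53, 58};
        for (long[] s : sessions(minutes, 10)) {
            System.out.println("session " + s[0] + "-" + s[1] + " events=" + s[2]);
        }
    }
}
```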

Deep Dive: Time Semantics

Three Types of Time

┌──────────────────────────────────────────────────────────────────────────┐
│                      TIME SEMANTICS                                      │
└──────────────────────────────────────────────────────────────────────────┘

                     Producer                Kafka                Consumer
  Event occurs         │                       │                      │
       │               │                       │                      │
       ▼               ▼                       ▼                      ▼
  ┌─────────┐    ┌───────────┐          ┌───────────┐          ┌───────────┐
  │ EVENT   │───▶│ Produces  │─────────▶│  Stored   │─────────▶│ Processed │
  │ TIME    │    │           │          │           │          │           │
  │         │    │ INGESTION │          │           │          │ PROCESSING│
  │ T = 0   │    │ TIME      │          │           │          │ TIME      │
  │         │    │ T = 100ms │          │           │          │ T = 5s    │
  └─────────┘    └───────────┘          └───────────┘          └───────────┘

  EVENT TIME       When the event actually occurred (business time)
  ───────────      Set by producer, embedded in message
                   ✅ Deterministic results
                   ✅ Handles late/out-of-order data
                   ❌ Requires timestamp extraction

  INGESTION TIME   When Kafka received the message
  ──────────────   Set by broker (LogAppendTime)
                   ✅ No extraction needed
                   ❌ Doesn't reflect actual event time
                   ❌ Affected by producer delays

  PROCESSING TIME  When consumer processes the message
  ───────────────  Current wall-clock time
                   ✅ Simplest to use
                   ❌ Non-deterministic (varies with consumer speed)
                   ❌ Different results on replay
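
The determinism point can be illustrated with a small plain-Java model: the same records are windowed once by event time and once by the wall-clock time at which they were processed, for a live run and for a later replay. All names (`TimeSemanticsModel`, `Rec`) and sample values are hypothetical; no Kafka API is involved.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model comparing event-time and processing-time windowing. */
final class TimeSemanticsModel {

    /** One record: when the event happened and when the consumer processed it. */
    static final class Rec {
        final long eventTimeMs;
        final long processedAtMs;

        Rec(long eventTimeMs, long processedAtMs) {
            this.eventTimeMs = eventTimeMs;
            this.processedAtMs = processedAtMs;
        }
    }

    /** Counts records per tumbling window (keyed by window start) using event time. */
    static Map<Long, Integer> countByEventTime(Rec[] records, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Rec r : records) {
            counts.merge(r.eventTimeMs - (r.eventTimeMs % sizeMs), 1, Integer::sum);
        }
        return counts;
    }

    /** Same aggregation, but windowed by the wall-clock time of processing. */
    static Map<Long, Integer> countByProcessingTime(Rec[] records, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Rec r : records) {
            counts.merge(r.processedAtMs - (r.processedAtMs % sizeMs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        Rec[] live = {new Rec(1_000, 1_100), new Rec(2_000, 2_100), new Rec(61_000, 61_100)};
        // Replay: identical events, processed in a burst much later.
        Rec[] replay = {new Rec(1_000, 500_000), new Rec(2_000, 500_001), new Rec(61_000, 500_002)};

        System.out.println("event time, live:   " + countByEventTime(live, minute));
        System.out.println("event time, replay: " + countByEventTime(replay, minute));
        System.out.println("proc. time, live:   " + countByProcessingTime(live, minute));
        System.out.println("proc. time, replay: " + countByProcessingTime(replay, minute));
    }
}
```

The event-time results are identical across both runs, while the processing-time results depend on when the consumer happened to run.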

Configuring Timestamp Extraction

JAVA(45 lines)
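
The core decision inside a custom timestamp extractor is usually "use the embedded event time if it is usable, otherwise fall back". The sketch below shows only that decision in plain Java. `EventTimeFallback` is a hypothetical name, the ISO-8601 payload field is an assumption, and wiring this into Kafka Streams' `TimestampExtractor` interface is left out.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

/** Hypothetical helper: pick an event timestamp, falling back to the record's own timestamp. */
final class EventTimeFallback {

    /**
     * @param isoEventTime      event time from the payload as ISO-8601 text (assumed format), may be null
     * @param recordTimestampMs timestamp already attached to the record
     */
    static long extract(String isoEventTime, long recordTimestampMs) {
        if (isoEventTime == null) {
            return recordTimestampMs; // no embedded time: fall back
        }
        try {
            return Instant.parse(isoEventTime).toEpochMilli();
        } catch (DateTimeParseException e) {
            return recordTimestampMs; // malformed embedded time: fall back
        }
    }

    public static void main(String[] args) {
        System.out.println(extract("1970-01-01T00:00:05Z", 99L));
        System.out.println(extract("not-a-date", 99L));
        System.out.println(extract(null, 99L));
    }
}
```

Falling back silently hides bad producers, so a real extractor would probably also count or log the fallback cases.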

Handling Late Data

JAVA(35 lines)
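
Whether a late record still counts depends on stream time, the window end, and the grace period. The plain-Java model below (hypothetical name `GraceModel`, not Kafka Streams API) treats stream time as the highest timestamp seen so far and accepts a record only while its window end plus grace is still ahead of stream time.

```java
/** Hypothetical model of stream time and grace for tumbling windows. */
final class GraceModel {
    private final long sizeMs;
    private final long graceMs;
    private long streamTimeMs = -1L; // highest timestamp observed so far

    GraceModel(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    /** Returns true if the record is still accepted into its window, false if it is too late. */
    boolean accept(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowEnd = timestampMs - (timestampMs % sizeMs) + sizeMs;
        return windowEnd + graceMs > streamTimeMs;
    }

    public static void main(String[] args) {
        GraceModel model = new GraceModel(60_000L, 10_000L); // 60s windows, 10s grace
        long[] arrivals = {10_000L, 68_000L, 55_000L, 75_000L, 50_000L};
        for (long ts : arrivals) {
            System.out.println("ts=" + ts + " accepted=" + model.accept(ts));
        }
    }
}
```

In this run the record at 55s is out of order but still inside the grace period, while the record at 50s arrives after stream time has reached 75s and is dropped.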

Deep Dive: Stream-Stream Joins

Join Concepts

┌──────────────────────────────────────────────────────────────────────────┐
│                    STREAM-STREAM JOINS                                   │
└──────────────────────────────────────────────────────────────────────────┘

  KStream-KStream joins require a TIME WINDOW because:
  - Both streams are infinite
  - Without time bounds, we'd need infinite state

  Stream A: Orders         Stream B: Payments
  ────────────────         ─────────────────
  [order-1, T=0]           [pay-1, T=2]     ← Match! Within window
  [order-2, T=5]           [pay-2, T=100]   ← No match (too late)
  [order-3, T=10]          [pay-3, T=12]    ← Match!

  Join Window: 10 seconds
  ┌─────────────────────────────────────────────────────────────────────┐
  │ For each record, look for matches in OTHER stream within ±10s       │
  │                                                                     │
  │ order-1 (T=0) joins with payments from T=-10 to T=10                │
  │ pay-1 (T=2) is in range → MATCH                                     │
  │                                                                     │
  │ order-2 (T=5) joins with payments from T=-5 to T=15                 │
  │ pay-2 (T=100) is out of range → NO MATCH                            │
  └─────────────────────────────────────────────────────────────────────┘

Stream-Stream Join Example

JAVA(70 lines)
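
The matching rule from the diagram (same key, timestamps within the window of each other) can be written out directly. This is a plain-Java model with hypothetical names (`StreamJoinModel`, `Event`); it assumes orders and payments share a key and uses seconds as the time unit. A real join keeps both sides in window stores instead of scanning lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of a windowed inner join between two streams. */
final class StreamJoinModel {

    static final class Event {
        final String key;
        final long ts;

        Event(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    /** Emits "key:leftTs:rightTs" for every pair with equal keys and |leftTs - rightTs| <= window. */
    static List<String> innerJoin(List<Event> left, List<Event> right, long window) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key.equals(r.key) && Math.abs(l.ts - r.ts) <= window) {
                    out.add(l.key + ":" + l.ts + ":" + r.ts);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same data as the diagram, with the shared key assumed to be the order number.
        List<Event> orders = Arrays.asList(new Event("1", 0), new Event("2", 5), new Event("3", 10));
        List<Event> payments = Arrays.asList(new Event("1", 2), new Event("2", 100), new Event("3", 12));
        System.out.println(innerJoin(orders, payments, 10));
    }
}
```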

Asymmetric Join Windows

JAVA(15 lines)
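
An asymmetric window replaces the single time difference with separate "before" and "after" bounds relative to the primary record. The predicate below is a plain-Java sketch of that rule with hypothetical names (`AsymmetricWindowModel`, `matches`); the 0/10 bounds in `main` are example values for "payment must come after the order".

```java
/** Hypothetical model of an asymmetric join window. */
final class AsymmetricWindowModel {

    /** True when otherTs lies in [primaryTs - before, primaryTs + after], bounds inclusive. */
    static boolean matches(long primaryTs, long otherTs, long before, long after) {
        return otherTs >= primaryTs - before && otherTs <= primaryTs + after;
    }

    public static void main(String[] args) {
        long before = 0;  // payment may not precede the order
        long after = 10;  // payment may follow the order by up to 10 time units
        System.out.println(matches(100, 105, before, after)); // payment shortly after the order
        System.out.println(matches(100, 95, before, after));  // payment before the order
        System.out.println(matches(100, 111, before, after)); // payment too long after the order
    }
}
```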

Deep Dive: Stream-Table Joins

KStream-KTable Join

┌──────────────────────────────────────────────────────────────────────────┐
│                    STREAM-TABLE JOINS                                    │
└──────────────────────────────────────────────────────────────────────────┘

  KStream-KTable: Enrich stream with table lookup

  KStream (orders)          KTable (customers)
  ─────────────────         ─────────────────
  [order-1, cust=A]         [A → "Alice, VIP"]
  [order-2, cust=B]         [B → "Bob, Regular"]
  [order-3, cust=C]   JOIN  [C → null]  (not found)
         │                        │
         └────────────┬───────────┘
                      ▼
  ┌─────────────────────────────────────────────────────────────────────┐
  │ [order-1, "Alice, VIP"]                                             │
  │ [order-2, "Bob, Regular"]                                           │
  │ [order-3, null]           ← Left join: customer C not in table      │
  └─────────────────────────────────────────────────────────────────────┘

  NO TIME WINDOW NEEDED:
  - Table represents "current state"
  - Stream record joins with table value AT THAT MOMENT
  - Table updates reflect in future stream joins
JAVA(56 lines)
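
The "joins with the table value at that moment" rule is easy to see in a small model. The sketch below is plain Java with hypothetical names (`StreamTableJoinModel` and its methods) and a made-up customer "Carol"; it keeps the table as a map and performs a left join for each stream record as it arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a stream-table left join. */
final class StreamTableJoinModel {
    private final Map<String, String> table = new HashMap<>(); // latest value per key
    private final List<String> output = new ArrayList<>();

    /** A table update only changes what future stream records will see. */
    void onTableUpdate(String key, String value) {
        table.put(key, value);
    }

    /** Left join: the stream record is emitted even when the table has no entry yet. */
    void onStreamRecord(String key, String order) {
        output.add(order + " -> " + table.get(key));
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        StreamTableJoinModel join = new StreamTableJoinModel();
        join.onTableUpdate("A", "Alice, VIP");
        join.onStreamRecord("A", "order-1");
        join.onStreamRecord("C", "order-3"); // customer C not in the table yet
        join.onTableUpdate("C", "Carol");    // arrives too late for order-3
        join.onStreamRecord("C", "order-4");
        join.output().forEach(System.out::println);
    }
}
```

Note that the earlier result for order-3 is not revised when customer C appears; only later stream records pick up the new value.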

KStream-GlobalKTable Join

JAVA(26 lines)

Deep Dive: Co-Partitioning Requirements

When Co-Partitioning Is Required

┌──────────────────────────────────────────────────────────────────────────┐
│                    CO-PARTITIONING FOR JOINS                             │
└──────────────────────────────────────────────────────────────────────────┘

  For KStream-KStream and KStream-KTable joins:
  - Both topics MUST have same number of partitions
  - Same key MUST map to same partition number

  CORRECT: Co-partitioned
  ─────────────────────────
  orders (4 partitions)         customers (4 partitions)
  key=A → partition 1           key=A → partition 1  ✅
  key=B → partition 2           key=B → partition 2  ✅
  key=C → partition 0           key=C → partition 0  ✅

  Instance 1 gets partition 0,1: has both order and customer for A, C
  Instance 2 gets partition 2,3: has both order and customer for B


  INCORRECT: Not co-partitioned
  ──────────────────────────────
  orders (4 partitions)         customers (6 partitions)
  key=A → partition 1           key=A → partition 4  ❌
  key=B → partition 2           key=B → partition 1  ❌

  Instance 1 has order for A, but customer A is on different instance!
  JOIN FAILS


  FIX: Repartition one or both topics
  ────────────────────────────────────
  KStream<String, Order> orders = builder.<String, Order>stream("orders")
      .selectKey((k, v) -> v.getCustomerId());  // Marks the stream for repartitioning

  // The following join or aggregation then creates an internal
  // repartition topic with correct partitioning

Verifying Co-Partitioning

JAVA(28 lines)
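
Why the partition count matters can be shown with a stand-in partitioner. The sketch below is plain Java with hypothetical names (`CoPartitionModel`, `partitionFor`, `keyAligned`). It uses `String.hashCode()` purely for illustration, which is not what Kafka's default partitioner does (that one hashes the serialized key with murmur2), so the partition numbers here will not match a real cluster.

```java
/** Hypothetical model: does a key land on the same partition number in two topics? */
final class CoPartitionModel {

    /** Stand-in partitioner for illustration only; NOT Kafka's default algorithm. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** True when the key maps to the same partition number in both topics. */
    static boolean keyAligned(String key, int leftPartitions, int rightPartitions) {
        return partitionFor(key, leftPartitions) == partitionFor(key, rightPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"A", "B", "C"}) {
            System.out.println(key
                    + " 4 vs 4 aligned=" + keyAligned(key, 4, 4)
                    + ", 4 vs 6 aligned=" + keyAligned(key, 4, 6));
        }
    }
}
```

With equal partition counts and the same partitioner, every key is aligned by construction; with different counts, alignment is at best accidental.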

Deep Dive: Suppression Patterns

Emit Only Final Results

JAVA(28 lines)
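
Conceptually, final-results suppression is a buffer that holds the latest value per window and releases it once stream time passes the window end plus grace. The plain-Java sketch below models that idea for a count aggregation; `FinalResultBuffer` is a hypothetical name, the time units are arbitrary, and buffer size limits are not modelled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of "emit only when the window closes" for a windowed count. */
final class FinalResultBuffer {
    private final long size;
    private final long grace;
    private final TreeMap<Long, Long> pending = new TreeMap<>(); // window start -> latest count
    private long streamTime = -1L;

    FinalResultBuffer(long size, long grace) {
        this.size = size;
        this.grace = grace;
    }

    /** Feeds one record and returns the windows ("start=count") that became final as a result. */
    List<String> onRecord(long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long start = timestamp - (timestamp % size);
        if (start + size + grace > streamTime) {
            pending.merge(start, 1L, Long::sum); // window still open: count the record
        } // otherwise the record is too late and is dropped

        List<String> closed = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            if (e.getKey() + size + grace <= streamTime) {
                closed.add(e.getKey() + "=" + e.getValue()); // emitted exactly once
                it.remove();
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        FinalResultBuffer buffer = new FinalResultBuffer(60, 10);
        for (long ts : new long[] {0, 30, 50, 65, 70}) {
            System.out.println("ts=" + ts + " emitted=" + buffer.onRecord(ts));
        }
    }
}
```

Nothing is emitted for the first window until a record with timestamp 70 or later arrives, which is the behaviour that surprises people on low-volume streams.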

Emit After Time Limit

JAVA(14 lines)

Testing Windowed Operations

JAVA(99 lines)

Common Mistakes

1. Missing Grace Period for Distributed Systems

JAVA(9 lines)

2. Forgetting Suppression for "Exactly-Once" Semantics

JAVA(12 lines)

3. Not Re-Keying Before Table Join

JAVA(10 lines)

4. Using GlobalKTable for Large Data

JAVA(9 lines)

Debug This: The Window That Won't Close

Windowed aggregations never produce output:

JAVA(11 lines)

Events are flowing. Why no output?

Answer
The issue: Windows close based on stream time, not wall-clock time.
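Stream time only advances when new records arrive, and it is tracked per task, so records flowing on other partitions do not help. If events stop arriving on a partition, its stream time stops advancing and its open windows never close.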
Scenario:
Events arrive at T=0, T=30s, T=50s
Window [0-60s] should close at T=60s + grace
But no event arrives after T=50s
Stream time stuck at T=50s
Window never closes!
Solutions:
  1. Punctuation with wall-clock time (Processor API):
    JAVA(5 lines)
  2. Synthetic heartbeat events:
    JAVA(8 lines)
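  3. Bound buffering with time-limit suppression (untilTimeLimit is also driven by stream time, so it shortens the wait while records flow but cannot flush a fully idle stream):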
    JAVA(4 lines)
  4. Lower grace period if late data is acceptable to lose:
    JAVA(3 lines)

The root cause is that Kafka Streams is event-driven: without new events, stream time doesn't advance.
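
The scenario above can be replayed in a few lines of plain Java. `StreamTimeModel` is a hypothetical name, the 5-second grace period is an assumed value, and the "heartbeat" is simply one more record whose timestamp is late enough to move stream time past the close point.

```java
/** Hypothetical model: stream time only moves when a record moves it. */
final class StreamTimeModel {
    private long streamTimeMs = -1L;

    /** Stream time is the highest record timestamp seen so far. */
    void onRecord(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
    }

    boolean isWindowClosed(long windowEndMs, long graceMs) {
        return streamTimeMs >= windowEndMs + graceMs;
    }

    long streamTimeMs() {
        return streamTimeMs;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L;
        long grace = 5_000L; // assumed grace period for this example

        StreamTimeModel model = new StreamTimeModel();
        for (long ts : new long[] {0L, 30_000L, 50_000L}) {
            model.onRecord(ts);
        }
        // No matter how much wall-clock time passes here, stream time stays at 50s.
        System.out.println("stream time=" + model.streamTimeMs()
                + " closed=" + model.isWindowClosed(windowEnd, grace));

        model.onRecord(65_000L); // synthetic heartbeat record
        System.out.println("stream time=" + model.streamTimeMs()
                + " closed=" + model.isWindowClosed(windowEnd, grace));
    }
}
```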

Exercises

Exercise 1: Fraud Detection with Session Windows

Build a topology that:

  • Groups transactions by user
  • Detects sessions (30-minute inactivity gap)
  • Alerts if > 10 transactions in a session
  • Includes session duration in alert

Exercise 2: Stream-Stream Join with Asymmetric Windows

Create a join between:

  • orders stream
  • shipments stream (can arrive 1-7 days after order)
  • Output orders that haven't shipped within SLA

Exercise 3: Implement Late Data Handling

Build a pipeline that:

  • Processes events with 5-minute tumbling windows
  • Routes late events (> 10 minutes) to DLQ
  • Tracks percentage of late events as metric

Exercise 4: Multi-Way Join

Create a topology joining three streams:

  • orders
  • payments
  • shipments

All within a 1-hour window

Exercise 5: Exactly-Once Billing Aggregation

Implement hourly billing with:

  • Tumbling windows with grace period
  • Suppression for final results only
  • Idempotent output to billing topic
  • Tests verifying no duplicate bills

Interview Questions

Q1: "What's the difference between event time and processing time? When would you use each?"

What they're looking for: Time semantics understanding
Strong answer: "Event time is when the event actually occurred (embedded in the message). Processing time is when the consumer processes it (wall-clock).

I prefer event time for most cases:

JAVA(2 lines)
Event time advantages:
  • Deterministic results on replay
  • Handles late-arriving data correctly
  • Results reflect business reality
Processing time use cases:
  • Monitoring/alerting where 'now' matters
  • Simple scenarios where event timing isn't critical
  • Legacy systems without embedded timestamps

The key challenge with event time is handling late data. I typically use:

  • Grace periods for expected lateness
  • Watermarks (implicit in Kafka Streams via stream time)
  • Dead letter queues for extremely late data

If an event's timestamp is '5 minutes ago' but I'm processing now, event time correctly windows it with other events from that time period."

Q2: "How do stream-stream joins work in Kafka Streams? What's the windowing requirement?"

What they're looking for: Join mechanics understanding
Strong answer: "Stream-stream joins require a time window because both streams are unbounded. Without it, we'd need infinite state.

The join mechanics:

Stream A: event at T=100
Stream B: find matching events where T in [100-window, 100+window]
Implementation:
JAVA(8 lines)
State required:
  • Both streams buffered for window duration
  • Window store for each stream
  • Co-partitioning required (same key → same partition)
Join types:
  • Inner: output only when both match
  • Left: output A even if no B match (B can be null)
  • Outer: output both unmatched A and unmatched B
The window can be asymmetric with .before() and .after() for cases like 'payment must come after order'."

Q3: "Explain suppression in Kafka Streams. When is it necessary?"

What they're looking for: Understanding of continuous vs. final results
Strong answer: "By default, KTable aggregations emit on every update. Suppression holds results until a condition is met.
Without suppression:
Window [0-60s]: 5 events arrive
Emits: 1, 2, 3, 4, 5 (5 separate emissions)
With suppression:
JAVA(2 lines)
When to use:
  1. Billing/Charging - Bill once per period, not per event
  2. Notifications - Send summary, not flood user
  3. Downstream systems expecting final values
  4. Deduplication scenarios
Suppression types:
JAVA(5 lines)
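The catch: untilWindowCloses relies on stream time advancing, which requires new events. For low-volume streams, consider heartbeat records or wall-clock punctuation; time-limit suppression is also driven by stream time, so it does not help once a stream goes idle."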

Q4: "What is co-partitioning and why does it matter for joins?"

What they're looking for: Distributed join understanding
Strong answer: "Co-partitioning means two topics have:
  1. Same number of partitions
  2. Same partitioning strategy (same key → same partition number)
Why it matters:
Instance 1 processes: orders partition 0, customers partition 0
Instance 2 processes: orders partition 1, customers partition 1

For a join on customerId:
- Customer 'alice' in orders-p0 must match customer 'alice' in customers-p0
- Both must be on the same instance for local join
If not co-partitioned:
orders (4 partitions):  alice → p1
customers (6 partitions): alice → p4
Different partition numbers → different instances → join fails!
How to fix:
JAVA(3 lines)
GlobalKTable exception:
JAVA(4 lines)

Best practice: Design topics with co-partitioning in mind upfront. Repartitioning is expensive."

Q5: "How do you handle late-arriving data in windowed aggregations?"

What they're looking for: Production-ready late data handling
Strong answer: "Multiple strategies depending on requirements:
1. Grace Period (accept late data):
JAVA(3 lines)
2. Side Output for Late Data:
JAVA(4 lines)
3. Late Data Topic:
JAVA(3 lines)
4. Reprocessing Strategy:
  • Store late events
  • Periodically replay to generate corrections
  • Used for financial reconciliation
Monitoring:
  • Track percentage of late events
  • Alert on unexpected lateness patterns
  • Dashboard showing lateness distribution
The trade-off: Longer grace period → more accurate results but:
  • More memory (larger window stores)
  • Higher latency (results delayed)
  • More state to restore on failure

For billing: I typically use 15-30 minute grace, with DLQ for anything later, and daily reconciliation job for DLQ events."

Summary & Key Takeaways

Windowing & Joins Summary

┌──────────────────────────────────────────────────────────────────────────┐
│                    WINDOWING & JOINS SUMMARY                             │
├────────────────────────────────┬─────────────────────────────────────────┤
│ Window Type                    │ Use Case                                │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Tumbling                       │ Fixed periods (hourly/daily aggregates) │
│ Hopping                        │ Rolling windows (5-min avg every 1-min) │
│ Sliding                        │ Time difference between events          │
│ Session                        │ Activity-based (user sessions)          │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Join Type                      │ Requirement                             │
├────────────────────────────────┼─────────────────────────────────────────┤
│ KStream-KStream                │ Time window + co-partitioning           │
│ KStream-KTable                 │ Co-partitioning only                    │
│ KStream-GlobalKTable           │ None (full data on every instance)      │
└────────────────────────────────┴─────────────────────────────────────────┘

Essential Takeaways

  1. Prefer event time - Processing time gives non-deterministic results on replay.
  2. Always have grace periods - Distributed systems have clock skew and network delays.
  3. Use suppression for final results - Without it, you get continuous updates, not once-per-window.
  4. Co-partitioning is mandatory - KStream-KStream and KStream-KTable joins fail without it.
  5. GlobalKTable for small lookups - Avoids co-partitioning but duplicates data on every instance.
  6. Test with time advancement - TopologyTestDriver lets you control stream time precisely.

Quick Reference

┌──────────────────────────────────────────────────────────────────────────┐
│                    WINDOWING QUICK REFERENCE                             │
├────────────────────────────────┬─────────────────────────────────────────┤
│ Window Definition              │ Code                                    │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Tumbling                       │ TimeWindows.ofSizeAndGrace(size, grace) │
│ Hopping                        │ TimeWindows.ofSizeAndGrace(size, grace) │
│                                │   .advanceBy(advance)                   │
│ Sliding                        │ SlidingWindows.ofTimeDifferenceAndGrace │
│ Session                        │ SessionWindows.ofInactivityGapAndGrace  │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Join Operations                │                                         │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Stream-Stream Inner            │ stream1.join(stream2, joiner, window)   │
│ Stream-Stream Left             │ stream1.leftJoin(stream2, joiner, win)  │
│ Stream-Stream Outer            │ stream1.outerJoin(stream2, joiner, win) │
│ Stream-Table                   │ stream.join(table, joiner)              │
│ Stream-GlobalTable             │ stream.join(globalTable, keyMapper, j)  │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Suppression                    │                                         │
├────────────────────────────────┼─────────────────────────────────────────┤
│ Until window closes            │ Suppressed.untilWindowCloses(buffer)    │
│ Until time limit               │ Suppressed.untilTimeLimit(dur, buffer)  │
└────────────────────────────────┴─────────────────────────────────────────┘

Series Navigation

Series Overview: Kafka Compendium Series