Windowing, Joins & Time Semantics
At a Glance
| Aspect | Details |
|---|---|
| Goal | Time-based aggregations and joining streams correctly |
| Window Types | Tumbling (fixed), Hopping (overlapping), Sliding, Session |
| Time Semantics | Event time (preferred), Processing time, Ingestion time |
| Join Types | KStream-KStream, KStream-KTable, KStream-GlobalKTable |
| Grace Period | How long to wait for late-arriving data |
| Suppression | Emit only final results vs. continuous updates |
| Prerequisites | Parts 15-16 (Streams fundamentals, state stores) |
What You'll Learn
- Choosing the right window type for your use case
- Event time vs processing time trade-offs
- Handling late-arriving and out-of-order data
- Stream-stream joins with time windows
- Stream-table joins for enrichment
- Suppression for emitting one final result per window
- Testing windowed operations
Production Story: The Billing System That Double-Charged
"Customers are being billed multiple times for the same subscription period," the support lead reported. The billing system aggregated usage events into hourly windows and charged accordingly.
Every time a new event arrived, the aggregation emitted an updated count. A customer with 10 events got billed 10 times—once per intermediate count.
The fix: use suppress() to emit only final results.
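The difference between the buggy and the fixed pipeline can be modeled without Kafka at all. The sketch below is a stdlib-only simulation, not Kafka Streams code. The class and method names are hypothetical, and each emitted value stands for one bill sent downstream for a single hourly window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stdlib-only model of one hourly billing window (not Kafka Streams API code).
class BillingEmissionModel {

    // Without suppression: each event updates the running count and the update is emitted.
    static List<Long> emitOnEveryUpdate(int eventsInWindow) {
        List<Long> emitted = new ArrayList<>();
        long count = 0;
        for (int i = 0; i < eventsInWindow; i++) {
            count++;
            emitted.add(count); // every intermediate count reaches the billing consumer
        }
        return emitted;
    }

    // With suppression until the window closes: only the final count is emitted.
    static List<Long> emitFinalOnly(int eventsInWindow) {
        List<Long> emitted = new ArrayList<>();
        if (eventsInWindow > 0) { // a window only exists once it has received a record
            emitted.add((long) eventsInWindow);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(emitOnEveryUpdate(10)); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]: ten bills
        System.out.println(emitFinalOnly(10));     // [10]: one bill
    }
}
```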
Mental Model: Windows in Time
Window types visualized on a timeline from 0 to 60 minutes with 16 events:
- Tumbling window (size = 15 min): non-overlapping, fixed-size windows covering 0-15, 15-30, 30-45, and 45-60 min, with 4 events in each.
- Hopping window (size = 15 min, advance = 5 min): overlapping windows covering 0-15, 5-20, 10-25, 15-30 min, and so on. Each event belongs to multiple windows (size / advance = 3 in this case).
- Sliding window (time difference = 10 min, no fixed boundaries): windows are defined by the events themselves and contain events within 10 minutes of each other. The event at T=12 ends the window [2, 12], and the event at T=25 ends the window [15, 25]. Used for "all events within 10 minutes of each other".
- Session window (inactivity gap = 10 min): dynamic windows based on activity. Each gap longer than 10 minutes ends one session and starts the next (three sessions in this example). Windows merge when events arrive within the gap.
Deep Dive: Window Operations
Tumbling Windows
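A tumbling window is a hopping window whose advance equals its size. Each timestamp therefore lands in exactly one window, [start, start + size), where start = timestamp - (timestamp mod size). The stdlib-only model below counts events per 15-minute window, the way a windowed count would for a single key. Its class and method names are hypothetical, and it is not Kafka Streams code. In the DSL, the window definition would be something like TimeWindows.ofSizeAndGrace(Duration.ofMinutes(15), grace).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stdlib-only model of tumbling-window assignment for a single key (not Kafka Streams API code).
class TumblingWindowModel {

    // Start of the window [start, start + size) containing the timestamp (non-negative timestamps).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Number of events per window, keyed by window start, like a windowed count for one key.
    static Map<Long, Long> countPerWindow(long[] timestamps, long size) {
        Map<Long, Long> counts = new LinkedHashMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, size), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long min = 60_000L; // one minute in milliseconds
        long[] events = {1 * min, 7 * min, 14 * min, 15 * min, 29 * min, 31 * min};
        // The event at exactly 15 min belongs to [15, 30), because window ends are exclusive.
        System.out.println(countPerWindow(events, 15 * min)); // {0=3, 900000=2, 1800000=1}
    }
}
```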
Hopping Windows
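With hopping windows, each timestamp falls into size / advance windows, except near time 0, where windows with negative starts are not created. The sketch below is a stdlib-only model whose class and method names are hypothetical. Its enumeration arithmetic follows the rule Kafka Streams' TimeWindows uses to list the windows containing a timestamp. The DSL equivalent of this window would be TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(15)).advanceBy(Duration.ofMinutes(5)).

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only model of hopping-window assignment (not Kafka Streams API code).
class HoppingWindowModel {

    // Returns the start of every window [start, start + size) that contains the timestamp.
    static List<Long> windowStartsFor(long timestamp, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Earliest advance-aligned start whose window can still contain the timestamp (never below 0).
        long start = (Math.max(0, timestamp - size + advance) / advance) * advance;
        while (start <= timestamp) {
            starts.add(start);
            start += advance;
        }
        return starts;
    }

    public static void main(String[] args) {
        // size = 15 min, advance = 5 min, as in the mental model above (values in minutes)
        System.out.println(windowStartsFor(12, 15, 5)); // [0, 5, 10]
        System.out.println(windowStartsFor(3, 15, 5));  // [0]: no windows with negative starts
    }
}
```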
Sliding Windows
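Sliding windows have no fixed grid: each window is anchored on the events themselves. For each event at time t, the stdlib-only model below counts how many events fall in the window [t - timeDifference, t], treating both bounds as inclusive. Its names are hypothetical. Kafka Streams' SlidingWindows also creates a window each time a record drops out of the sliding range, which this simplified model omits.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only model of sliding windows that end at each event (not Kafka Streams API code).
class SlidingWindowModel {

    // For each event at time t, count events with timestamps in [t - timeDifference, t].
    static List<Integer> countsEndingAtEachEvent(long[] timestamps, long timeDifference) {
        List<Integer> counts = new ArrayList<>();
        for (long end : timestamps) {
            int n = 0;
            for (long ts : timestamps) {
                if (ts >= end - timeDifference && ts <= end) {
                    n++; // both window bounds are inclusive in this model
                }
            }
            counts.add(n);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] events = {0, 5, 12, 25}; // minutes
        // Windows: [-10, 0], [-5, 5], [2, 12], [15, 25]
        System.out.println(countsEndingAtEachEvent(events, 10)); // [1, 2, 2, 1]
    }
}
```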
Session Windows
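Session windows grow and merge as events arrive. The stdlib-only model below sorts the timestamps and starts a new session whenever the gap to the previous event exceeds the inactivity gap. Sorting stands in for the merging Kafka Streams performs when an out-of-order event bridges two existing sessions. The names are hypothetical, and treating a gap exactly equal to the inactivity gap as the same session is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stdlib-only model of session windows for a single key (not Kafka Streams API code).
class SessionWindowModel {

    // Returns sessions as {start, end, eventCount}.
    static List<long[]> sessions(long[] timestamps, long inactivityGap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted); // out-of-order events end up merged as if they had arrived in order
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long t : sorted) {
            if (current != null && t - current[1] <= inactivityGap) {
                current[1] = t; // within the gap: extend the current session
                current[2]++;
            } else {
                current = new long[] {t, t, 1}; // gap exceeded: start a new session
                result.add(current);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] events = {0, 4, 8, 25, 30, 50}; // minutes
        for (long[] s : sessions(events, 10)) {
            System.out.println("session [" + s[0] + ", " + s[1] + "] with " + s[2] + " events");
        }
    }
}
```

With the sample input this yields three sessions: [0, 8], [25, 30], and [50, 50]. An input of {0, 20, 10} collapses into a single session, because the late event at 10 bridges the other two.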
Deep Dive: Time Semantics
Three Types of Time
Time semantics along the pipeline: the event occurs (event time, T = 0), the producer sends it to Kafka, which stores it (ingestion time, T = 100 ms), and the consumer processes it (processing time, T = 5 s).

| Time | Meaning | Set by | Pros | Cons |
|---|---|---|---|---|
| Event time | When the event actually occurred (business time) | Producer, embedded in the message | Deterministic results; handles late and out-of-order data | Requires timestamp extraction |
| Ingestion time | When Kafka received the message | Broker (LogAppendTime) | No extraction needed | Doesn't reflect actual event time; affected by producer delays |
| Processing time | When the consumer processes the message | Current wall-clock time | Simplest to use | Non-deterministic (varies with consumer speed); different results on replay |
Configuring Timestamp Extraction
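Kafka Streams asks a TimestampExtractor for each record's timestamp. Its extract method receives the consumer record and a partitionTime, which is the highest valid timestamp seen so far on that partition. The stdlib-only model below expresses a common fallback policy as a plain static method so it runs without Kafka: prefer an event-time field from the payload, then the record's own timestamp, then partitionTime. The method name and the policy are assumptions for illustration, not a built-in extractor. If partitionTime is also -1, the result is negative, and Kafka Streams skips records whose extracted timestamp is negative.

```java
// Stdlib-only model of a fallback timestamp-extraction policy. In a real application this logic
// would live inside a TimestampExtractor.extract(record, partitionTime) implementation.
class TimestampFallbackModel {

    // payloadEventTime: event-time field parsed from the message value (null if absent).
    // recordTimestamp: the Kafka record's timestamp (CreateTime or LogAppendTime).
    // partitionTime: highest valid timestamp seen on this partition so far (-1 if none).
    static long extract(Long payloadEventTime, long recordTimestamp, long partitionTime) {
        if (payloadEventTime != null && payloadEventTime >= 0) {
            return payloadEventTime; // preferred: when the event actually happened
        }
        if (recordTimestamp >= 0) {
            return recordTimestamp;  // fallback: producer- or broker-assigned timestamp
        }
        return partitionTime;        // last resort: reuse the partition's current time
    }

    public static void main(String[] args) {
        System.out.println(extract(1_000L, 5_000L, 900L)); // 1000
        System.out.println(extract(null, 5_000L, 900L));   // 5000
        System.out.println(extract(-1L, -1L, 900L));       // 900
    }
}
```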
Handling Late Data
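Kafka Streams judges lateness against stream time, the highest timestamp observed so far, rather than the wall clock. A windowed aggregation accepts a record while the record's window end is later than stream time minus the grace period, and drops it otherwise. The stdlib-only model below labels each record as in-order, late but accepted, or dropped. It covers tumbling windows only, and its names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only model of grace-period handling for tumbling windows (not Kafka Streams API code).
class LateDataModel {

    enum Outcome { IN_ORDER, LATE_ACCEPTED, DROPPED }

    static List<Outcome> process(long[] timestamps, long windowSize, long grace) {
        List<Outcome> outcomes = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            boolean outOfOrder = ts < streamTime;
            streamTime = Math.max(streamTime, ts);                 // stream time never moves backwards
            long windowEnd = ts - (ts % windowSize) + windowSize;  // window is [start, end)
            long closeTime = streamTime - grace;
            if (windowEnd <= closeTime) {
                outcomes.add(Outcome.DROPPED);                     // window already closed
            } else {
                outcomes.add(outOfOrder ? Outcome.LATE_ACCEPTED : Outcome.IN_ORDER);
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        // 60 s windows with a 10 s grace period (values in seconds)
        long[] arrivals = {5, 65, 55, 75, 50};
        System.out.println(process(arrivals, 60, 10));
        // [IN_ORDER, IN_ORDER, LATE_ACCEPTED, IN_ORDER, DROPPED]
    }
}
```

The record at 55 still reaches window [0, 60) because stream time (65) minus grace (10) has not yet passed 60. Once stream time reaches 75, the record at 50 is dropped.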
Deep Dive: Stream-Stream Joins
Join Concepts
KStream-KStream joins require a TIME WINDOW because both streams are infinite. Without time bounds, we'd need infinite state.

| Stream A: Orders | Stream B: Payments | Result |
|---|---|---|
| order-1, T=0 | pay-1, T=2 | Match (within window) |
| order-2, T=5 | pay-2, T=100 | No match (too late) |
| order-3, T=10 | pay-3, T=12 | Match |

With a 10-second join window, each record looks for matches in the OTHER stream within ±10s:
- order-1 (T=0) joins with payments from T=-10 to T=10; pay-1 (T=2) is in range → MATCH
- order-2 (T=5) joins with payments from T=-5 to T=15; pay-2 (T=100) is out of range → NO MATCH
Stream-Stream Join Example
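The matching rule from the join diagram can be modeled directly: two records join when their keys are equal and their timestamps differ by at most the window size. In the DSL, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10)) expresses this. The stdlib-only sketch below uses hypothetical names and evaluates all pairs at once. It ignores arrival order, buffering, and grace periods, which the real join handles with its window stores.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only model of a windowed KStream-KStream inner join (not Kafka Streams API code).
class StreamJoinModel {

    static final class Event {
        final String key;
        final long timestamp;
        final String value;

        Event(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Emits "left+right" for every pair with the same key and |tLeft - tRight| <= window.
    static List<String> innerJoin(List<Event> left, List<Event> right, long window) {
        List<String> joined = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key.equals(r.key) && Math.abs(l.timestamp - r.timestamp) <= window) {
                    joined.add(l.value + "+" + r.value);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Both streams keyed by order ID; timestamps in seconds, as in the join diagram.
        List<Event> orders = List.of(
                new Event("1", 0, "order-1"), new Event("2", 5, "order-2"), new Event("3", 10, "order-3"));
        List<Event> payments = List.of(
                new Event("1", 2, "pay-1"), new Event("2", 100, "pay-2"), new Event("3", 12, "pay-3"));
        System.out.println(innerJoin(orders, payments, 10)); // [order-1+pay-1, order-3+pay-3]
    }
}
```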
Asymmetric Join Windows
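An asymmetric window bounds how far the other stream may lag or lead. For a left record at time t, right records match when their timestamp is in [t - before, t + after]. For "payment must come after order, within 30 minutes", the DSL window could be JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(30)).before(Duration.ZERO). The predicate is modeled below in plain Java, with hypothetical names.

```java
// Stdlib-only model of an asymmetric join window (not Kafka Streams API code).
class AsymmetricJoinWindowModel {

    // True if a right-side record at rightTs joins a left-side record at leftTs.
    static boolean joins(long leftTs, long rightTs, long before, long after) {
        return rightTs >= leftTs - before && rightTs <= leftTs + after; // inclusive bounds
    }

    public static void main(String[] args) {
        long order = 100; // minutes
        System.out.println(joins(order, 120, 0, 30)); // true: payment 20 min after the order
        System.out.println(joins(order, 90, 0, 30));  // false: payment before the order
        System.out.println(joins(order, 131, 0, 30)); // false: more than 30 min later
    }
}
```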
Deep Dive: Stream-Table Joins
KStream-KTable Join
KStream-KTable: enrich a stream with a table lookup.

| KStream (orders) | KTable (customers) | Join output |
|---|---|---|
| order-1, cust=A | A → "Alice, VIP" | [order-1, "Alice, VIP"] |
| order-2, cust=B | B → "Bob, Regular" | [order-2, "Bob, Regular"] |
| order-3, cust=C | C → null (not found) | [order-3, null] (left join: customer C not in table) |

NO TIME WINDOW NEEDED:
- The table represents "current state".
- A stream record joins with the table value AT THAT MOMENT.
- Table updates are reflected in future stream joins.
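A KStream-KTable join is a lookup against the table's current value for the stream record's key. The stdlib-only model below processes table updates and stream records in one sequence, and its names are hypothetical. A table update only changes the stored value. A stream record reads whatever value is present at that moment, with null for a missing key, as in a left join. The later row for customer C and the tombstone in the test are illustrative additions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only model of a KStream-KTable left join (not Kafka Streams API code).
class StreamTableJoinModel {

    static final class Step {
        final boolean tableUpdate; // true: changelog update for the table; false: stream record
        final String key;          // customer ID (the stream must already be keyed by it)
        final String value;

        Step(boolean tableUpdate, String key, String value) {
            this.tableUpdate = tableUpdate;
            this.key = key;
            this.value = value;
        }
    }

    static List<String> leftJoin(List<Step> steps) {
        Map<String, String> table = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Step s : steps) {
            if (s.tableUpdate) {
                if (s.value == null) {
                    table.remove(s.key);       // null value = tombstone: delete the row
                } else {
                    table.put(s.key, s.value); // updates never re-trigger past joins
                }
            } else {
                out.add(s.value + " | " + table.get(s.key)); // current value, or null
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Step> steps = List.of(
                new Step(true, "A", "Alice, VIP"),
                new Step(true, "B", "Bob, Regular"),
                new Step(false, "A", "order-1"),
                new Step(false, "B", "order-2"),
                new Step(false, "C", "order-3"),        // customer C not in the table yet
                new Step(true, "C", "Carol, Regular"),
                new Step(false, "C", "order-4"));       // later orders see the new row
        leftJoin(steps).forEach(System.out::println);
    }
}
```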
KStream-GlobalKTable Join
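A KStream-GlobalKTable join takes an extra key mapper that derives the lookup key from each stream record. Every instance holds the full table, so no co-partitioning is needed. The stdlib-only model below mimics that shape, with hypothetical names. A BiFunction stands in for the DSL's KeyValueMapper, and the inner join drops unmatched records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Stdlib-only model of a KStream-GlobalKTable inner join (not Kafka Streams API code).
class GlobalTableJoinModel {

    // orders: {orderId, "productId:quantity"}; globalProducts: full copy of the product table.
    static List<String> join(List<String[]> orders, Map<String, String> globalProducts,
                             BiFunction<String, String, String> keyMapper) {
        List<String> out = new ArrayList<>();
        for (String[] order : orders) {
            String lookupKey = keyMapper.apply(order[0], order[1]); // derive key from the value
            String product = globalProducts.get(lookupKey);
            if (product != null) { // inner join: unmatched orders are dropped
                out.add(order[0] + " -> " + product);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> products = Map.of("p1", "Widget", "p2", "Gadget");
        List<String[]> orders = List.of(
                new String[] {"o1", "p1:2"}, new String[] {"o2", "p9:1"}, new String[] {"o3", "p2:5"});
        // The order key (o1, o2, ...) is unrelated to the product key; the mapper bridges them.
        System.out.println(join(orders, products, (k, v) -> v.split(":")[0]));
        // [o1 -> Widget, o3 -> Gadget]
    }
}
```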
Deep Dive: Co-Partitioning Requirements
When Co-Partitioning Is Required
For KStream-KStream and KStream-KTable joins:
- Both topics MUST have the same number of partitions.
- The same key MUST map to the same partition number.

CORRECT (co-partitioned): orders (4 partitions) and customers (4 partitions)
- key=A → partition 1 in both topics ✅
- key=B → partition 2 in both topics ✅
- key=C → partition 0 in both topics ✅
Instance 1 gets partitions 0 and 1, so it has both the order and the customer for A and C. Instance 2 gets partitions 2 and 3, so it has both for B.

INCORRECT (not co-partitioned): orders (4 partitions) and customers (6 partitions)
- key=A → orders partition 1, customers partition 4 ❌
- key=B → orders partition 2, customers partition 1 ❌
Instance 1 has the order for A, but customer A is on a different instance, so the JOIN FAILS.

FIX: repartition one or both topics. For example, builder.stream("orders").selectKey((k, v) -> v.getCustomerId()) re-keys the stream by customer ID. Kafka Streams then routes it through an internal repartition topic with the correct partitioning before the join.
Verifying Co-Partitioning
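The effect of mismatched partition counts is easy to check with any hash-mod partitioner. The stdlib-only model below uses String.hashCode() as a stand-in partitioner, and its method names are hypothetical. Kafka's default partitioner instead applies murmur2 to the serialized key bytes, so the actual partition numbers differ. The conclusion is the same, though: with matching counts every key gets the same partition number in both topics, and with different counts many keys do not.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only model of co-partitioning (not Kafka's partitioner or Kafka Streams API code).
class CoPartitionModel {

    // Stand-in for the default partitioner: hash the key, then take it modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Keys whose partition number differs between two topics with these partition counts.
    static List<String> misalignedKeys(List<String> keys, int leftPartitions, int rightPartitions) {
        List<String> misaligned = new ArrayList<>();
        for (String key : keys) {
            if (partitionFor(key, leftPartitions) != partitionFor(key, rightPartitions)) {
                misaligned.add(key);
            }
        }
        return misaligned;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("A", "B", "H");
        System.out.println(misalignedKeys(keys, 4, 4)); // []: same counts, always aligned
        System.out.println(misalignedKeys(keys, 4, 6)); // [A, B]: "A" is p1 vs p5, "B" is p2 vs p0
    }
}
```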
Deep Dive: Suppression Patterns
Emit Only Final Results
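Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()) holds each window's latest result in a buffer. It forwards that result once stream time reaches the window end plus the grace period. The stdlib-only model below reproduces that rule for tumbling-window counts of one key, with hypothetical names. If no record pushes stream time past the close point, nothing is ever emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only model of a tumbling-window count with "emit only when the window closes".
class SuppressionModel {

    static List<String> countUntilWindowCloses(long[] timestamps, long windowSize, long grace) {
        TreeMap<Long, Long> buffered = new TreeMap<>(); // window start -> latest (unsent) count
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % windowSize);
            if (start + windowSize > streamTime - grace) {
                buffered.merge(start, 1L, Long::sum); // window still open: update the buffer
            } // otherwise the record missed the grace period and is dropped
            // Flush every window whose end + grace has been reached by stream time.
            while (!buffered.isEmpty() && buffered.firstKey() + windowSize <= streamTime - grace) {
                Map.Entry<Long, Long> closed = buffered.pollFirstEntry();
                emitted.add(closed.getKey() + "=" + closed.getValue());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // 60 s windows, 10 s grace (values in seconds)
        System.out.println(countUntilWindowCloses(new long[] {0, 30, 50}, 60, 10));         // []
        System.out.println(countUntilWindowCloses(new long[] {0, 30, 50, 65, 75}, 60, 10)); // [0=3]
    }
}
```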
Emit After Time Limit
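Suppressed.untilTimeLimit(timeToWaitForMoreEvents, bufferConfig) forwards a key's latest value once the time limit has passed since that key was first buffered. Later updates replace the buffered value without restarting the timer. The limit is measured in stream time, not wall-clock time. The stdlib-only model below follows that description, with hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only model of time-limit suppression (not Kafka Streams API code).
class TimeLimitSuppressionModel {

    static final class Update {
        final String key;
        final long timestamp;
        final long value;

        Update(String key, long timestamp, long value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    static List<String> process(List<Update> updates, long timeLimit) {
        Map<String, long[]> buffer = new LinkedHashMap<>(); // key -> {firstBufferedTime, latestValue}
        List<String> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (Update u : updates) {
            streamTime = Math.max(streamTime, u.timestamp);
            long[] entry = buffer.get(u.key);
            if (entry == null) {
                buffer.put(u.key, new long[] {u.timestamp, u.value}); // timer starts here
            } else {
                entry[1] = u.value; // replace the value but keep the original timer
            }
            Iterator<Map.Entry<String, long[]>> it = buffer.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, long[]> e = it.next();
                if (e.getValue()[0] + timeLimit <= streamTime) { // limit reached in stream time
                    emitted.add(e.getKey() + "=" + e.getValue()[1]);
                    it.remove();
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Update> updates = List.of(
                new Update("a", 0, 1), new Update("a", 5, 2), new Update("b", 8, 1),
                new Update("a", 12, 3), new Update("c", 20, 1));
        System.out.println(process(updates, 10)); // [a=3, b=1]; "c" is still buffered
    }
}
```

Key "a" is emitted once stream time reaches 12, because its timer started at 0. If the update at 5 had restarted the timer, "a" would not have been emitted until stream time reached 15.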
Testing Windowed Operations
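TopologyTestDriver lets you pipe records with explicit timestamps, without a running broker. You can therefore advance stream time past a window's end plus grace period and assert on exactly what the topology emits.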
Common Mistakes
1. Missing Grace Period for Distributed Systems
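Windows defined with no grace period (for example, TimeWindows.ofSizeWithNoGrace) close as soon as stream time passes their end, so events delayed by clock skew or network lag are dropped. Set an explicit grace period with TimeWindows.ofSizeAndGrace(size, grace).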
2. Forgetting Suppression for "Exactly-Once" Semantics
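A windowed aggregation emits an updated result for every incoming event, so a billing consumer sees every intermediate count. When only the final value per window should leave the topology, add .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())).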
3. Not Re-Keying Before Table Join
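KStream-KTable joins match on the record key. If orders are keyed by order ID but the customers table is keyed by customer ID, re-key the stream first, for example with selectKey((k, v) -> v.getCustomerId()), so each order looks up its own customer. Kafka Streams then repartitions the re-keyed stream before the join.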
4. Using GlobalKTable for Large Data
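A GlobalKTable is fully replicated: every instance stores, and on startup restores, the entire table. For large or fast-changing tables, a co-partitioned KTable keeps each instance's storage and restore time proportional to its share of the partitions.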
Debug This: The Window That Won't Close
A windowed aggregation never produces any output.
Events are flowing. Why no output?
Answer
- Events arrive at T=0, T=30s, and T=50s.
- Window [0-60s] should close at T=60s + grace.
- But no event arrives after T=50s, so stream time is stuck at T=50s.
- The window never closes!
Possible fixes:
- Punctuation with wall-clock time (Processor API): schedule a PunctuationType.WALL_CLOCK_TIME punctuator that emits results on a timer instead of waiting for stream time.
- Synthetic heartbeat events: periodically produce heartbeat records to every input partition so stream time keeps advancing.
- Time-limit suppression: Suppressed.untilTimeLimit can emit earlier, but it is also measured in stream time, so it still needs new records to arrive.
- A lower grace period, if losing more late data is acceptable.
The root cause is that Kafka Streams is event-driven: stream time advances only when new records arrive, so without events, windows never close.
Exercises
Exercise 1: Fraud Detection with Session Windows
Build a topology that:
- Groups transactions by user
- Detects sessions (30-minute inactivity gap)
- Alerts if > 10 transactions in a session
- Includes session duration in alert
Exercise 2: Stream-Stream Join with Asymmetric Windows
Create a join between:
- orders stream
- shipments stream (can arrive 1-7 days after order)
- Output orders that haven't shipped within SLA
Exercise 3: Implement Late Data Handling
Build a pipeline that:
- Processes events with 5-minute tumbling windows
- Routes late events (> 10 minutes) to DLQ
- Tracks percentage of late events as metric
Exercise 4: Multi-Way Join
Create a topology joining three streams:
- orders
- payments
- shipments
All within a 1-hour window.
Exercise 5: Exactly-Once Billing Aggregation
Implement hourly billing with:
- Tumbling windows with grace period
- Suppression for final results only
- Idempotent output to billing topic
- Tests verifying no duplicate bills
Interview Questions
Q1: "What's the difference between event time and processing time? When would you use each?"
I prefer event time for most cases:
- Deterministic results on replay
- Handles late-arriving data correctly
- Results reflect business reality
Processing time is acceptable for:
- Monitoring/alerting where 'now' matters
- Simple scenarios where event timing isn't critical
- Legacy systems without embedded timestamps
The key challenge with event time is handling late data. I typically use:
- Grace periods for expected lateness
- Watermarks (implicit in Kafka Streams via stream time)
- Dead letter queues for extremely late data
If an event's timestamp is '5 minutes ago' but I'm processing it now, event time correctly windows it with other events from that time period.
Q2: "How do stream-stream joins work in Kafka Streams? What's the windowing requirement?"
The join mechanics:
Stream A: event at T=100. Stream B: find matching events where T is in [100 - window, 100 + window].
- Both streams buffered for window duration
- Window store for each stream
- Co-partitioning required (same key → same partition)
Join types:
- Inner: output only when both match
- Left: output A even if no B match (B can be null)
- Outer: output both unmatched A and unmatched B
For asymmetric windows, use .before() and .after() on the join window for cases like 'payment must come after order'.
Q3: "Explain suppression in Kafka Streams. When is it necessary?"
Without suppression, a window [0-60s] that receives 5 events emits 1, 2, 3, 4, 5 (5 separate emissions). With suppression until the window closes, it emits only the final count, 5.
Suppression is necessary for:
- Billing/Charging - Bill once per period, not per event
- Notifications - Send summary, not flood user
- Downstream systems expecting final values
- Deduplication scenarios
One caveat: untilWindowCloses relies on stream time advancing, which requires new events. For low-volume streams, consider heartbeat events. Time-limit suppression can emit sooner, but it is also driven by stream time.
Q4: "What is co-partitioning and why does it matter for joins?"
Co-partitioning means both join inputs have:
- The same number of partitions
- The same partitioning strategy (same key → same partition number)

Instance 1 processes orders partition 0 and customers partition 0. Instance 2 processes orders partition 1 and customers partition 1. For a join on customerId:
- Customer 'alice' in orders-p0 must match customer 'alice' in customers-p0
- Both must be on the same instance for a local join

If orders has 4 partitions (alice → p1) and customers has 6 (alice → p4), the partition numbers differ. The records then end up on different instances, and the join fails. The fix is to re-key or repartition one side (selectKey() or repartition()) so that it flows through a repartition topic with matching partitioning.

Best practice: design topics with co-partitioning in mind upfront. Repartitioning is expensive because every record is written to, and read back from, an internal repartition topic.
Q5: "How do you handle late-arriving data in windowed aggregations?"
I layer three techniques: a grace period sized to the expected lateness, a dead letter queue (DLQ) for events that arrive after it, and a correction pipeline that can:
- Store late events
- Periodically replay to generate corrections
- Used for financial reconciliation
I also monitor lateness:
- Track percentage of late events
- Alert on unexpected lateness patterns
- Dashboard showing lateness distribution
Longer grace periods have costs:
- More memory (larger window stores)
- Higher latency (results delayed)
- More state to restore on failure
For billing, I typically use a 15-30 minute grace period, a DLQ for anything later, and a daily reconciliation job for the DLQ events.
Summary & Key Takeaways
Windowing & Joins Summary
| Window Type | Use Case |
|---|---|
| Tumbling | Fixed periods (hourly/daily aggregates) |
| Hopping | Rolling windows (5-min avg every 1-min) |
| Sliding | Time difference between events |
| Session | Activity-based (user sessions) |

| Join Type | Requirement |
|---|---|
| KStream-KStream | Time window + co-partitioning |
| KStream-KTable | Co-partitioning only |
| KStream-GlobalKTable | None (full data on every instance) |
Essential Takeaways
- Prefer event time - Processing time gives non-deterministic results on replay.
- Always set grace periods - Distributed systems have clock skew and network delays.
- Use suppression for final results - Without it, you get continuous updates, not one result per window.
- Co-partitioning is mandatory - KStream-KStream and KStream-KTable joins fail without it.
- GlobalKTable for small lookups - Avoids co-partitioning but duplicates data on every instance.
- Test with time advancement - TopologyTestDriver lets you control stream time precisely.
Quick Reference
| Window Definition | Code |
|---|---|
| Tumbling | TimeWindows.ofSizeAndGrace(size, grace) |
| Hopping | TimeWindows.ofSizeAndGrace(size, grace).advanceBy(advance) |
| Sliding | SlidingWindows.ofTimeDifferenceAndGrace(timeDifference, grace) |
| Session | SessionWindows.ofInactivityGapAndGrace(gap, grace) |

| Join Operation | Code |
|---|---|
| Stream-Stream Inner | stream1.join(stream2, joiner, window) |
| Stream-Stream Left | stream1.leftJoin(stream2, joiner, window) |
| Stream-Stream Outer | stream1.outerJoin(stream2, joiner, window) |
| Stream-Table | stream.join(table, joiner) |
| Stream-GlobalTable | stream.join(globalTable, keyMapper, joiner) |

| Suppression | Code |
|---|---|
| Until window closes | Suppressed.untilWindowCloses(bufferConfig) |
| Until time limit | Suppressed.untilTimeLimit(duration, bufferConfig) |