Consumer Internals
At a Glance
| Aspect | Details |
|---|---|
| Topic | Fetch mechanics, poll loop, heartbeats, consumer timeouts |
| Complexity | Intermediate |
| Prerequisites | Parts 1-4 (Fundamentals) |
| Time | 90 minutes |
| Spring Kafka | @KafkaListener, container configuration |
What You'll Learn
After completing this article, you will be able to:
- Understand the consumer's internal architecture and poll loop
- Configure fetch parameters for throughput vs latency trade-offs
- Tune max.poll.interval.ms and max.poll.records to prevent consumer kicks
- Implement Spring Kafka listeners with proper configuration
- Monitor consumer health through heartbeats and session timeouts
Production Story: The Consumer That Kept Getting Kicked
The Incident
Our order processing service appeared to be losing messages. The consumer would process orders for a few minutes, then suddenly stop. Consumer group status showed constant rebalancing: consumers joining and leaving repeatedly.
The Investigation
The consumer logs revealed the pattern:
10:15:00.000 INFO - Poll returned 500 records
10:15:00.001 INFO - Processing order ORD-001...
10:15:05.000 INFO - Processing order ORD-050...
10:15:30.000 INFO - Processing order ORD-100...
10:16:00.000 INFO - Processing order ORD-150...
... (processing continues slowly)
10:20:00.000 WARN - Member consumer-1 sending LeaveGroup due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than max.poll.interval.ms
10:20:00.001 INFO - Rebalancing triggered
The problem became clear:
THE POLL TIMEOUT SCENARIO

Configuration:
  • max.poll.records = 500 (default)
  • max.poll.interval.ms = 300000 (5 minutes, default)
  • Processing time per order = ~600ms (external API calls)

Timeline:
  T=0:00   poll() returns 500 orders
  T=0:00   Start processing order 1
  T=0:36   Finished 60 orders (60 × 600ms)
  T=1:00   Finished 100 orders
  T=3:00   Finished 300 orders
  T=5:00   max.poll.interval.ms EXCEEDED!
             ↓
  T=5:00   Consumer kicked from group (poll timeout)
  T=5:01   Rebalance triggered
  T=5:02   Consumer rejoins
  T=5:03   Partitions reassigned
  T=5:04   poll() returns same 500 orders (offset not committed)
  T=10:04  Same thing happens again!

Result: Infinite loop, orders reprocessed but never committed
The Root Cause
The consumer was taking too long between poll() calls because:
- Processing 500 records × 600ms each = 300 seconds (5 minutes)
- max.poll.interval.ms = 5 minutes
- Every poll was racing against the timeout
The Fix
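A minimal sketch of this kind of fix, assuming it shrank the batch rather than only raising the timeout. The config keys are real consumer settings (with kafka-clients on the classpath, ConsumerConfig.MAX_POLL_RECORDS_CONFIG and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG name the same keys); the helper class, the 600ms constant taken from the incident, and the 50% headroom rule are illustrative assumptions.

```java
import java.util.Properties;

// Hypothetical helper for the order service: the two settings that matter in this
// incident, plus a check that a worst-case batch fits well inside the poll interval.
final class OrderConsumerSettings {

    // Observed in the incident: roughly 600ms per order (external API calls).
    static final long PROCESSING_MS_PER_RECORD = 600;

    static Properties build(int maxPollRecords, long maxPollIntervalMs) {
        Properties props = new Properties();
        props.put("max.poll.records", String.valueOf(maxPollRecords));
        props.put("max.poll.interval.ms", String.valueOf(maxPollIntervalMs));
        return props;
    }

    // Time spent between two poll() calls if every record in a full batch is slow.
    static long worstCaseBatchMs(int maxPollRecords) {
        return maxPollRecords * PROCESSING_MS_PER_RECORD;
    }

    // Assumed safety rule: a full batch may use at most half of max.poll.interval.ms.
    static boolean fitsWithHeadroom(int maxPollRecords, long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords) * 2 <= maxPollIntervalMs;
    }
}
```

Under this rule the original 500-record batch fails (300 seconds of work against a 300-second interval), while a 50-record batch needs 30 seconds, a tenth of the default interval.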
After the fix: Zero rebalances, all orders processed successfully.
Mental Model: Consumer Architecture
KAFKA CONSUMER INTERNALS

Application Thread
    while (running) {
        ConsumerRecords records = consumer.poll(Duration);
        for (record : records) {
            process(record);
        }
        consumer.commitSync();  // or auto-commit
    }
          │
          ▼
POLL() INTERNALS
    1. Record the time of this poll (resets the max.poll.interval.ms clock)
    2. Rejoin the group if this member was removed (e.g. after a poll timeout)
    3. Handle any pending rebalance callbacks
    4. If records are available in the internal buffer → return them
    5. If the buffer is empty, send a FetchRequest to the broker(s)
         ├── Request includes: partitions, offsets, fetch.max.bytes
         └── Wait up to fetch.max.wait.ms for data
    6. Return fetched records (up to max.poll.records)

Background Heartbeat Thread (since Kafka 0.10.1)
    while (running) {
        sleep(heartbeat.interval.ms);  // default 3s
        sendHeartbeat();               // to group coordinator
        // if time since last poll() > max.poll.interval.ms:
        //     send LeaveGroup → rebalance
    }

    If no heartbeat for session.timeout.ms → consumer considered dead
    → removed from group → rebalance
Two Critical Timeouts
TWO TIMEOUT MECHANISMS

1. SESSION TIMEOUT (session.timeout.ms)
   • Monitored via: Heartbeat thread
   • Default: 45000ms (45 seconds)
   • Detects: Consumer process crashed/frozen

   Heartbeat Thread            Group Coordinator
   ♥ 3s     ─────────────►     Last HB: now
   ♥ 3s     ─────────────►     Last HB: now
   ♥ 3s     ─────────────►     Last HB: now
   X crash
                               ...45s...
                               TIMEOUT!

2. POLL INTERVAL TIMEOUT (max.poll.interval.ms)
   • Monitored via: Time between poll() calls
   • Default: 300000ms (5 minutes)
   • Detects: Consumer stuck in processing (alive but not progressing)

   Application Thread          Consumer Library
   poll()   ─────────────►     last poll: T0
   process
   process                     T0+1min
   process                     T0+2min
   process                     T0+3min
   process                     T0+4min
   process                     T0+5min
                               TIMEOUT! → Leave group

KEY DIFFERENCE:
   • session.timeout.ms: "Is the consumer process alive?"
   • max.poll.interval.ms: "Is the consumer making progress?"

Both cause a rebalance when exceeded, but for different reasons.
Deep Dive
1. Fetch Mechanics
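As a sketch of the throughput-versus-latency trade-off, the helper below builds two contrasting profiles from the real fetch settings. The specific values (100ms wait, 64KB minimum, 2MB per partition, 1000 records) are illustrative assumptions rather than tuned recommendations; the defaults mentioned in the comments are Kafka's documented defaults, and the class is hypothetical.

```java
import java.util.Properties;

// Illustrative fetch profiles. The keys are real consumer configs; values are examples.
final class FetchProfiles {

    // Latency-oriented: the broker answers as soon as any data exists.
    static Properties lowLatency() {
        Properties p = new Properties();
        p.put("fetch.min.bytes", "1");      // default: respond with any amount of data
        p.put("fetch.max.wait.ms", "100");  // cap the wait when no data exists (default 500)
        return p;
    }

    // Throughput-oriented: let the broker accumulate larger responses.
    static Properties highThroughput() {
        Properties p = new Properties();
        p.put("fetch.min.bytes", String.valueOf(64 * 1024));                // wait for 64KB...
        p.put("fetch.max.wait.ms", "500");                                   // ...or 500ms, whichever comes first
        p.put("fetch.max.bytes", String.valueOf(50 * 1024 * 1024));           // 50MB per response (the default)
        p.put("max.partition.fetch.bytes", String.valueOf(2 * 1024 * 1024)); // 2MB per partition (default 1MB)
        p.put("max.poll.records", "1000");  // bigger batches: only safe if processing is fast
        return p;
    }

    // Longest the broker may hold a fetch open when fetch.min.bytes is not yet met.
    static long worstCaseBrokerWaitMs(Properties p) {
        return Long.parseLong(p.getProperty("fetch.max.wait.ms", "500"));
    }
}
```

The throughput profile trades up to 500ms of extra delay on a quiet topic for fewer, larger responses on a busy one.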
Fetch Flow Visualization
CONSUMER FETCH FLOW

Consumer                                   Broker (partition orders-0)
poll()                                     offset 100: msg1 (500B)
  FetchRequest ──────────────────────►     offset 101: msg2 (500B)
    ├─ topic: orders                       offset 102: msg3 (500B)
    ├─ partitions: [0,1]                   offset 103: msg4 (500B)
    ├─ offsets: [100, 50]                  ...
    └─ maxBytes: 1MB
                                           Wait for:
                                             • fetch.min.bytes (1KB in this example)
                                             OR
                                             • fetch.max.wait.ms (500ms)
  FetchResponse ◄──────────────────────
    ├─ orders-0: 10 msgs
    └─ orders-1: 5 msgs
        │
        ▼
  Internal Buffer: all 15 records stored here
        │  max.poll.records=5
        ▼
  Returns 5 records (buffer has 10 more)

Next poll(): returns 5 from the buffer (no fetch request needed)
2. Heartbeat Configuration
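A sketch of the heartbeat rules as a validation helper, assuming you want to catch bad values before the consumer starts. The one-third guideline comes from Kafka's documentation for heartbeat.interval.ms; the range check mirrors the broker settings group.min.session.timeout.ms and group.max.session.timeout.ms (defaults 6000 and 1800000), outside of which the coordinator rejects the member. The class itself is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check for heartbeat-related consumer settings.
final class HeartbeatConfigCheck {

    // Broker defaults for the allowed session.timeout.ms range.
    static final long GROUP_MIN_SESSION_TIMEOUT_MS = 6_000;
    static final long GROUP_MAX_SESSION_TIMEOUT_MS = 1_800_000;

    static List<String> validate(long heartbeatIntervalMs, long sessionTimeoutMs) {
        List<String> problems = new ArrayList<>();
        // Guideline: heartbeat no more than 1/3 of the session timeout,
        // so a couple of heartbeats can be lost before the member is evicted.
        if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            problems.add("heartbeat.interval.ms should be at most 1/3 of session.timeout.ms");
        }
        // The group coordinator refuses session timeouts outside this range.
        if (sessionTimeoutMs < GROUP_MIN_SESSION_TIMEOUT_MS
                || sessionTimeoutMs > GROUP_MAX_SESSION_TIMEOUT_MS) {
            problems.add("session.timeout.ms is outside the broker's allowed range");
        }
        return problems;
    }
}
```

With the defaults (3s heartbeat, 45s session) the list is empty; a 20s heartbeat against a 45s session is flagged.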
Timeout Relationship
TIMEOUT CONFIGURATION GUIDELINES

heartbeat.interval.ms ≤ session.timeout.ms / 3
  └── Allows at least 2 missed heartbeats before timeout
  └── 10s heartbeat with 30s session = 2 missed OK

max.poll.interval.ms > (max.poll.records × processing_time)
  └── Account for worst-case processing
  └── 100 records × 500ms = 50s, so use 60s+ timeout

EXAMPLE CONFIGURATIONS:
  Fast processing (< 10ms per message):
    max.poll.records = 500
    max.poll.interval.ms = 60000 (1 minute)

  Slow processing (100-500ms per message):
    max.poll.records = 50-100
    max.poll.interval.ms = 60000-120000

  Very slow processing (> 1s per message):
    max.poll.records = 10-20
    max.poll.interval.ms = 300000+ OR use async processing
3. Spring Kafka @KafkaListener
@KafkaListener Usage Patterns
4. Consumer Record Buffer
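A toy model of the buffer behaviour shown in the fetch-flow diagram: one response of 15 records is drained in batches of max.poll.records = 5 before another fetch is needed. It is a simplification, not Kafka's Fetcher code (the real client also pipelines fetches in the background), and the class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the consumer's internal record buffer (not Kafka's Fetcher code).
final class RecordBufferModel {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final int maxPollRecords;
    private int fetchRequestsSent = 0;

    RecordBufferModel(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    // A FetchResponse arrives: every record it carries lands in the buffer.
    void onFetchResponse(List<String> records) {
        buffer.addAll(records);
    }

    // poll(): serve up to maxPollRecords from the buffer; fetch only when it is empty.
    List<String> poll() {
        if (buffer.isEmpty()) {
            fetchRequestsSent++; // the real client sends this asynchronously
            return List.of();
        }
        List<String> batch = new ArrayList<>();
        while (!buffer.isEmpty() && batch.size() < maxPollRecords) {
            batch.add(buffer.removeFirst());
        }
        return batch;
    }

    int buffered() { return buffer.size(); }

    int fetchRequestsSent() { return fetchRequestsSent; }
}
```

Feeding it one 15-record response with max.poll.records = 5 gives three 5-record polls with no network traffic; only the fourth poll() finds the buffer empty and triggers a fetch.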
5. Consumer Position Management
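A sketch of the two numbers worth keeping apart: the position (the next offset poll() will return, advanced as soon as records are handed over) and the committed offset (where a restarted or new owner resumes, by convention the last processed offset + 1). The tracker and its "topic-partition" string keys are hypothetical; the real client exposes these values through KafkaConsumer.position() and committed().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker contrasting position and committed offset per partition.
final class PositionTracker {
    private final Map<String, Long> positions = new HashMap<>(); // next offset poll() returns
    private final Map<String, Long> processed = new HashMap<>(); // highest offset fully processed
    private final Map<String, Long> commits = new HashMap<>();   // where a new owner resumes

    void assign(String tp, long committedOffset) {
        commits.put(tp, committedOffset);
        positions.put(tp, committedOffset); // consumption starts at the committed offset
        processed.remove(tp);
    }

    // poll() advances the position as soon as records are handed to the application.
    void onPollReturned(String tp, long lastOffsetReturned) {
        positions.put(tp, lastOffsetReturned + 1);
    }

    void markProcessed(String tp, long offset) {
        processed.put(tp, offset);
    }

    // Commit "last processed + 1": the offset of the next record to read.
    void commitProcessed() {
        processed.forEach((tp, off) -> commits.put(tp, off + 1));
    }

    // After a crash or rebalance, reading restarts from the committed offset.
    void restart() {
        positions.putAll(commits);
        processed.clear();
    }

    long position(String tp) { return positions.get(tp); }

    long committed(String tp) { return commits.get(tp); }
}
```

For example, if poll() hands over offsets 100-109, processing finishes through 104 and the commit records 105, a restart resumes at 105 and offsets 105-109 are delivered again.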
6. Consumer Lifecycle
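A sketch of a graceful-shutdown lifecycle, modelled on the flag-plus-wakeup() pattern shown in the KafkaConsumer javadoc. To keep it runnable without kafka-clients, FakeConsumer and WokenUp are hypothetical stand-ins: the real wakeup() is the only KafkaConsumer method that is safe to call from another thread, and it makes a blocked (or the next) poll() throw WakeupException.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for WakeupException.
final class WokenUp extends RuntimeException {}

// Hypothetical stand-in for KafkaConsumer: replays fixed batches, then returns empty polls.
final class FakeConsumer {
    private final Deque<List<String>> batches;
    private final AtomicBoolean wakeupPending = new AtomicBoolean(false);
    int commits = 0;
    boolean closed = false;

    FakeConsumer(List<List<String>> batches) {
        this.batches = new ArrayDeque<>(batches);
    }

    List<String> poll() {
        if (wakeupPending.getAndSet(false)) {
            throw new WokenUp(); // like poll() after wakeup() in the real client
        }
        List<String> next = batches.pollFirst();
        return next != null ? next : List.of();
    }

    void commitSync() { commits++; }

    void wakeup() { wakeupPending.set(true); } // thread-safe, like the real one

    void close() { closed = true; }
}

// Lifecycle: poll -> process -> commit, until shutdown() flips the flag and wakes poll().
final class GracefulLoop {
    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final FakeConsumer consumer;

    GracefulLoop(FakeConsumer consumer) {
        this.consumer = consumer;
    }

    // Typically called from a shutdown hook (SIGTERM) on another thread.
    void shutdown() {
        closing.set(true);
        consumer.wakeup();
    }

    void run(Consumer<String> handler) {
        try {
            while (!closing.get()) {
                for (String record : consumer.poll()) {
                    handler.accept(record);
                }
                consumer.commitSync(); // commit only after the whole batch is processed
            }
        } catch (WokenUp e) {
            if (!closing.get()) {
                throw e; // a wakeup nobody asked for is a real error
            }
        } finally {
            consumer.close(); // always close; the real close() also leaves the group
        }
    }
}
```

In a real service, shutdown() would be called from a shutdown hook, and the hook would wait for run() to return so the final commit completes before the JVM exits.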
Common Mistakes
Mistake 1: Ignoring max.poll.interval.ms
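One remedy for slow processing is to hand records to a thread pool and keep calling poll(), pausing the partition with KafkaConsumer.pause() so no new records arrive for it meanwhile. The hard part is committing safely when work finishes out of order. A minimal sketch of that bookkeeping for one partition, assuming offsets are contiguous (compacted or transactional topics can have gaps, so a production version would track the offsets actually received); the class name is hypothetical.

```java
import java.util.TreeSet;

// Tracks out-of-order completions for ONE partition and reports the offset that is
// safe to commit: the end of the contiguous run of completed offsets, plus one.
// Not thread-safe on its own; synchronize if workers report completions concurrently.
final class ContiguousCommitTracker {
    private long nextToCommit;                          // committed-offset semantics
    private final TreeSet<Long> completedAhead = new TreeSet<>();

    ContiguousCommitTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    // Called when a worker finishes the record at `offset`.
    void complete(long offset) {
        if (offset < nextToCommit) {
            return; // already covered by the contiguous prefix
        }
        completedAhead.add(offset);
        // Slide forward over any run of completed offsets starting at nextToCommit.
        while (!completedAhead.isEmpty() && completedAhead.first() == nextToCommit) {
            completedAhead.pollFirst();
            nextToCommit++;
        }
    }

    // Offset to pass to commitSync() for this partition.
    long safeCommitOffset() {
        return nextToCommit;
    }
}
```

If offsets 101 and 102 finish before 100, nothing beyond 100 is committable; once 100 finishes, the safe commit jumps to 103 in one step.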
Mistake 2: Confusing session.timeout.ms and max.poll.interval.ms
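The distinction can be written down as a tiny classifier. This is an illustrative model, not client code: given the time since a member's last heartbeat and its last poll(), it reports which mechanism would remove it from the group. The enum and class names are hypothetical.

```java
// Which mechanism removes a member from the group (illustrative model only).
enum RemovalCause { NONE, SESSION_TIMEOUT, POLL_INTERVAL_TIMEOUT }

final class TimeoutDiagnosis {

    static RemovalCause classify(long msSinceHeartbeat, long msSincePoll,
                                 long sessionTimeoutMs, long maxPollIntervalMs) {
        if (msSinceHeartbeat > sessionTimeoutMs) {
            // Heartbeat thread silent: crash, long GC pause, or network partition.
            return RemovalCause.SESSION_TIMEOUT;
        }
        if (msSincePoll > maxPollIntervalMs) {
            // Heartbeats still flowing, but the application thread is stuck processing.
            return RemovalCause.POLL_INTERVAL_TIMEOUT;
        }
        return RemovalCause.NONE;
    }
}
```

The incident at the top of this article is the second case: heartbeats every 3 seconds, but more than 5 minutes between polls.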
Mistake 3: Wrong fetch.max.bytes with large messages
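A small pre-deployment check can catch the mismatch. This sketch encodes the sizing rule fetch.max.bytes >= max.partition.fetch.bytes >= largest message; the class name, and passing the largest expected message size in as a plain number, are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical check of fetch sizes against the largest message the topic allows.
final class FetchSizeCheck {

    static List<String> check(long fetchMaxBytes, long maxPartitionFetchBytes,
                              long largestMessageBytes) {
        List<String> warnings = new ArrayList<>();
        if (maxPartitionFetchBytes < largestMessageBytes) {
            warnings.add("max.partition.fetch.bytes < largest message");
        }
        if (fetchMaxBytes < maxPartitionFetchBytes) {
            warnings.add("fetch.max.bytes < max.partition.fetch.bytes");
        }
        return warnings;
    }
}
```

Against the client defaults (50MB total, 1MB per partition), a topic that accepts 5MB messages produces one warning.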
Mistake 4: Too many consumers for partitions
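Why extra consumers sit idle is plain arithmetic. The sketch below distributes one topic's partitions in a range-style split (modelled on, but not identical to, Kafka's RangeAssignor; the class is hypothetical). In Spring Kafka, each unit of listener concurrency is a separate consumer in the group, so the same arithmetic applies to the concurrency setting.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified range-style assignment for a single topic: returns partitions per consumer.
final class AssignmentMath {

    static List<Integer> partitionsPerConsumer(int partitions, int consumers) {
        List<Integer> counts = new ArrayList<>();
        int base = partitions / consumers;   // everyone gets this many
        int extra = partitions % consumers;  // the first `extra` consumers get one more
        for (int i = 0; i < consumers; i++) {
            counts.add(base + (i < extra ? 1 : 0));
        }
        return counts;
    }

    // Consumers that received no partitions at all.
    static long idleConsumers(int partitions, int consumers) {
        return partitionsPerConsumer(partitions, consumers).stream()
                .filter(c -> c == 0)
                .count();
    }
}
```

With 6 partitions, 4 consumers split them 2/2/1/1, while 8 consumers leave 2 of them idle.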
Mistake 5: Not handling deserializer errors
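With the plain client, a payload the deserializer cannot parse makes poll() throw a SerializationException, and the consumer cannot move past that record without seeking. Spring Kafka's ErrorHandlingDeserializer addresses this by wrapping the real deserializer. A framework-free sketch of the same idea, assuming a decoder expressed as a Function<byte[], T>; both classes are hypothetical.

```java
import java.util.function.Function;

// Result of decoding one payload: a value, or the raw bytes plus the failure.
final class Decoded<T> {
    private final T value;
    private final byte[] raw;
    private final RuntimeException error;

    Decoded(T value, byte[] raw, RuntimeException error) {
        this.value = value;
        this.raw = raw;
        this.error = error;
    }

    boolean ok() { return error == null; }

    T value() { return value; }

    byte[] raw() { return raw; }

    RuntimeException error() { return error; }
}

// Wraps a decoder so a malformed payload becomes data the listener can route
// (e.g. to a dead-letter topic) instead of an exception that blocks the partition.
final class SafeDecoder<T> {
    private final Function<byte[], T> delegate;

    SafeDecoder(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    Decoded<T> decode(byte[] raw) {
        try {
            return new Decoded<>(delegate.apply(raw), raw, null);
        } catch (RuntimeException e) {
            return new Decoded<>(null, raw, e);
        }
    }
}
```

The listener checks ok() on each record; failures keep their raw bytes, so nothing is lost when they are forwarded for inspection.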
Debug This
Scenario: Consumer Lag Keeps Growing
- Consumer lag (unconsumed messages) increases over time
- Processing appears normal
- No errors in logs
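Lag is the distance between each partition's log-end offset and the group's committed offset, and the symptom here is that the total keeps rising. A minimal sketch of that computation; in a real client the two maps could be filled from KafkaConsumer.endOffsets() and committed(). Treating a partition with no commit as offset 0 is a simplifying assumption, and the class is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Consumer lag per partition: log-end offset minus committed offset.
final class LagCalculator {

    static Map<Integer, Long> lagByPartition(Map<Integer, Long> logEndOffsets,
                                             Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        logEndOffsets.forEach((partition, end) -> {
            // Assumption: a partition with no commit yet is treated as offset 0.
            long committed = committedOffsets.getOrDefault(partition, 0L);
            lag.put(partition, Math.max(0L, end - committed));
        });
        return lag;
    }

    static long totalLag(Map<Integer, Long> lag) {
        return lag.values().stream().mapToLong(Long::longValue).sum();
    }

    // "Keeps growing": every sample of total lag is larger than the previous one.
    static boolean isGrowing(long[] totalLagSamples) {
        if (totalLagSamples.length < 2) {
            return false;
        }
        for (int i = 1; i < totalLagSamples.length; i++) {
            if (totalLagSamples[i] <= totalLagSamples[i - 1]) {
                return false;
            }
        }
        return true;
    }
}
```

Sampling the total at a fixed interval and checking isGrowing() distinguishes a sustained shortfall from a temporary burst that the consumers later absorb.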
Likely causes:
- Processing too slow: Consumer can't keep up with producer rate
- Too few partitions: Limited parallelism
- Underprovisioned consumers: Need more consumer instances
- Fetch config issues: fetch.min.bytes too high, so each fetch can wait up to fetch.max.wait.ms
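The first three causes reduce to one capacity calculation. A back-of-the-envelope sketch, assuming each consumer processes records one at a time at a fixed average cost; real workloads vary, so treat the result as a rough estimate. The class is hypothetical.

```java
// Rough sizing: consumers needed to keep up, and whether partitions allow it.
final class CapacityPlan {

    // Sequential processing: one consumer handles 1000 / msPerRecord records per second.
    static double recordsPerSecondPerConsumer(double msPerRecord) {
        return 1000.0 / msPerRecord;
    }

    static int consumersNeeded(double produceRatePerSec, double msPerRecord) {
        return (int) Math.ceil(produceRatePerSec / recordsPerSecondPerConsumer(msPerRecord));
    }

    // Within one group, parallelism is capped by the partition count.
    static boolean partitionsSufficient(int partitions, double produceRatePerSec,
                                        double msPerRecord) {
        return consumersNeeded(produceRatePerSec, msPerRecord) <= partitions;
    }
}
```

For example, 1,000 records/s at 20ms each needs 20 consumers (each handles 50 records/s), which a 12-partition topic cannot provide, so both the partition count and the consumer count must grow, or per-record processing must get cheaper.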
Exercises
Exercise 1: Tune Consumer for Latency
Configure a consumer optimized for minimum latency:
- Messages should be consumed within 100ms of being produced
- Measure and document actual latency
- Compare with default configuration
Exercise 2: Simulate Poll Timeout
Create a test that:
- Configures a short max.poll.interval.ms (30 seconds)
- Simulates slow processing
- Observes consumer getting kicked
- Implements proper fix
Exercise 3: Implement Graceful Shutdown
Build a consumer that:
- Handles shutdown signals (SIGTERM)
- Commits all processed offsets before exit
- Uses wakeup() to interrupt a blocking poll
- Logs clean shutdown confirmation
Exercise 4: Consumer Metrics Dashboard
Create a monitoring component that tracks:
- Records consumed per second
- Average processing time per record
- Consumer lag per partition
- Rebalance frequency
Exercise 5: Adaptive Polling
Implement a consumer that:
- Monitors processing time per record
- Dynamically adjusts max.poll.records
- Stays within 50% of max.poll.interval.ms
- Logs adjustments made
Interview Questions
Q1: Explain the difference between session.timeout.ms and max.poll.interval.ms.
session.timeout.ms:
- Monitored by: Background heartbeat thread
- Purpose: Detect if the consumer process is alive
- Mechanism: Group coordinator removes the consumer if no heartbeat is received
- Typical failure: Process crash, network partition, GC pause
- Keep relatively short for fast failure detection

max.poll.interval.ms:
- Monitored by: Time between poll() calls
- Purpose: Detect if the consumer is making progress
- Mechanism: Consumer library leaves the group if poll() is not called in time
- Typical failure: Processing taking too long, deadlock, infinite loop
- Set based on processing time per batch
Q2: How does the consumer decide when to fetch more data from brokers?
The consumer sends a new FetchRequest for a partition when:
- No buffered records remain for that partition
- No fetch to that partition's leader is already in flight
- The partition is assigned and not paused

Key configs:
- fetch.min.bytes: Wait for at least this much data (or until fetch.max.wait.ms)
- fetch.max.wait.ms: Max time the broker waits for fetch.min.bytes
- fetch.max.bytes: Max total data in a response
- max.partition.fetch.bytes: Max data per partition

Flow:
- poll() checks the internal buffer
- If records are buffered, return up to max.poll.records
- If the buffer is empty, send a FetchRequest to the broker
- Broker waits for fetch.min.bytes OR fetch.max.wait.ms
- Response fills the buffer
- Next poll() returns from the buffer
This design separates network fetch from application poll, enabling efficient batching while controlling application-level batch size.
Q3: What happens when a consumer takes too long to process messages?
What happens:
- The consumer proactively leaves the group (sends LeaveGroup)
- The group coordinator initiates a rebalance
- Partitions are reassigned to other consumers
- Offsets of records processed since the last commit are never committed
- The new owners reprocess those messages

Consequences:
- Message duplication (messages processed but not committed)
- Rebalancing overhead
- Processing latency increases across the whole group
- Possible cascade if multiple consumers are affected

Solutions:
- Reduce max.poll.records to process fewer records per batch
- Increase max.poll.interval.ms if processing is legitimately slow
- Process asynchronously (submit to a thread pool, acknowledge later)
- Use batch processing for efficiency
Q4: How would you configure a consumer for very large messages?
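A sketch of settings derived from the largest expected message size (the topic's max.message.bytes). The keys are real consumer configs; the class, and max.poll.records = 10 as a memory-bounding choice, are illustrative assumptions.

```java
import java.util.Properties;

// Hypothetical builder for consumers that read large messages.
final class LargeMessageConsumerConfig {

    static final long DEFAULT_FETCH_MAX_BYTES = 50L * 1024 * 1024; // client default, 52428800

    static Properties forMaxMessageBytes(long maxMessageBytes) {
        Properties p = new Properties();
        // Each partition must be able to return at least one full message.
        p.put("max.partition.fetch.bytes", String.valueOf(maxMessageBytes));
        // The whole response must not be smaller than the per-partition limit.
        p.put("fetch.max.bytes",
              String.valueOf(Math.max(DEFAULT_FETCH_MAX_BYTES, maxMessageBytes)));
        // Fewer records per poll keeps memory and per-batch processing time bounded.
        p.put("max.poll.records", "10"); // illustrative value
        return p;
    }
}
```

For 10MB messages this keeps the 50MB response default and raises the per-partition limit to 10MB; for 100MB messages both limits rise to 100MB.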
fetch.max.bytes >= max.partition.fetch.bytes >= max.message.bytes
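Since Kafka 0.10.1 these limits are not absolute: if the first record batch in the first non-empty partition of a fetch is larger than the limit, it is still returned so the consumer can make progress. On older clients, a message larger than the fetch limit could never be consumed. Either way, size the limits for your largest message rather than relying on that fallback.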
Q5: Describe the consumer poll loop and why blocking in it is dangerous.
- Poll timeout: If processing blocks longer than max.poll.interval.ms, the consumer is kicked
- Heartbeat decoupling: Heartbeats continue on a separate thread, so the coordinator thinks the consumer is alive while it is actually stuck (until max.poll.interval.ms expires)
- Cascading rebalances: When a consumer is kicked, its partitions move to other consumers, potentially overloading them
- Duplicate processing: Uncommitted messages will be reprocessed by the new owner

Best practices:
- Keep processing fast (< 100ms per record typically)
- Offload slow work to a separate thread pool
- Use async acknowledgment for long processing
- Set max.poll.records appropriately for your processing speed
Summary
Key Takeaways
- poll() is your consumer's proof of progress: call it frequently to stay in the group
- Two timeout mechanisms: session.timeout.ms for process liveness, max.poll.interval.ms for processing progress
- max.poll.records controls the batch size returned by poll(), not the fetch size from the broker
- The internal buffer decouples fetching from polling for efficiency
- Spring @KafkaListener handles the poll loop for you, but you still need proper configuration
- Processing time for a full batch must fit within max.poll.interval.ms, or the consumer gets kicked
- Concurrency should not exceed the partition count: extra consumers sit idle
- Graceful shutdown requires wakeup() to interrupt a blocking poll
Quick Reference
Essential Consumer Configuration
@KafkaListener Cheat Sheet
Series Navigation
| Previous | Current | Next |
|---|---|---|
| Part 7: Advanced Producer | Part 8: Consumer Internals | Part 9: Consumer Groups |
Series Overview
- Part 0: How to Use This Series
- Parts 1-4: Fundamentals
- Parts 5-7: Producers
- Parts 8-11: Consumers (Internals, Groups, Offset Management, Exactly-Once)
- Parts 12-14: Operations
- Parts 15-17: Kafka Streams
- Parts 18-20: Patterns & Practices
- Part 21: Cheatsheet & Decision Guide