Consumer Internals


At a Glance

Aspect         Details
Topic          Fetch mechanics, poll loop, heartbeats, consumer timeouts
Complexity     Intermediate
Prerequisites  Parts 1-4 (Fundamentals)
Time           90 minutes
Spring Kafka   @KafkaListener, container configuration

What You'll Learn

After completing this article, you will be able to:

  1. Understand the consumer's internal architecture and poll loop
  2. Configure fetch parameters for throughput vs latency trade-offs
  3. Tune max.poll.interval.ms and max.poll.records to prevent consumer kicks
  4. Implement Spring Kafka listeners with proper configuration
  5. Monitor consumer health through heartbeats and session timeouts

Production Story: The Consumer That Kept Getting Kicked

The Incident

Our order processing service was mysteriously losing messages. The consumer would process orders for a few minutes, then suddenly stop. The consumer group status showed constant rebalancing, with consumers joining and leaving repeatedly.

The Investigation

BASH(10 lines)

The consumer logs revealed the pattern:

10:15:00.000 INFO  - Poll returned 500 records
10:15:00.001 INFO  - Processing order ORD-001...
10:15:05.000 INFO  - Processing order ORD-050...
10:15:30.000 INFO  - Processing order ORD-100...
10:16:00.000 INFO  - Processing order ORD-150...
... (processing continues slowly)
10:20:00.000 WARN  - Member consumer-1 sending LeaveGroup due to
  consumer poll timeout has expired. This means the time between
  subsequent calls to poll() was longer than max.poll.interval.ms
10:20:00.001 INFO  - Rebalancing triggered

The problem became clear:

┌─────────────────────────────────────────────────────────────────────┐
│                    THE POLL TIMEOUT SCENARIO                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Configuration:                                                     │
│  • max.poll.records = 500 (default)                                 │
│  • max.poll.interval.ms = 300000 (5 minutes, default)               │
│  • Processing time per order = ~600ms (external API calls)          │
│                                                                     │
│  Timeline:                                                          │
│  ────────────────────────────────────────────────────────────────   │
│  T=0:00    poll() returns 500 orders                                │
│  T=0:00    Start processing order 1                                 │
│  T=0:36    Finished 60 orders (60 × 600ms)                          │
│  T=1:00    Finished 100 orders                                      │
│  T=3:00    Finished 300 orders                                      │
│  T=5:00    max.poll.interval.ms EXCEEDED!                           │
│            ↓                                                        │
│  T=5:00    Consumer kicked from group (poll timeout)                │
│  T=5:01    Rebalance triggered                                      │
│  T=5:02    Consumer rejoins                                         │
│  T=5:03    Partitions reassigned                                    │
│  T=5:04    poll() returns same 500 orders (offset not committed)    │
│  T=10:04   Same thing happens again!                                │
│                                                                     │
│  Result: Infinite loop, orders reprocessed but never committed      │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

The Root Cause

The consumer was taking too long between poll() calls because:
  1. Processing 500 records × 600ms each = 5 minutes
  2. max.poll.interval.ms = 5 minutes
  3. Racing against the timeout every single poll
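
The arithmetic above can be checked with a small sketch. The class name, method names, and the 50% headroom factor are illustrative assumptions; only the property names and the 500 / 600ms / 5 minute figures come from the incident.

```java
import java.util.Properties;

/** Back-of-the-envelope check of one poll() batch against max.poll.interval.ms. */
final class PollBudget {

    /** Worst-case time to process one poll() batch, in milliseconds. */
    static long batchTimeMs(int maxPollRecords, long perRecordMs) {
        return (long) maxPollRecords * perRecordMs;
    }

    /**
     * Largest max.poll.records that keeps a batch within the given fraction
     * of max.poll.interval.ms. The headroom fraction is an assumption of this
     * sketch, not a Kafka setting.
     */
    static int safeMaxPollRecords(long maxPollIntervalMs, long perRecordMs, double headroom) {
        long budgetMs = (long) (maxPollIntervalMs * headroom);
        return (int) Math.max(1, budgetMs / perRecordMs);
    }

    /** Consumer properties that use the computed batch size. */
    static Properties tunedProperties(long maxPollIntervalMs, long perRecordMs, double headroom) {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        props.setProperty("max.poll.records",
                Integer.toString(safeMaxPollRecords(maxPollIntervalMs, perRecordMs, headroom)));
        return props;
    }

    public static void main(String[] args) {
        // The incident: 500 records at 600ms each consume the whole 5 minute budget.
        System.out.println(batchTimeMs(500, 600));                  // 300000
        // With 50% headroom, at most 250 records fit in one batch.
        System.out.println(safeMaxPollRecords(300_000, 600, 0.5));  // 250
    }
}
```

Leaving headroom matters because per-record time is rarely constant; a single slow external call should not be enough to push a batch over the limit.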

The Fix

JAVA(29 lines)

After the fix: Zero rebalances, all orders processed successfully.


Mental Model: Consumer Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                         KAFKA CONSUMER INTERNALS                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Application Thread                                                     │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                                                                  │   │
│  │  while (running) {                                               │   │
│  │      ConsumerRecords records = consumer.poll(Duration);          │   │
│  │      for (record : records) {                              │     │   │
│  │          process(record);                                  │     │   │
│  │      }                                                     │     │   │
│  │      consumer.commitSync(); // or auto-commit              │     │   │
│  │  }                                                         │     │   │
│  │                                                            │     │   │
│  └────────────────────────────────────────────────────────────┼─────┘   │
│                                                               │         │
│                                                               ▼         │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                         POLL() INTERNALS                         │   │
│  │                                                                  │   │
│  │  1. Check if time since last poll > max.poll.interval.ms         │   │
│  │     └── If yes: Leave group, trigger rebalance                   │   │
│  │                                                                  │   │
│  │  2. Check heartbeat thread state (it sends the heartbeats)       │   │
│  │                                                                  │   │
│  │  3. Handle any pending rebalance callbacks                       │   │
│  │                                                                  │   │
│  │  4. If records available in internal buffer → return them        │   │
│  │                                                                  │   │
│  │  5. If buffer empty, send FetchRequest to broker(s)              │   │
│  │     ├── Request includes: partitions, offsets, fetch.max.bytes   │   │
│  │     └── Wait up to fetch.max.wait.ms for data                    │   │
│  │                                                                  │   │
│  │  6. Return fetched records (up to max.poll.records)              │   │
│  │                                                                  │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                         │
│  Background Heartbeat Thread (since Kafka 0.10.1)                       │
│  ┌──────────────────────────────────────────────────────────────────┐   │
│  │                                                                  │   │
│  │  while (running) {                                               │   │
│  │      sleep(heartbeat.interval.ms);  // default 3s                │   │
│  │      sendHeartbeat();               // to group coordinator      │   │
│  │  }                                                               │   │
│  │                                                                  │   │
│  │  If no heartbeat for session.timeout.ms → consumer considered    │   │
│  │  dead → removed from group → rebalance                           │   │
│  │                                                                  │   │
│  └──────────────────────────────────────────────────────────────────┘   │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Two Critical Timeouts

┌─────────────────────────────────────────────────────────────────────────┐
│                         TWO TIMEOUT MECHANISMS                          │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  1. SESSION TIMEOUT (session.timeout.ms)                                │
│     ─────────────────────────────────────                               │
│     • Monitored via: Heartbeat thread                                   │
│     • Default: 45000ms (45 seconds)                                     │
│     • Detects: Consumer process crashed/frozen                          │
│                                                                         │
│     Heartbeat Thread          Group Coordinator                         │
│     ┌──────────┐              ┌──────────────┐                          │
│     │ ♥ 3s     │─────────────►│ Last HB: now │                          │
│     │ ♥ 3s     │─────────────►│ Last HB: now │                          │
│     │ ♥ 3s     │─────────────►│ Last HB: now │                          │
│     │ X crash  │              │              │                          │
│     │          │              │ ...45s...    │                          │
│     │          │              │ TIMEOUT!     │                          │
│     └──────────┘              └──────────────┘                          │
│                                                                         │
│                                                                         │
│  2. POLL INTERVAL TIMEOUT (max.poll.interval.ms)                        │
│     ─────────────────────────────────────────────                       │
│     • Monitored via: Time between poll() calls                          │
│     • Default: 300000ms (5 minutes)                                     │
│     • Detects: Consumer stuck in processing (alive but not progressing) │
│                                                                         │
│     Application Thread        Consumer Library                          │
│     ┌──────────┐              ┌──────────────┐                          │
│     │ poll()   │─────────────►│ last poll:T0 │                          │
│     │ process  │              │              │                          │
│     │ process  │              │ T0+1min      │                          │
│     │ process  │              │ T0+2min      │                          │
│     │ process  │              │ T0+3min      │                          │
│     │ process  │              │ T0+4min      │                          │
│     │ process  │              │ T0+5min      │                          │
│     │          │              │ TIMEOUT!     │ → Leave group            │
│     └──────────┘              └──────────────┘                          │
│                                                                         │
│                                                                         │
│  KEY DIFFERENCE:                                                        │
│  ────────────────                                                       │
│  • session.timeout.ms: "Is the consumer process alive?"                 │
│  • max.poll.interval.ms: "Is the consumer making progress?"             │
│                                                                         │
│  Both cause rebalance when exceeded, but for different reasons          │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
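
The two checks in the diagram can be modelled in a few lines. This is a toy model with explicit timestamps, not the Kafka client's actual implementation; the class and method names are hypothetical.

```java
/** Toy model of the two liveness checks, driven by explicit timestamps. */
final class LivenessModel {
    private final long sessionTimeoutMs;
    private final long maxPollIntervalMs;
    private long lastHeartbeatMs;
    private long lastPollMs;

    LivenessModel(long sessionTimeoutMs, long maxPollIntervalMs, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastHeartbeatMs = nowMs;
        this.lastPollMs = nowMs;
    }

    /** Called by the (simulated) background heartbeat thread. */
    void heartbeat(long nowMs) { lastHeartbeatMs = nowMs; }

    /** Called by the (simulated) application thread. */
    void poll(long nowMs) { lastPollMs = nowMs; }

    /** "Is the consumer process alive?" */
    boolean sessionExpired(long nowMs) {
        return nowMs - lastHeartbeatMs > sessionTimeoutMs;
    }

    /** "Is the consumer making progress?" */
    boolean pollIntervalExceeded(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        LivenessModel m = new LivenessModel(45_000, 300_000, 0);
        long now = 0;
        // Processing is stuck: heartbeats keep arriving every 3s, poll() is never called.
        while (now <= 300_000) {
            now += 3_000;
            m.heartbeat(now);
        }
        System.out.println(m.sessionExpired(now));        // false
        System.out.println(m.pollIntervalExceeded(now));  // true
    }
}
```

The run shows the "alive but stuck" case: the session never expires because heartbeats continue, yet the poll interval check still fires.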

Deep Dive

1. Fetch Mechanics

JAVA(33 lines)

Fetch Flow Visualization

CONSUMER FETCH FLOW:

Consumer                              Broker
┌──────────┐                         ┌──────────────────────────────────┐
│          │                         │                                  │
│ poll()   │                         │  Partition: orders-0             │
│  │       │                         │  ┌────────────────────────────┐  │
│  │       │  FetchRequest:          │  │ offset 100: msg1 (500B)    │  │
│  │       │  ├─ topic: orders       │  │ offset 101: msg2 (500B)    │  │
│  │       │  ├─ partitions: [0,1]   │  │ offset 102: msg3 (500B)    │  │
│  │       │  ├─ offsets: [100, 50]  │  │ offset 103: msg4 (500B)    │  │
│  │       │  └─ maxBytes: 1MB       │  │ ...                        │  │
│  │       │ ─────────────────────►  │  └────────────────────────────┘  │
│  │       │                         │                                  │
│  │       │                         │  Wait for:                       │
│  │       │                         │  • fetch.min.bytes (1KB)         │
│  │       │                         │  OR                              │
│  │       │                         │  • fetch.max.wait.ms (500ms)     │
│  │       │                         │                                  │
│  │       │  FetchResponse:         │                                  │
│  │       │  ├─ orders-0: 10 msgs   │                                  │
│  │       │  └─ orders-1: 5 msgs    │                                  │
│  │       │ ◄─────────────────────  │                                  │
│  │       │                         │                                  │
│  │  ┌────▼────┐                    │                                  │
│  │  │ Internal│  All 15 records    │                                  │
│  │  │ Buffer  │  stored here       │                                  │
│  │  └────┬────┘                    │                                  │
│  │       │                         │                                  │
│  │       │ max.poll.records=5      │                                  │
│  │       ▼                         │                                  │
│  │  Returns 5 records              │                                  │
│  │  (buffer has 10 more)           │                                  │
│  │       │                         │                                  │
│  │ Next poll():                    │                                  │
│  │ Returns 5 from buffer           │                                  │
│  │ (no fetch request needed)       │                                  │
│          │                         │                                  │
└──────────┘                         └──────────────────────────────────┘
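
The buffering behaviour in the flow above can be imitated with a small simulation. This is a simplified sketch, not the real client: the actual consumer also prefetches in the background, and the class, its fields, and the fixed 15-records-per-fetch figure are assumptions taken from the diagram.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Simulates poll() serving records from an internal buffer filled by fetches. */
final class BufferedPoller {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final int maxPollRecords;
    private final int recordsPerFetch;
    private int fetchRequests = 0;
    private int nextOffset = 0;

    BufferedPoller(int maxPollRecords, int recordsPerFetch) {
        this.maxPollRecords = maxPollRecords;
        this.recordsPerFetch = recordsPerFetch;
    }

    /** Returns up to maxPollRecords records, fetching only when the buffer is empty. */
    List<String> poll() {
        if (buffer.isEmpty()) {
            fetch();
        }
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxPollRecords && !buffer.isEmpty()) {
            batch.add(buffer.poll());
        }
        return batch;
    }

    /** Stand-in for a FetchRequest/FetchResponse round trip. */
    private void fetch() {
        fetchRequests++;
        for (int i = 0; i < recordsPerFetch; i++) {
            buffer.add("record-" + nextOffset++);
        }
    }

    int fetchRequests() { return fetchRequests; }

    int buffered() { return buffer.size(); }

    public static void main(String[] args) {
        BufferedPoller poller = new BufferedPoller(5, 15);
        for (int i = 1; i <= 3; i++) {
            int returned = poller.poll().size();
            System.out.println("poll " + i + ": returned=" + returned
                    + " buffered=" + poller.buffered()
                    + " fetches=" + poller.fetchRequests());
        }
    }
}
```

Three polls are served by a single fetch, which is the point of the diagram: max.poll.records limits what the application sees per poll(), not what travels over the network.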

2. Heartbeat Configuration

JAVA(29 lines)

Timeout Relationship

TIMEOUT CONFIGURATION GUIDELINES:

┌──────────────────────────────────────────────────────────────────────┐
│                                                                      │
│  heartbeat.interval.ms ≤ session.timeout.ms / 3                      │
│  │                      │                                            │
│  │                      └── Allows 2+ missed heartbeats              │
│  │                          before timeout                           │
│  │                                                                   │
│  └── 10s heartbeat with 30s session = 2 missed OK                    │
│                                                                      │
│                                                                      │
│  max.poll.interval.ms > (max.poll.records × processing_time)         │
│  │                       │                                           │
│  │                       └── Account for worst case processing       │
│  │                                                                   │
│  └── 100 records × 500ms = 50s, so use 60s+ timeout                  │
│                                                                      │
│                                                                      │
│  EXAMPLE CONFIGURATION:                                              │
│  ───────────────────────                                             │
│  │                                                                   │
│  │  Fast processing (< 10ms per message):                            │
│  │    max.poll.records = 500                                         │
│  │    max.poll.interval.ms = 60000  (1 minute)                       │
│  │                                                                   │
│  │  Slow processing (100-500ms per message):                         │
│  │    max.poll.records = 50-100                                      │
│  │    max.poll.interval.ms = 60000-120000                            │
│  │                                                                   │
│  │  Very slow processing (> 1s per message):                         │
│  │    max.poll.records = 10-20                                       │
│  │    max.poll.interval.ms = 300000+ OR use async processing         │
│  │                                                                   │
└──────────────────────────────────────────────────────────────────────┘
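
These guidelines can be turned into a startup sanity check. The sketch below is one possible shape, assuming the heartbeat rule means "at most one third" (which matches the 10s/30s example); the class name, the warning texts, and the `worstCasePerRecordMs` parameter are hypothetical. The fallback values are the defaults quoted in this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Checks consumer properties against the two timeout guidelines. */
final class TimeoutGuidelines {

    static List<String> check(Properties props, long worstCasePerRecordMs) {
        long heartbeatMs = Long.parseLong(props.getProperty("heartbeat.interval.ms", "3000"));
        long sessionMs = Long.parseLong(props.getProperty("session.timeout.ms", "45000"));
        long pollIntervalMs = Long.parseLong(props.getProperty("max.poll.interval.ms", "300000"));
        long maxPollRecords = Long.parseLong(props.getProperty("max.poll.records", "500"));

        List<String> warnings = new ArrayList<>();
        // Guideline 1: leave room for at least two missed heartbeats.
        if (heartbeatMs * 3 > sessionMs) {
            warnings.add("heartbeat.interval.ms should be at most session.timeout.ms / 3");
        }
        // Guideline 2: a worst-case batch must finish before the poll interval runs out.
        long worstCaseBatchMs = maxPollRecords * worstCasePerRecordMs;
        if (worstCaseBatchMs >= pollIntervalMs) {
            warnings.add("worst-case batch time " + worstCaseBatchMs
                    + "ms does not fit in max.poll.interval.ms " + pollIntervalMs + "ms");
        }
        return warnings;
    }

    public static void main(String[] args) {
        // Default settings with 600ms per record: the batch check fails.
        System.out.println(check(new Properties(), 600));

        Properties tuned = new Properties();
        tuned.setProperty("max.poll.records", "100");
        tuned.setProperty("max.poll.interval.ms", "60000");
        // 100 records at 500ms = 50s, which fits in 60s.
        System.out.println(check(tuned, 500));  // []
    }
}
```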

3. Spring Kafka @KafkaListener

JAVA(40 lines)

@KafkaListener Usage Patterns

JAVA(75 lines)

4. Consumer Record Buffer

JAVA(31 lines)

5. Consumer Position Management

JAVA(46 lines)

6. Consumer Lifecycle

JAVA(76 lines)

Common Mistakes

Mistake 1: Ignoring max.poll.interval.ms

JAVA(24 lines)

Mistake 2: Confusing session.timeout.ms and max.poll.interval.ms

JAVA(9 lines)

Mistake 3: Wrong fetch.max.bytes with large messages

JAVA(9 lines)

Mistake 4: Too many consumers for partitions

JAVA(11 lines)
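
To see why consumers beyond the partition count sit idle, here is a minimal sketch that spreads partitions over consumers round-robin. It is a simplified stand-in for Kafka's real assignors, and the class, method, and consumer names are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified round-robin assignment of partitions to the consumers in one group. */
final class IdleConsumers {

    /** Assigns partitions 0..partitions-1 to consumer-0..consumer-(consumers-1). */
    static Map<String, List<Integer>> assign(int partitions, int consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int c = 0; c < consumers; c++) {
            assignment.put("consumer-" + c, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get("consumer-" + (p % consumers)).add(p);
        }
        return assignment;
    }

    /** Number of consumers that received no partition at all. */
    static long idleCount(Map<String, List<Integer>> assignment) {
        return assignment.values().stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> assignment = assign(3, 5);
        System.out.println(assignment);
        System.out.println("idle consumers: " + idleCount(assignment));  // idle consumers: 2
    }
}
```

A partition is owned by at most one consumer in a group, so with 3 partitions and 5 consumers, two consumers have nothing to read.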

Mistake 5: Not handling deserializer errors

JAVA(13 lines)

Debug This

Scenario: Consumer Lag Keeps Growing

Symptoms:
  • Consumer lag (unconsumed messages) increases over time
  • Processing appears normal
  • No errors in logs
Investigation:
BASH(12 lines)
JAVA(18 lines)
Common Causes:
  1. Processing too slow: Consumer can't keep up with producer rate
  2. Too few partitions: Limited parallelism
  3. Underprovisioned consumers: Need more consumer instances
  4. Fetch config issues: fetch.min.bytes too high, causing delays
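
As a rough illustration of causes 1 and 3, lag and the consumer count needed to keep up can be estimated from numbers you already have. This sketch takes plain maps of offsets rather than calling Kafka; the class, method names, and the example rates are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Estimates lag per partition and the consumer count needed to keep up. */
final class LagEstimate {

    /** Lag per partition = log end offset minus committed offset, never negative. */
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> endOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    /** Smallest consumer count whose combined rate matches the produce rate. */
    static int consumersNeeded(double produceRatePerSec, double perConsumerRatePerSec) {
        return (int) Math.ceil(produceRatePerSec / perConsumerRatePerSec);
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 1_000L);
        Map<Integer, Long> committed = Map.of(0, 900L);
        System.out.println(lagPerPartition(end, committed));  // {0=100}
        // 1000 msg/s produced, each consumer handles about 300 msg/s.
        System.out.println(consumersNeeded(1_000, 300));      // 4
    }
}
```

If the required consumer count exceeds the partition count, adding instances alone will not help; the topic needs more partitions or the per-record processing has to get faster.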
Resolution:
JAVA(15 lines)

Exercises

Exercise 1: Tune Consumer for Latency

Configure a consumer optimized for minimum latency:

  1. Messages should be consumed within 100ms of being produced
  2. Measure and document actual latency
  3. Compare with default configuration

Exercise 2: Simulate Poll Timeout

Create a test that:

  1. Configures short max.poll.interval.ms (30 seconds)
  2. Simulates slow processing
  3. Observes consumer getting kicked
  4. Implements proper fix

Exercise 3: Implement Graceful Shutdown

Build a consumer that:

  1. Handles shutdown signals (SIGTERM)
  2. Commits all processed offsets before exit
  3. Uses wakeup() to interrupt blocking poll
  4. Logs clean shutdown confirmation

Exercise 4: Consumer Metrics Dashboard

Create a monitoring component that tracks:

  1. Records consumed per second
  2. Average processing time per record
  3. Consumer lag per partition
  4. Rebalance frequency

Exercise 5: Adaptive Polling

Implement a consumer that:

  1. Monitors processing time per record
  2. Dynamically adjusts max.poll.records
  3. Stays within 50% of max.poll.interval.ms
  4. Logs adjustments made

Interview Questions

Q1: Explain the difference between session.timeout.ms and max.poll.interval.ms.

A: These are two different timeout mechanisms for consumer health:
session.timeout.ms (default 45s):
  • Monitored by: Background heartbeat thread
  • Purpose: Detect if consumer process is alive
  • Mechanism: Group coordinator removes consumer if no heartbeat received
  • Typical failure: Process crash, network partition, GC pause
  • Keep relatively short for fast failure detection
max.poll.interval.ms (default 5 minutes):
  • Monitored by: Time between poll() calls
  • Purpose: Detect if consumer is making progress
  • Mechanism: Consumer library leaves group if poll() not called in time
  • Typical failure: Processing taking too long, deadlock, infinite loop
  • Set based on processing time per batch
Key insight: A consumer can be "alive" (sending heartbeats) but "stuck" (not calling poll). The heartbeat thread runs independently, so a stuck processing thread won't prevent heartbeats. That's why we need both timeouts.

Q2: How does the consumer decide when to fetch more data from brokers?

A: The consumer uses an internal buffer and several parameters:
Fetch triggers:
  1. A partition has no records left in the internal buffer
  2. No fetch request is already in flight to that partition's broker
  3. The partition is assigned, not paused, and has a valid position
Fetch parameters:
  • fetch.min.bytes: Wait for at least this much data (or timeout)
  • fetch.max.wait.ms: Max time broker waits for fetch.min.bytes
  • fetch.max.bytes: Max total data in response
  • max.partition.fetch.bytes: Max data per partition
Flow:
  1. poll() checks internal buffer
  2. If sufficient records, return up to max.poll.records
  3. If buffer low, send FetchRequest to broker
  4. Broker waits for fetch.min.bytes OR fetch.max.wait.ms
  5. Response fills buffer
  6. Next poll() returns from buffer

This design separates network fetch from application poll, enabling efficient batching while controlling application-level batch size.

Q3: What happens when a consumer takes too long to process messages?

A: Several things can happen depending on configuration:
If processing exceeds max.poll.interval.ms:
  1. Consumer leaves the group voluntarily
  2. Group coordinator initiates rebalance
  3. Partitions reassigned to other consumers
  4. Uncommitted offsets not saved
  5. Other consumers will reprocess those messages
Consequences:
  • Message duplication (messages processed but not committed)
  • Rebalancing overhead
  • Processing latency increases cluster-wide
  • Possible cascade if multiple consumers affected
Prevention strategies:
  1. Reduce max.poll.records to process fewer per batch
  2. Increase max.poll.interval.ms if processing is legitimately slow
  3. Process asynchronously (submit to thread pool, acknowledge later)
  4. Use batch processing for efficiency
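
Strategy 3 can be sketched with a plain thread pool. This is a simplified variant that blocks until the whole batch is done, so offsets could be committed right afterwards; it does not show deferred acknowledgment, and it gives up per-partition ordering inside a batch. `handle` is a hypothetical stand-in for the slow per-record work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Processes one poll() batch in parallel to shorten the time between polls. */
final class ParallelBatch {

    /** Runs every record on a fixed pool and returns once all of them are done. */
    static List<String> processBatch(List<String> records, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String record : records) {
                tasks.add(() -> handle(record));
            }
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task completes and keeps task order.
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing batch", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("record processing failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    /** Hypothetical handler standing in for a slow external call. */
    static String handle(String record) throws InterruptedException {
        Thread.sleep(20);
        return "done:" + record;
    }

    public static void main(String[] args) {
        System.out.println(processBatch(List.of("a", "b", "c"), 2));  // [done:a, done:b, done:c]
    }
}
```

With N threads the wall-clock time per batch drops to roughly 1/N of the sequential time, which is what keeps the gap between poll() calls under max.poll.interval.ms.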

Q4: How would you configure a consumer for very large messages?

A: Large messages require tuning several parameters:
JAVA(18 lines)
Key relationship:
fetch.max.bytes >= max.partition.fetch.bytes >= max.message.bytes

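These limits are not absolute. Since Kafka 0.10.1, if the first record batch in a fetch is larger than fetch.max.bytes or max.partition.fetch.bytes, the broker still returns that batch so the consumer can make progress. Respecting the relationship above still matters, because otherwise a fetch may carry only a single oversized batch.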
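
The size relationship can be expressed as a small check. This is a sketch under stated assumptions: the class and method names are hypothetical, `maxMessageBytes` stands for the topic's max.message.bytes, and 52428800 (50 MB) is the client default for fetch.max.bytes.

```java
import java.util.Properties;

/** Builds and checks consumer fetch limits for a given maximum message size. */
final class LargeMessageConfig {

    static Properties forMaxMessageBytes(int maxMessageBytes) {
        Properties props = new Properties();
        // Per-partition limit at least as large as the biggest message.
        props.setProperty("max.partition.fetch.bytes", Integer.toString(maxMessageBytes));
        // Total fetch limit at least as large as the per-partition limit.
        int fetchMaxBytes = Math.max(52_428_800, maxMessageBytes);
        props.setProperty("fetch.max.bytes", Integer.toString(fetchMaxBytes));
        return props;
    }

    /** True when fetch.max.bytes >= max.partition.fetch.bytes >= maxMessageBytes. */
    static boolean satisfiesRelationship(Properties props, int maxMessageBytes) {
        int fetchMaxBytes = Integer.parseInt(props.getProperty("fetch.max.bytes"));
        int partitionMaxBytes = Integer.parseInt(props.getProperty("max.partition.fetch.bytes"));
        return fetchMaxBytes >= partitionMaxBytes && partitionMaxBytes >= maxMessageBytes;
    }

    public static void main(String[] args) {
        int tenMb = 10 * 1024 * 1024;
        Properties props = forMaxMessageBytes(tenMb);
        System.out.println(props.getProperty("max.partition.fetch.bytes"));  // 10485760
        System.out.println(satisfiesRelationship(props, tenMb));             // true
    }
}
```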

Q5: Describe the consumer poll loop and why blocking in it is dangerous.

A: The poll loop is the heart of consumer operation:
JAVA(7 lines)
Why blocking is dangerous:
  1. Poll timeout: If processing blocks longer than max.poll.interval.ms, consumer is kicked
  2. Heartbeat decoupling: Heartbeats continue (separate thread), so coordinator thinks consumer is alive while it's actually stuck
  3. Cascading rebalances: When kicked, partitions move to other consumers, potentially overloading them
  4. Duplicate processing: Uncommitted messages will be reprocessed by new owner
Safe patterns:
  1. Keep processing fast (< 100ms per record typically)
  2. Offload slow work to separate thread pool
  3. Use async acknowledgment for long processing
  4. Set appropriate max.poll.records for your processing speed

Summary

Key Takeaways

  1. poll() is the heartbeat of your consumer - call it frequently to stay in the group
  2. Two timeout mechanisms: session.timeout.ms for process liveness, max.poll.interval.ms for processing progress
  3. max.poll.records controls batch size from poll(), not fetch size from broker
  4. Internal buffer decouples fetching from polling for efficiency
  5. Spring @KafkaListener handles the poll loop for you, but you still need proper configuration
  6. Processing time must fit within max.poll.interval.ms or consumer gets kicked
  7. Concurrency should not exceed the partition count; extra consumers sit idle
  8. Graceful shutdown requires wakeup() to interrupt blocking poll

Quick Reference

Essential Consumer Configuration

PROPERTIES(11 lines)

@KafkaListener Cheat Sheet

JAVA(25 lines)

Series Navigation

Previous: Part 7: Advanced Producer
Current: Part 8: Consumer Internals
Next: Part 9: Consumer Groups

Series Overview

  • Part 0: How to Use This Series
  • Parts 1-4: Fundamentals
  • Parts 5-7: Producers
  • Parts 8-11: Consumers (Internals, Groups, Offset Management, Exactly-Once)
  • Parts 12-14: Operations
  • Parts 15-17: Kafka Streams
  • Parts 18-20: Patterns & Practices
  • Part 21: Cheatsheet & Decision Guide