
Producer Internals


At a Glance

Aspect          Details
Topic           Producer architecture, batching, compression, memory management
Complexity      Intermediate
Prerequisites   Parts 1-4 (Fundamentals)
Time            90 minutes
Spring Kafka    KafkaTemplate, ProducerFactory, batch configuration

What You'll Learn

After completing this article, you will be able to:

  1. Understand the producer's internal architecture and data flow
  2. Configure batching with batch.size and linger.ms for optimal throughput
  3. Choose the right compression algorithm for your workload
  4. Manage producer memory with buffer.memory and handle back-pressure
  5. Implement Spring Kafka producers with production-ready configurations

Production Story: The Buffer Memory Exhaustion

The Incident

Our order processing system was humming along during a normal Tuesday afternoon when suddenly, API response times jumped from 50ms to 30 seconds. The service eventually became completely unresponsive.

The Investigation


The service was stuck in a loop:

  1. Send messages to Kafka
  2. Get NotLeaderOrFollowerException (broker failover in progress)
  3. Request metadata update
  4. Keep buffering new messages while waiting
  5. Buffer fills up
  6. All threads blocked waiting for buffer space
  7. API requests timeout
Timeline:
14:32:00 - Broker-2 goes down for maintenance
14:32:05 - Leader election for orders-0 partition
14:32:06 - Producer gets NotLeaderOrFollowerException
14:32:07 - Producer requests metadata, keeps trying old leader
14:32:10 - Metadata stale, retries continue
14:32:30 - Buffer at 50% (32MB of 64MB)
14:33:00 - Buffer at 90%
14:33:10 - Buffer full (64MB)
14:33:11 - All producer threads blocked on buffer.memory
14:33:12 - API requests start timing out
14:35:00 - Complete service unavailability

The Root Cause


The Fix

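Here is a hedged sketch of producer settings that bound this failure mode. The values are assumptions and not necessarily what the team shipped. The key idea is that `max.block.ms` caps how long `send()` can block waiting for metadata or buffer space, so request threads fail fast instead of piling up behind a full buffer. The sketch uses plain string keys, so it compiles without kafka-clients.

```java
import java.util.Properties;

public class FailFastProducerConfig {

    // Builds producer properties with string keys; all values are illustrative assumptions.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Same 64MB buffer as in the incident...
        props.put("buffer.memory", String.valueOf(64L * 1024 * 1024));
        // ...but send() may block for at most 5s (default 60s) waiting for
        // metadata or buffer space before throwing.
        props.put("max.block.ms", "5000");

        // Bound the total time a record can spend buffered, in flight, and retrying.
        // The producer expects delivery.timeout.ms >= linger.ms + request.timeout.ms.
        props.put("linger.ms", "10");
        props.put("request.timeout.ms", "15000");
        props.put("delivery.timeout.ms", "30000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        long delivery = Long.parseLong(props.getProperty("delivery.timeout.ms"));
        long lingerPlusRequest = Long.parseLong(props.getProperty("linger.ms"))
                + Long.parseLong(props.getProperty("request.timeout.ms"));
        System.out.println("delivery.timeout.ms >= linger.ms + request.timeout.ms: "
                + (delivery >= lingerPlusRequest));
        // With kafka-clients on the classpath, these props would go to
        // new KafkaProducer<String, String>(props).
    }
}
```

A bounded `max.block.ms` only helps if the caller treats the resulting timeout as a fast failure, for example by returning an error to the API client rather than retrying in a tight loop.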

Mental Model: Producer Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                         KAFKA PRODUCER INTERNALS                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  Application Thread(s)                                                  │
│  ┌─────────────────────────────────────────────────────────────────┐    │
│  │                                                                 │    │
│  │  producer.send(ProducerRecord)                                  │    │
│  │           │                                                     │    │
│  │           ▼                                                     │    │
│  │  ┌─────────────────┐                                            │    │
│  │  │   Interceptors  │  Optional pre-send hooks                   │    │
│  │  └────────┬────────┘                                            │    │
│  │           │                                                     │    │
│  │           ▼                                                     │    │
│  │  ┌─────────────────┐                                            │    │
│  │  │   Serializer    │  Key + Value → byte[]                      │    │
│  │  └────────┬────────┘                                            │    │
│  │           │                                                     │    │
│  │           ▼                                                     │    │
│  │  ┌─────────────────┐                                            │    │
│  │  │   Partitioner   │  Determines target partition               │    │
│  │  └────────┬────────┘                                            │    │
│  │           │                                                     │    │
│  └───────────┼─────────────────────────────────────────────────────┘    │
│              │                                                          │
│              ▼                                                          │
│  ┌───────────────────────────────────────────────────────────────────┐  │
│  │                      RECORD ACCUMULATOR                           │  │
│  │                     (buffer.memory pool)                          │  │
│  │                                                                   │  │
│  │   ┌─────────────────────────────────────────────────────────────┐ │  │
│  │   │  Partition Queues (one per topic-partition)                 │ │  │
│  │   │                                                             │ │  │
│  │   │  orders-0: [Batch 1][Batch 2][Batch 3 (filling)]            │ │  │
│  │   │  orders-1: [Batch 1][Batch 2 (filling)]                     │ │  │
│  │   │  orders-2: [Batch 1 (filling)]                              │ │  │
│  │   │  users-0:  [Batch 1][Batch 2][Batch 3][Batch 4 (filling)]   │ │  │
│  │   │  users-1:  [Batch 1 (filling)]                              │ │  │
│  │   │                                                             │ │  │
│  │   │  Each batch: up to batch.size bytes                         │ │  │
│  │   └─────────────────────────────────────────────────────────────┘ │  │
│  │                                                                   │  │
│  │   Batches sent when:                                              │  │
│  │   • batch.size reached, OR                                        │  │
│  │   • linger.ms elapsed, OR                                         │  │
│  │   • flush() called                                                │  │
│  │                                                                   │  │
│  └───────────────────────────────────────────────────────────────────┘  │
│              │                                                          │
│              │ Sender thread picks ready batches                        │
│              ▼                                                          │
│  ┌───────────────────────────────────────────────────────────────────┐  │
│  │                       SENDER THREAD                               │  │
│  │                  (Single background thread)                       │  │
│  │                                                                   │  │
│  │  ┌────────────────────────────────────────────────────────────┐   │  │
│  │  │ 1. Group batches by destination broker                     │   │  │
│  │  │                                                            │   │  │
│  │  │    Broker 1: [orders-0 batch][users-1 batch]               │   │  │
│  │  │    Broker 2: [orders-1 batch][orders-2 batch]              │   │  │
│  │  │    Broker 3: [users-0 batch]                               │   │  │
│  │  └────────────────────────────────────────────────────────────┘   │  │
│  │                                                                   │  │
│  │  ┌────────────────────────────────────────────────────────────┐   │  │
│  │  │ 2. Close batches for sending. With compression enabled,    │   │  │
│  │  │    records were compressed as they were appended to the    │   │  │
│  │  │    batch, e.g. 64KB → ~29KB with snappy (~2.2x)            │   │  │
│  │  └────────────────────────────────────────────────────────────┘   │  │
│  │                                                                   │  │
│  │  ┌────────────────────────────────────────────────────────────┐   │  │
│  │  │ 3. Send ProduceRequest to each broker                      │   │  │
│  │  │                                                            │   │  │
│  │  │    max.in.flight.requests.per.connection requests at once  │   │  │
│  │  └────────────────────────────────────────────────────────────┘   │  │
│  │                                                                   │  │
│  └───────────────────────────────────────────────────────────────────┘  │
│              │                                                          │
│              ▼                                                          │
│         To Brokers                                                      │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Batching Timeline

TIME ──────────────────────────────────────────────────────────────►

Record arrivals for partition orders-0:
R1    R2 R3    R4 R5 R6 R7    R8         R9 R10
│     │  │     │  │  │  │     │          │  │
▼     ▼  ▼     ▼  ▼  ▼  ▼     ▼          ▼  ▼

Batch formation (batch.size=16KB, linger.ms=10ms):

├────── Batch 1 ──────┤├───── Batch 2 ─────┤├── Batch 3 ──┤
│ R1, R2, R3          ││ R4, R5, R6, R7    ││ R8          │
│ Sent: linger.ms     ││ Sent: batch.size  ││ Sent: flush │
│       expired       ││       reached     ││             │
└─────────────────────┘└───────────────────┘└─────────────┘

Legend:
- R1-R10: Records
- Batch 1: Sent after linger.ms (10ms) because batch didn't fill
- Batch 2: Sent immediately when batch.size (16KB) reached
- Batch 3: Sent on explicit flush() call
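The three rules in the legend can be captured in a small model of when one partition's batch becomes sendable. This is a simplification, not Kafka's `RecordAccumulator`, which also accounts for retry backoff, buffer exhaustion, and `close()`. The enum and method names are hypothetical.

```java
public class BatchReadiness {

    enum Reason { NOT_READY, BATCH_FULL, LINGER_EXPIRED, FLUSH }

    // Decides whether a single partition's batch should be handed to the Sender.
    static Reason check(int batchBytes, int batchSize,
                        long createdAtMs, long nowMs, long lingerMs,
                        boolean flushRequested) {
        if (batchBytes >= batchSize) return Reason.BATCH_FULL;          // batch.size reached
        if (flushRequested) return Reason.FLUSH;                        // flush() called
        if (nowMs - createdAtMs >= lingerMs) return Reason.LINGER_EXPIRED; // linger.ms elapsed
        return Reason.NOT_READY;
    }

    public static void main(String[] args) {
        int batchSize = 16 * 1024; // batch.size=16KB
        long lingerMs = 10;        // linger.ms=10ms
        // Batch 1: three small records, checked 10ms after the batch was created
        System.out.println(check(3 * 1024, batchSize, 0, 10, lingerMs, false)); // LINGER_EXPIRED
        // Batch 2: filled to 16KB after 4ms
        System.out.println(check(16 * 1024, batchSize, 0, 4, lingerMs, false)); // BATCH_FULL
        // Batch 3: one record, flush() called after 2ms
        System.out.println(check(1024, batchSize, 0, 2, lingerMs, true));       // FLUSH
    }
}
```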

Deep Dive

1. Producer Record Lifecycle

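In the record lifecycle, the partitioner step for keyed records can be sketched as follows. Kafka's built-in partitioning hashes the serialized key bytes with murmur2 and takes the positive hash modulo the partition count. Records without a key use sticky partitioning (Kafka 2.4+), which this sketch does not model. The hash is modelled on Kafka's and is meant for illustration, not as a byte-for-byte guarantee. `KeyPartitioning` and `partitionFor` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

public class KeyPartitioning {

    // 32-bit murmur2, modelled on the hash used for keyed records by Kafka's default partitioning
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through)
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // what StringSerializer produces by default
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions; // force non-negative, then modulo
    }

    public static void main(String[] args) {
        for (String key : new String[] {"order-1001", "order-1002", "order-1001"}) {
            System.out.println(key + " -> orders-" + partitionFor(key, 3));
        }
    }
}
```

The same key always lands on the same partition, which is what preserves per-key ordering. Adding partitions changes the modulo and therefore remaps keys.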

2. Batching Configuration Deep Dive

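Here are two starting-point profiles for the batching knobs, with values taken from this article's own examples (16KB/0ms and 32KB/20ms). They are assumptions to validate against the `batch-size-avg` and `record-queue-time-avg` metrics under real load. The class and method names are hypothetical, and string keys keep the sketch free of a kafka-clients dependency.

```java
import java.util.Properties;

public class BatchingProfiles {

    // Latency-first: default batch size, no deliberate wait
    static Properties lowLatency() {
        Properties p = new Properties();
        p.put("batch.size", "16384"); // 16KB, the default
        p.put("linger.ms", "0");      // send as soon as the Sender can
        return p;
    }

    // Throughput-first: bigger batches and a short wait to fill them
    static Properties highThroughput() {
        Properties p = new Properties();
        p.put("batch.size", "32768"); // 32KB
        p.put("linger.ms", "20");     // wait up to 20ms for more records
        return p;
    }

    // Upper bound on records that fit in one batch (ignores per-record overhead)
    static int recordsPerBatch(int batchSizeBytes, int avgRecordBytes) {
        return batchSizeBytes / avgRecordBytes;
    }

    public static void main(String[] args) {
        int lowCap = recordsPerBatch(Integer.parseInt(lowLatency().getProperty("batch.size")), 100);
        int highCap = recordsPerBatch(Integer.parseInt(highThroughput().getProperty("batch.size")), 100);
        System.out.println(lowCap);  // 163
        System.out.println(highCap); // 327
    }
}
```

A bigger `batch.size` only pays off if records actually arrive fast enough to fill it within `linger.ms`. At low traffic, batches go out partly empty when the linger timer expires.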

Visualizing Batching Trade-offs

LATENCY vs THROUGHPUT TRADE-OFF:

                    linger.ms=0        linger.ms=20       linger.ms=100
                    batch.size=16KB    batch.size=32KB    batch.size=128KB
                    ─────────────────────────────────────────────────────►
                    LOW LATENCY                              HIGH THROUGHPUT

Latency:            ~1ms              ~20ms               ~100ms
Throughput:         50K msg/s         200K msg/s          500K msg/s
Network efficiency: Poor              Good                Excellent
CPU usage:          High              Medium              Low
(Illustrative figures; actual numbers depend on record size, acks, hardware, and network.)


BATCH SIZE IMPACT ON THROUGHPUT:

Messages per second vs batch.size (fixed linger.ms=20ms):

500K │                              ┌──────────────
     │                         ┌────┘
400K │                    ┌────┘
     │               ┌────┘
300K │          ┌────┘
     │     ┌────┘
200K │┌────┘
     ││
100K ││
     └┴─────────────────────────────────────────────
       8KB  16KB  32KB  64KB  128KB  256KB  512KB
                      batch.size

Note: Returns diminish beyond 64-128KB (illustrative curve; benchmark your own workload)

3. Compression Deep Dive

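A quick way to see why data type matters is to compress a sample batch. The JDK ships gzip (`java.util.zip`), so this sketch uses gzip as a stand-in. Kafka's lz4, snappy, and zstd codecs need the libraries kafka-clients brings in. The JSON sample is synthetic and very repetitive, so expect a higher ratio than the table's figures. The random bytes stand in for already-compressed or encrypted payloads.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.zip.GZIPOutputStream;

public class CompressibilityCheck {

    // Returns the gzip-compressed size of the input in bytes
    static int gzipSize(byte[] input) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected with an in-memory stream
        }
        return out.size();
    }

    // Synthetic batch of JSON order events with repetitive field names and values
    static byte[] jsonBatch(int records) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < records; i++) {
            sb.append("{\"orderId\":").append(1000 + i)
              .append(",\"status\":\"CREATED\",\"currency\":\"USD\",\"amount\":")
              .append(i % 50).append(".99}\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    // High-entropy bytes, like images or encrypted payloads
    static byte[] randomBatch(int bytes) {
        byte[] data = new byte[bytes];
        new Random(42).nextBytes(data);
        return data;
    }

    public static void main(String[] args) {
        byte[] json = jsonBatch(500);
        byte[] random = randomBatch(json.length);
        System.out.printf("JSON:   %d -> %d bytes%n", json.length, gzipSize(json));
        System.out.printf("Random: %d -> %d bytes%n", random.length, gzipSize(random));
    }
}
```

Running the same check on a sample of real messages is a cheap first filter before benchmarking the actual Kafka codecs.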

Compression Comparison

COMPRESSION ALGORITHM COMPARISON:

                    │ Compression │ Compression │ Decompression │ Typical    │
Algorithm           │ Speed       │ Ratio       │ Speed         │ Use Case   │
────────────────────┼─────────────┼─────────────┼───────────────┼────────────┤
none                │ N/A         │ 1.0x        │ N/A           │ Already    │
                    │             │             │               │ compressed │
────────────────────┼─────────────┼─────────────┼───────────────┼────────────┤
lz4                 │ 750 MB/s    │ 2.1x        │ 3500 MB/s     │ High       │
                    │ Fastest     │             │ Fastest       │ throughput │
────────────────────┼─────────────┼─────────────┼───────────────┼────────────┤
snappy              │ 500 MB/s    │ 2.2x        │ 1500 MB/s     │ General    │
                    │ Fast        │             │ Fast          │ purpose    │
────────────────────┼─────────────┼─────────────┼───────────────┼────────────┤
zstd                │ 150 MB/s    │ 3.5x        │ 1000 MB/s     │ Storage    │
                    │ Medium      │ Best        │ Fast          │ savings    │
────────────────────┼─────────────┼─────────────┼───────────────┼────────────┤
gzip                │ 50 MB/s     │ 3.2x        │ 300 MB/s      │ Legacy     │
                    │ Slowest     │             │ Slowest       │ compat     │
────────────────────┴─────────────┴─────────────┴───────────────┴────────────┘

WHEN COMPRESSION HELPS:

Original batch size: 64KB

With snappy (2.2x ratio):
├── Uncompressed: 64KB → Network: 64KB
├── Compressed:   64KB → Network: 29KB (55% savings)
└── Producer CPU: ~0.1ms additional

When compression helps MOST:
• Text/JSON data (high redundancy)
• Large batches (better ratio)
• Network-constrained environments
• Cross-datacenter replication

When compression helps LEAST:
• Binary/already-compressed data (images, encrypted)
• Very small batches (e.g., low traffic with linger.ms=0, so each batch holds only a record or two)
• CPU-constrained producers

4. Buffer Memory Management

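The blocking behaviour of `buffer.memory` and `max.block.ms` can be modelled with a fair semaphore whose permits stand for bytes. This is a toy, not Kafka's `BufferPool`. It throws the JDK's `TimeoutException` where Kafka throws its own timeout exception, and `ToyBufferPool` is a hypothetical name.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ToyBufferPool {
    private final Semaphore freeBytes;
    private final int totalBytes;
    private final long maxBlockMs;

    ToyBufferPool(int totalBytes, long maxBlockMs) {
        this.totalBytes = totalBytes;
        this.freeBytes = new Semaphore(totalBytes, true); // fair: waiting senders served in order
        this.maxBlockMs = maxBlockMs;
    }

    // Like send() reserving space: blocks until memory frees up or max.block.ms expires
    void allocate(int bytes) throws InterruptedException, TimeoutException {
        if (bytes > totalBytes) {
            throw new IllegalArgumentException("record larger than buffer.memory");
        }
        if (!freeBytes.tryAcquire(bytes, maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Failed to allocate " + bytes + " bytes within " + maxBlockMs + " ms");
        }
    }

    // Called when a batch completes (acknowledged or failed) and its memory is returned
    void release(int bytes) {
        freeBytes.release(bytes);
    }

    double utilization() {
        return 1.0 - (double) freeBytes.availablePermits() / totalBytes;
    }

    public static void main(String[] args) throws Exception {
        ToyBufferPool pool = new ToyBufferPool(64, 100); // 64 "bytes", max.block.ms=100
        pool.allocate(48);
        System.out.println("utilization = " + pool.utilization()); // utilization = 0.75
        try {
            pool.allocate(32); // only 16 free and nothing drains, so this times out
        } catch (TimeoutException e) {
            System.out.println("send() would fail: " + e.getMessage());
        }
        pool.release(48); // batches acknowledged, memory returned
        pool.allocate(32); // succeeds immediately
        System.out.println("utilization = " + pool.utilization()); // utilization = 0.5
    }
}
```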

Buffer Memory Visualization

BUFFER MEMORY ALLOCATION:

buffer.memory = 64MB
┌───────────────────────────────────────────────────────────────────────┐
│                                                                       │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                     RECORD ACCUMULATOR                          │  │
│  │                                                                 │  │
│  │   Partition Batches:                                            │  │
│  │   ┌──────────────┐ ┌──────────────┐ ┌──────────────┐            │  │
│  │   │  orders-0    │ │  orders-1    │ │  orders-2    │            │  │
│  │   │  ████████    │ │  ████        │ │  ██          │            │  │
│  │   │  12MB        │ │  8MB         │ │  4MB         │            │  │
│  │   └──────────────┘ └──────────────┘ └──────────────┘            │  │
│  │                                                                 │  │
│  │   ┌──────────────┐ ┌──────────────┐                             │  │
│  │   │  users-0     │ │  users-1     │                             │  │
│  │   │  ██████      │ │  ████████████│                             │  │
│  │   │  10MB        │ │  18MB        │                             │  │
│  │   └──────────────┘ └──────────────┘                             │  │
│  │                                                                 │  │
│  │   Used: 52MB / 64MB (81%)                                       │  │
│  │   Free: 12MB                                                    │  │
│  │                                                                 │  │
│  └─────────────────────────────────────────────────────────────────┘  │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

WHEN BUFFER FILLS UP:

1. Normal operation (buffer < 100%):
   send() → add to batch → return immediately

2. Buffer full (buffer = 100%):
   send() → BLOCK waiting for space
           │
           ├── Space freed (batch sent) → continue
           │
           └── max.block.ms exceeded → TimeoutException

3. Back-pressure scenarios:
   • Broker slow/down → batches not draining
   • High message rate → records produced faster than the Sender can drain them
   • Large messages → fills buffer quickly

5. Spring Kafka Producer Configuration

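Here is a sketch of the config map you might hand to Spring Kafka's `DefaultKafkaProducerFactory<>(Map<String, Object>)` constructor. The values are assumptions. Plain string keys keep this example stdlib-only, while `ProducerConfig` constants are the usual choice in real code. The bean wiring is shown in comments because it needs spring-kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class SpringProducerProps {

    static Map<String, Object> producerConfigs(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("enable.idempotence", true);
        props.put("batch.size", 32768);                 // 32KB
        props.put("linger.ms", 20);                     // wait up to 20ms to fill batches
        props.put("compression.type", "lz4");
        props.put("buffer.memory", 64L * 1024 * 1024);  // 64MB
        props.put("max.block.ms", 5000);                // fail fast instead of blocking 60s
        return props;
    }

    public static void main(String[] args) {
        producerConfigs("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
        // In a @Configuration class (requires spring-kafka):
        // @Bean ProducerFactory<String, String> producerFactory() {
        //     return new DefaultKafkaProducerFactory<>(producerConfigs("localhost:9092"));
        // }
        // @Bean KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        //     return new KafkaTemplate<>(pf);
        // }
    }
}
```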

Using KafkaTemplate


6. Producer Interceptors


7. Producer Metrics

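The following sketch interprets a snapshot of producer metric values keyed by metric name. `buffer-available-bytes`, `buffer-total-bytes`, `batch-size-avg`, and `record-error-rate` are producer-metrics names. Building the map from `producer.metrics()` is left out to keep the example stdlib-only. The 80% and 50% thresholds and the `ProducerHealth` name are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ProducerHealth {

    // Fraction of buffer.memory currently in use
    static double bufferUtilization(Map<String, Double> m) {
        return 1.0 - m.get("buffer-available-bytes") / m.get("buffer-total-bytes");
    }

    // How full batches are relative to the configured batch.size
    static double batchFillRatio(Map<String, Double> m, int batchSizeConfig) {
        return m.get("batch-size-avg") / batchSizeConfig;
    }

    static List<String> warnings(Map<String, Double> m, int batchSizeConfig) {
        List<String> out = new ArrayList<>();
        if (bufferUtilization(m) > 0.8) out.add("buffer above 80%: sends may soon block");
        if (batchFillRatio(m, batchSizeConfig) < 0.5) out.add("batches under half full: consider raising linger.ms");
        if (m.getOrDefault("record-error-rate", 0.0) > 0) out.add("records are failing: check send callbacks");
        return out;
    }

    public static void main(String[] args) {
        Map<String, Double> snapshot = Map.of(
                "buffer-total-bytes", 64.0 * 1024 * 1024,
                "buffer-available-bytes", 12.0 * 1024 * 1024,
                "batch-size-avg", 4096.0,
                "record-error-rate", 0.0);
        System.out.printf("buffer utilization: %.0f%%%n", 100 * bufferUtilization(snapshot)); // 81%
        warnings(snapshot, 16384).forEach(System.out::println);
    }
}
```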

Common Mistakes

Mistake 1: Not Handling Send Failures

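A minimal pattern for not ignoring failures is to attach handling to the future an async send returns. In Spring Kafka 3.x, `KafkaTemplate.send(...)` returns a `CompletableFuture`. The counters and the `onFailure` hook (dead-letter, retry, alert) are hypothetical, and the sketch runs on plain `CompletableFuture`s.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class SendOutcomeTracker {
    final AtomicLong succeeded = new AtomicLong();
    final AtomicLong failed = new AtomicLong();

    // Counts outcomes and routes failures to a handler instead of dropping them silently
    <T> CompletableFuture<T> track(CompletableFuture<T> sendFuture, Consumer<Throwable> onFailure) {
        return sendFuture.whenComplete((result, ex) -> {
            if (ex == null) {
                succeeded.incrementAndGet();
            } else {
                failed.incrementAndGet();
                onFailure.accept(ex); // keep this fast: it may run on the producer's I/O thread
            }
        });
    }

    public static void main(String[] args) {
        SendOutcomeTracker tracker = new SendOutcomeTracker();
        tracker.track(CompletableFuture.completedFuture("ok"), ex -> { });

        CompletableFuture<String> failing = new CompletableFuture<>();
        tracker.track(failing, ex -> System.out.println("send failed: " + ex.getMessage()));
        failing.completeExceptionally(new RuntimeException("broker unavailable"));

        System.out.println(tracker.succeeded.get() + " ok, " + tracker.failed.get() + " failed");
    }
}
```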

Mistake 2: Blocking in High-Throughput Code


Mistake 3: Wrong Batch Configuration


Mistake 4: Ignoring Buffer Memory Limits

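Sizing `buffer.memory` starts with back-of-the-envelope arithmetic: how many seconds of traffic the buffer absorbs while brokers are not draining it. This sketch assumes a steady 2MB/s of serialized records and ignores compression and per-batch overhead. The class and method names are hypothetical.

```java
public class BufferSizing {

    // Seconds a stalled producer can keep accepting records before send() blocks
    static double secondsOfHeadroom(long bufferBytes, long bytesPerSecond) {
        return (double) bufferBytes / bytesPerSecond;
    }

    // Buffer needed to absorb a stall of the given length without blocking
    static long bytesNeeded(long bytesPerSecond, double stallSeconds) {
        return (long) Math.ceil(bytesPerSecond * stallSeconds);
    }

    public static void main(String[] args) {
        long rate = 2L * 1024 * 1024; // assumed 2MB/s
        System.out.println(secondsOfHeadroom(32L * 1024 * 1024, rate)); // 16.0 with the 32MB default
        System.out.println(bytesNeeded(rate, 30) / (1024 * 1024) + "MB to ride out a 30s stall"); // 60MB to ride out a 30s stall
    }
}
```

A larger buffer only delays back-pressure. Records that wait longer than `delivery.timeout.ms` still fail, so sizing the buffer and bounding the timeouts go together.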

Mistake 5: Wrong Compression for Data Type


Debug This

Scenario: Producer Throughput Suddenly Drops

Symptoms:
  • Producer was sending 100K msg/s, now only 10K msg/s
  • No errors in logs
  • Broker CPU and disk seem fine
  • Application CPU usage increased
Investigation:
Likely Causes:
  1. Data characteristics changed: Messages larger or less compressible
  2. Broker performance: Disk full, replication lag, under-replicated partitions
  3. Network: Increased latency to brokers
  4. Configuration drift: Someone changed batch.size or compression
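Cause 4 is the easiest to rule out mechanically: compare the producer settings the service is running with against a known baseline. This sketch leaves open where the "actual" settings come from (a config dump, startup logs, a config server). The baseline values and the `ConfigDrift` name are assumptions.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class ConfigDrift {

    // Returns "expected -> found" for every baseline key whose value differs
    static Map<String, String> diff(Properties baseline, Properties actual) {
        Map<String, String> changes = new TreeMap<>();
        for (String key : baseline.stringPropertyNames()) {
            String expected = baseline.getProperty(key);
            String found = actual.getProperty(key);
            if (!expected.equals(found)) {
                // <unset> means the Kafka default applies (e.g. compression.type=none)
                changes.put(key, expected + " -> " + (found == null ? "<unset>" : found));
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Properties baseline = new Properties();
        baseline.setProperty("batch.size", "65536");
        baseline.setProperty("linger.ms", "20");
        baseline.setProperty("compression.type", "lz4");

        Properties actual = new Properties();
        actual.setProperty("batch.size", "16384");
        actual.setProperty("linger.ms", "20");

        diff(baseline, actual).forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```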
Resolution:

Exercises

Exercise 1: Benchmark Batching Configurations

Write a producer benchmark that:

  1. Sends 1 million small messages (100 bytes each)
  2. Tests different batch.size values: 4KB, 16KB, 32KB, 64KB, 128KB
  3. Tests different linger.ms values: 0, 5, 20, 50, 100
  4. Measures and reports:
    • Total time to send all messages
    • Messages per second
    • Average latency per message

Exercise 2: Compression Comparison

Build a test that:

  1. Uses real production-like JSON messages (order events, user events)
  2. Compares all compression types: none, snappy, lz4, zstd, gzip
  3. Measures:
    • Compression ratio
    • Producer CPU usage
    • End-to-end latency

Exercise 3: Back-pressure Handling

Implement a producer that:

  1. Detects when buffer is nearly full (> 90%)
  2. Implements back-pressure by slowing down message production
  3. Alerts when buffer consistently full
  4. Gracefully handles TimeoutException from max.block.ms

Exercise 4: Custom Metrics Dashboard

Create a Micrometer-based metrics collector that exposes:

  1. Records sent per second (by topic)
  2. Average batch size
  3. Buffer utilization percentage
  4. Send latency percentiles (p50, p95, p99)
  5. Error rate by error type

Exercise 5: Producer Health Check

Implement a Spring Boot Actuator health indicator for a Kafka producer that checks:

  1. Can connect to bootstrap servers
  2. Buffer not critically full (< 80%)
  3. Recent send success rate > 99%
  4. No sustained send failures in last minute

Interview Questions

Q1: Explain the producer's batching mechanism and how to tune it.

A: The producer uses a RecordAccumulator to batch records before sending:
How it works:
  1. When send() is called, records are added to partition-specific queues
  2. Records accumulate into batches (up to batch.size bytes)
  3. Batches are sent when either:
    • batch.size is reached (batch full)
    • linger.ms time passes (timer expires)
    • flush() is called explicitly
Key parameters:
batch.size (default 16KB):
  • Maximum bytes per batch
  • Larger batches = better throughput, higher latency
  • Consider message size and rate when tuning
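linger.ms (default 0; 5ms from Kafka 4.0):
  • How long to wait for more records before sending a batch that is not yet full
  • 0 means no deliberate wait: a batch goes out as soon as the Sender can send it, although records arriving close together still batch up under load
  • Higher values allow better batching at the cost of latency
Tuning approach:
  1. Start with defaults (16KB and 0ms, or 5ms on Kafka 4.0+)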
  2. If throughput matters: increase both (32KB, 20ms)
  3. If latency matters: keep linger.ms low (0-5ms)
  4. Monitor batch-size-avg metric to see if batches are filling
  5. Adjust based on observed metrics, not theory

Q2: How does producer memory management work, and what happens when the buffer fills up?

A: The producer has a fixed memory buffer controlled by buffer.memory:
Memory allocation:
  • buffer.memory defines total memory for buffering (default 32MB)
  • Memory is shared across all partitions
  • Each partition has a queue of batches
  • Memory is released when a batch completes (acknowledged, or failed after retries or timeout)
When buffer fills:
  1. send() blocks waiting for memory
  2. Waits up to max.block.ms (default 60 seconds)
  3. If timeout expires, throws TimeoutException
  4. Memory freed when in-flight batches complete
Back-pressure mechanism:
  • Full buffer naturally slows down producers
  • Protects against OOM
  • But can cause cascading timeouts if misconfigured
Production recommendations:
  • Size buffer for expected burst traffic
  • Set max.block.ms appropriately (don't block forever)
  • Monitor buffer utilization
  • Implement circuit breaker pattern for failure handling

Q3: Compare the different compression algorithms available in Kafka.

A: Kafka supports four compression algorithms, each with different trade-offs:
LZ4 (Recommended for most cases):
  • Fastest compression and decompression
  • Good compression ratio (2-3x for text)
  • Minimal CPU overhead
  • Best for high-throughput, latency-sensitive workloads
Snappy:
  • Very fast, slightly slower than LZ4
  • Similar compression ratio to LZ4
  • Google-developed, well-tested
  • Good default choice, widely used
ZSTD (Zstandard):
  • Best compression ratio (3-5x for text)
  • Slower compression than LZ4/Snappy
  • Fast decompression
  • Best for storage-sensitive environments
GZIP:
  • Good compression ratio
  • Slowest compression and decompression
  • High CPU usage
  • Legacy option, generally not recommended
Choosing algorithm:
  • Text/JSON data: LZ4 or Snappy
  • Storage costs matter: ZSTD
  • Already compressed data: None
  • CPU-constrained: LZ4
  • Legacy compatibility: GZIP

Q4: What is the role of the Sender thread and how does it interact with the RecordAccumulator?

A: The Sender is a single background thread that handles all network communication:
Responsibilities:
  1. Poll RecordAccumulator for ready batches
  2. Group batches by destination broker
  3. Close drained batches (with compression enabled, records were already compressed as they were appended)
  4. Send ProduceRequests to brokers
  5. Handle responses and invoke callbacks
  6. Manage metadata updates
Interaction with RecordAccumulator:
Application Thread(s)     Sender Thread
        │                      │
        │  send()              │
        ▼                      │
┌───────────────┐              │
│ Add record to │              │
│ partition     │              │
│ queue         │              │
└───────┬───────┘              │
        │                      │
        │ wakeup if batch ready│
        ├─────────────────────►│
        │                      ▼
        │              ┌───────────────┐
        │              │ Get ready     │
        │              │ batches       │
        │              └───────┬───────┘
        │                      │
        │                      ▼
        │              ┌───────────────┐
        │              │ Group by      │
        │              │ broker        │
        │              └───────┬───────┘
        │                      │
        │                      ▼
        │              ┌───────────────┐
        │              │ Send requests │
        │              └───────────────┘
Key considerations:
  • A single Sender thread per producer handles all network I/O for that producer
  • max.in.flight.requests.per.connection controls pipelining
  • Sender handles all retries internally
  • Callbacks invoked from Sender thread (keep them fast!)

Q5: How would you design a high-throughput producer for a system sending 500K messages per second?

A: For 500K msg/s, I'd focus on batching, parallelism, and resource sizing:
Configuration:
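Here is a sketch of a starting configuration plus the capacity arithmetic behind it. The ~500-byte average record size matches the bandwidth estimate in the hardware considerations. The 50-partition count comes from the architecture notes. batch.size=128KB, linger.ms=20, and the other values are assumptions to validate with a load test.

```java
import java.util.Properties;

public class HighThroughputPlan {

    static long bytesPerSecond(long msgsPerSecond, int avgRecordBytes) {
        return msgsPerSecond * avgRecordBytes;
    }

    static long perPartitionRate(long msgsPerSecond, int partitions) {
        return msgsPerSecond / partitions;
    }

    static Properties config() {
        Properties p = new Properties();
        p.put("batch.size", "131072");                              // 128KB
        p.put("linger.ms", "20");                                   // give batches time to fill
        p.put("compression.type", "lz4");                           // fast, low CPU cost
        p.put("buffer.memory", String.valueOf(256L * 1024 * 1024)); // 256MB per producer
        p.put("max.in.flight.requests.per.connection", "5");        // pipelining; ordering kept with idempotence
        p.put("acks", "all");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(bytesPerSecond(500_000, 500) / 1_000_000 + " MB/s before compression"); // 250 MB/s before compression
        System.out.println(perPartitionRate(500_000, 50) + " msg/s per partition");              // 10000 msg/s per partition
        // Records per full 128KB batch at 500 bytes each, ignoring record overhead
        System.out.println(131072 / 500 + " records per full batch");                          // 262 records per full batch
    }
}
```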
Architecture:
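  1. Multiple producers: each producer has a single Sender thread. KafkaProducer is thread-safe, and its Javadoc notes that sharing one instance across threads is generally faster than using many instances, so start with a shared producer and add a small pool of instances only if one Sender thread becomes the bottleneck.
  2. Partition strategy: Ensure even distribution across partitions. 500K/s spread over 50 partitions is 10K/s per partition.
  3. Async sending: Never call the synchronous get() in the hot path.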
  4. Monitoring: Watch buffer utilization, batch sizes, and request latency.
Hardware considerations:
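  • Network: assuming ~500-byte records, 500K * 500 bytes = 250MB/s before compression
  • CPU: serialization and LZ4 compression both cost CPU in the producing process; budget cores for them
  • Memory: 256MB of buffer.memory per producer instance, multiplied by the number of producers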
Testing: Load test before production to verify configuration handles target throughput.

Summary

Key Takeaways

  1. Producer architecture consists of serializers, partitioner, RecordAccumulator, and Sender thread working together
  2. Batching is critical for throughput - configure batch.size and linger.ms based on your latency/throughput requirements
  3. Compression happens at batch level - LZ4 for speed, ZSTD for ratio, Snappy for balance
  4. Buffer memory is your back-pressure mechanism - size appropriately and set reasonable max.block.ms
  5. Always handle send failures - fire-and-forget is only acceptable if data loss is tolerable
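  6. Monitor producer metrics - batch size, queue time, and buffer utilization tell you how the producer is performing
  7. The Sender is a single I/O thread per producer - serialization and partitioning run on the threads that call send(), so scale CPU-bound serialization with more application threads and add producer instances only when one Sender becomes the bottleneck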
  8. Spring Kafka provides KafkaTemplate for easy integration with all these concepts

Quick Reference

Essential Producer Configuration


KafkaTemplate Cheat Sheet


Series Navigation

Previous: Part 4: Cluster Coordination
Current: Part 5: Producer Internals
Next: Part 6: Delivery Guarantees

Series Overview

  • Part 0: How to Use This Series
  • Parts 1-4: Fundamentals
  • Parts 5-7: Producers (Internals, Delivery Guarantees, Advanced Patterns)
  • Parts 8-11: Consumers
  • Parts 12-14: Operations
  • Parts 15-17: Kafka Streams
  • Parts 18-20: Patterns & Practices
  • Part 21: Cheatsheet & Decision Guide