Advanced Producer Patterns


At a Glance

Aspect         Details
Topic          Custom partitioners, transactional producer, interceptors, headers
Complexity     Advanced
Prerequisites  Parts 5-6 (Producer Internals, Delivery Guarantees)
Time           90 minutes
Spring Kafka   Transactional templates, custom partitioner beans

What You'll Learn

After completing this article, you will be able to:

  1. Implement custom partitioners for domain-specific distribution
  2. Configure transactional producers for exactly-once semantics
  3. Use ProducerInterceptors for cross-cutting concerns
  4. Leverage headers for metadata propagation
  5. Choose between fire-and-forget, async, and sync patterns

Production Story: The Lost Ordering

The Incident

Our inventory system tracks stock changes per warehouse. Each warehouse must process its events in strict order: stock cannot be sold before the event that received it has been applied. But we started seeing "negative inventory" alerts, meaning items were recorded as sold before they were received.

The Investigation

JAVA(18 lines)

The problem:

Expected partition assignment (by productId):
  P-123 → partition 7
  P-456 → partition 3
  P-789 → partition 7

But events need ordering by WAREHOUSE, not product!

WH-NYC events (different products):
  T1: P-123 RECEIVE → partition 7
  T2: P-123 SELL    → partition 7  ✓ Same partition
  T3: P-456 RECEIVE → partition 3  ✗ DIFFERENT partition!
  T4: P-456 SELL    → partition 3  ✗ DIFFERENT partition!

Problem: Warehouse WH-NYC has events spread across partitions
         No ordering guarantee across partitions!
         P-456 SELL might be processed before P-123 RECEIVE

The Root Cause

We were using productId as the record key, but the ordering requirement is per warehouseId:
┌─────────────────────────────────────────────────────────────────────┐
│                    WRONG: Key by Product                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Partition 3:  [P-456 RCV][P-456 SELL][P-789 SELL]                  │
│  Partition 7:  [P-123 RCV][P-123 SELL][P-789 RCV]                   │
│                                                                     │
│  Consumer for WH-NYC gets events from multiple partitions           │
│  No cross-partition ordering → events processed out of order        │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│                    RIGHT: Key by Warehouse                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Partition 3:  [WH-NYC: P-123 RCV][WH-NYC: P-456 RCV][WH-NYC: SELL] │
│  Partition 7:  [WH-LAX: P-789 RCV][WH-LAX: P-789 SELL]              │
│                                                                     │
│  All WH-NYC events in same partition → strict ordering              │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
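
To make the effect of the key choice concrete, here is a minimal, dependency-free sketch. It assumes a 12-partition topic and uses `String.hashCode()` as a stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key, so the partition numbers it prints will not match a real cluster. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: String.hashCode() stands in for Kafka's murmur2 hash of the
// serialized key, so the printed partition numbers are illustrative.
class KeyChoiceDemo {

    // Same key and same partition count always give the same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 12; // assumed topic size
        // Each row: warehouseId, productId, event type (all for one warehouse).
        List<String[]> events = Arrays.asList(
            new String[] {"WH-NYC", "P-123", "RECEIVE"},
            new String[] {"WH-NYC", "P-123", "SELL"},
            new String[] {"WH-NYC", "P-456", "RECEIVE"},
            new String[] {"WH-NYC", "P-456", "SELL"});

        for (String[] e : events) {
            int byProduct = partitionFor(e[1], numPartitions);   // key = productId
            int byWarehouse = partitionFor(e[0], numPartitions); // key = warehouseId
            System.out.printf("%s %-7s keyed by product -> %d, keyed by warehouse -> %d%n",
                e[1], e[2], byProduct, byWarehouse);
        }
    }
}
```

Keyed by warehouse, every WH-NYC event resolves to one partition regardless of product, which is the property the consumer needs for strict per-warehouse ordering.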

The Fix

JAVA(40 lines)

After the fix: Zero ordering violations, inventory always accurate.


Mental Model: Producer Patterns

┌─────────────────────────────────────────────────────────────────────────┐
│                      PRODUCER PATTERNS COMPARISON                       │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  1. FIRE AND FORGET                                                     │
│  ───────────────────                                                    │
│  ┌──────────┐    send()     ┌─────────┐                                 │
│  │ Producer │ ─────────────►│  Kafka  │                                 │
│  │          │  (no wait)    │         │                                 │
│  │ Continue │               │   ???   │  May or may not arrive          │
│  └──────────┘               └─────────┘                                 │
│                                                                         │
│  Latency: ~0ms (from producer perspective)                              │
│  Guarantee: None                                                        │
│  Use: Metrics, logs, non-critical events                                │
│                                                                         │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  2. ASYNC WITH CALLBACK                                                 │
│  ──────────────────────                                                 │
│  ┌──────────┐    send()     ┌─────────┐                                 │
│  │ Producer │ ─────────────►│  Kafka  │                                 │
│  │          │               │         │                                 │
│  │ Continue │               │ Process │                                 │
│  │ working  │    callback   │         │                                 │
│  │    ◄─────┼───────────────│         │                                 │
│  │  Handle  │  (later)      │         │                                 │
│  └──────────┘               └─────────┘                                 │
│                                                                         │
│  Latency: ~0ms send, callback invoked async                             │
│  Guarantee: Know if succeeded/failed                                    │
│  Use: Most production use cases                                         │
│                                                                         │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  3. SYNCHRONOUS                                                         │
│  ─────────────                                                          │
│  ┌──────────┐    send()     ┌─────────┐                                 │
│  │ Producer │ ─────────────►│  Kafka  │                                 │
│  │          │               │         │                                 │
│  │ BLOCKED  │               │ Process │                                 │
│  │    .     │    ack        │         │                                 │
│  │    .     │ ◄─────────────│         │                                 │
│  │ Continue │               │         │                                 │
│  └──────────┘               └─────────┘                                 │
│                                                                         │
│  Latency: Full round-trip (~5-50ms)                                     │
│  Guarantee: Know result before continuing                               │
│  Use: When must confirm before proceeding                               │
│                                                                         │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  4. TRANSACTIONAL                                                       │
│  ───────────────                                                        │
│  ┌──────────┐  beginTx()    ┌─────────┐                                 │
│  │ Producer │ ─────────────►│  Kafka  │                                 │
│  │          │  send()       │         │                                 │
│  │          │ ─────────────►│ Pending │                                 │
│  │          │  send()       │         │                                 │
│  │          │ ─────────────►│ Pending │                                 │
│  │          │  commit()     │         │                                 │
│  │          │ ─────────────►│ Commit! │  All or nothing                 │
│  └──────────┘               └─────────┘                                 │
│                                                                         │
│  Latency: Higher (transaction overhead)                                 │
│  Guarantee: Exactly-once, atomic multi-message                          │
│  Use: Financial, cross-topic consistency                                │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
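
The first three patterns differ only in what the caller does with the result of `send()`. The sketch below illustrates that with a hypothetical stand-in: `send()` here returns a `CompletableFuture` and never talks to a broker, whereas the real `KafkaProducer.send()` returns a `Future<RecordMetadata>` and optionally takes a `Callback`. Treat it as a model of the control flow, not of the client API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a producer; no broker is involved.
class SendPatternsDemo {

    // Simulates an asynchronous send; fails when the payload is empty.
    static CompletableFuture<Long> send(String payload) {
        return CompletableFuture.supplyAsync(() -> {
            if (payload.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
            return (long) payload.length(); // pretend this is the assigned offset
        });
    }

    // 1. Fire and forget: the returned future is ignored, so failures go unnoticed.
    static void fireAndForget(String payload) {
        send(payload);
    }

    // 2. Async with callback: return immediately, handle the outcome later.
    static CompletableFuture<String> asyncWithCallback(String payload) {
        return send(payload).handle((offset, error) ->
            error == null ? "ok@" + offset : "failed");
    }

    // 3. Synchronous: block until the outcome is known, bounded by a timeout.
    static long sync(String payload) {
        try {
            return send(payload).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for ack", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("send not confirmed", e);
        }
    }
}
```

The synchronous variant pays the full round trip on every call, which is why the callback form is usually the default choice.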

Deep Dive

1. Custom Partitioners

Default Partitioning Behavior

JAVA(21 lines)

Implementing Custom Partitioners

JAVA(90 lines)

Spring Kafka Custom Partitioner

JAVA(20 lines)

2. Transactional Producer

JAVA(41 lines)

Using Transactional Producer

JAVA(50 lines)

Transactional Producer Internals

TRANSACTION LIFECYCLE:

┌─────────────────────────────────────────────────────────────────────┐
│  1. INIT TRANSACTIONS                                               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Producer                      Transaction Coordinator              │
│  ┌──────────┐                 ┌─────────────────────┐               │
│  │   Init   │ ───────────────►│ Register PID+TxID   │               │
│  │          │                 │ Fence old producers │               │
│  │          │ ◄───────────────│ with same TxID      │               │
│  └──────────┘                 └─────────────────────┘               │
│                                                                     │
│  "Fencing" ensures only ONE producer with this transactional.id     │
│  Previous instance (if any) is prevented from completing txs        │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
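
Fencing can be pictured as an epoch counter per transactional.id. The following is a toy model under that assumption, not Kafka's coordinator code: each init bumps the epoch, and any request carrying an older epoch is rejected. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of fencing: one epoch per transactional.id, highest epoch wins.
class ToyFencingCoordinator {
    private final Map<String, Integer> currentEpoch = new HashMap<>();

    // Models initTransactions(): bump the epoch, which fences any earlier holder.
    synchronized int init(String transactionalId) {
        return currentEpoch.merge(transactionalId, 1, Integer::sum);
    }

    // A request is fenced if its epoch is older than the latest one registered.
    synchronized boolean isFenced(String transactionalId, int epoch) {
        Integer latest = currentEpoch.get(transactionalId);
        return latest == null || epoch < latest;
    }
}
```

If two live instances share one transactional.id, each restart or init of one fences the other, which is the failure mode behind a non-unique ID.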

┌─────────────────────────────────────────────────────────────────────┐
│  2. BEGIN TRANSACTION                                               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Producer                      Transaction Coordinator              │
│  ┌──────────┐                 ┌─────────────────────┐               │
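│  │  Begin   │ ───────────────►│ Create tx state:    │               │
│  │          │                 │   state=ONGOING     │               │
│  │          │                 │   partitions=[]     │               │
│  └──────────┘                 └─────────────────────┘               │
│                                                                     │
│  beginTransaction() is local to the producer; the coordinator       │
│  creates this state when the first partition is added (step 3).     │
│                                                                     │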
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│  3. SEND MESSAGES                                                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  For each target partition:                                         │
│                                                                     │
│  Producer          Coordinator           Partition Leader           │
│  ┌──────────┐     ┌───────────┐         ┌─────────────────┐         │
│  │AddPartTo │────►│ Track     │         │                 │         │
│  │Transaction│    │ partition │         │                 │         │
│  │          │     │ in tx     │         │                 │         │
│  │          │     └───────────┘         │                 │         │
│  │          │                           │                 │         │
│  │  Send    │ ─────────────────────────►│ Write with      │         │
│  │  Record  │                           │ tx marker       │         │
│  │          │                           │ (not visible)   │         │
│  └──────────┘                           └─────────────────┘         │
│                                                                     │
│  Records written; hidden from read_committed consumers until commit │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│  4. COMMIT/ABORT                                                    │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  COMMIT:                                                            │
│  Producer          Coordinator           All Partition Leaders      │
│  ┌──────────┐     ┌───────────┐         ┌─────────────────┐         │
│  │  Commit  │────►│ state=    │────────►│ Write COMMIT    │         │
│  │          │     │ PREPARING │         │ marker          │         │
│  │          │     │           │◄────────│                 │         │
│  │          │     │ state=    │         │ Records now     │         │
│  │          │     │ COMMITTED │         │ VISIBLE         │         │
│  └──────────┘     └───────────┘         └─────────────────┘         │
│                                                                     │
│  ABORT:                                                             │
│  ┌──────────┐     ┌───────────┐         ┌─────────────────┐         │
│  │  Abort   │────►│ state=    │────────►│ Write ABORT     │         │
│  │          │     │ ABORTING  │         │ marker          │         │
│  │          │     │           │◄────────│                 │         │
│  │          │     │ state=    │         │ Records         │         │
│  │          │     │ ABORTED   │         │ INVISIBLE       │         │
│  └──────────┘     └───────────┘         └─────────────────┘         │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
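
The lifecycle above boils down to one rule: records sent inside a transaction stay pending until a commit makes them visible, and an abort discards them. Here is a toy in-memory model of that rule. It is a deliberate simplification with hypothetical names: a real broker appends the records to the log immediately and relies on commit/abort markers plus consumer-side `isolation.level=read_committed` filtering.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: pending records become visible only on commit.
class ToyTransactionalLog {
    enum TxState { NONE, ONGOING, COMMITTED, ABORTED }

    private final List<String> visible = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private TxState state = TxState.NONE;

    void begin() {
        if (state == TxState.ONGOING) {
            throw new IllegalStateException("transaction already open");
        }
        state = TxState.ONGOING;
    }

    void send(String record) {
        requireOngoing();
        pending.add(record); // written, but not yet visible
    }

    void commit() {
        requireOngoing();
        visible.addAll(pending); // all pending records appear together
        pending.clear();
        state = TxState.COMMITTED;
    }

    void abort() {
        requireOngoing();
        pending.clear(); // none of the pending records ever appear
        state = TxState.ABORTED;
    }

    // What a read_committed consumer would be allowed to see.
    List<String> readCommitted() {
        return new ArrayList<>(visible);
    }

    TxState state() {
        return state;
    }

    private void requireOngoing() {
        if (state != TxState.ONGOING) {
            throw new IllegalStateException("no open transaction");
        }
    }
}
```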

3. Producer Interceptors

JAVA(122 lines)

4. Headers for Metadata Propagation

JAVA(103 lines)

5. Choosing the Right Pattern

JAVA(86 lines)

Common Mistakes

Mistake 1: Inconsistent Partitioning Strategy

JAVA(20 lines)

Mistake 2: Non-Unique Transactional ID

JAVA(14 lines)

Mistake 3: Interceptor Blocks the Producer

JAVA(20 lines)
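
One way to keep an interceptor from blocking is to hand events to a bounded queue and let a separate thread do the slow work. The helper below is a sketch under that assumption. `NonBlockingAuditSink` is a hypothetical class, not part of the Kafka API; an interceptor's `onSend()` or `onAcknowledgement()` would call `record()`, and a background thread would drain the queue with `poll()`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper an interceptor could delegate to.
class NonBlockingAuditSink {
    private final BlockingQueue<String> queue;
    private final AtomicLong dropped = new AtomicLong();

    NonBlockingAuditSink(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Safe to call on the send path: offer() returns immediately, even when full.
    boolean record(String event) {
        boolean accepted = queue.offer(event);
        if (!accepted) {
            dropped.incrementAndGet(); // lose audit data rather than stall sends
        }
        return accepted;
    }

    // Meant for a background thread that performs the slow I/O; null when empty.
    String poll() {
        return queue.poll();
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

Dropping events when the queue is full is a design choice: it trades completeness of the side channel for predictable producer latency.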

Mistake 4: Headers Too Large

JAVA(10 lines)

Mistake 5: Forgetting Transaction Timeout

JAVA(27 lines)

Debug This

Scenario: Transactional Producer Hangs

Symptoms:
  • executeInTransaction never returns
  • No errors in producer logs
  • CPU usage low
  • Consumer not seeing messages
Investigation:
JAVA(16 lines)
BASH(13 lines)
Common Causes:
  1. Transaction coordinator unavailable: __transaction_state topic not ready
  2. Producer fenced: Another producer with same transactional.id started
  3. Timeout: Transaction took longer than transaction.timeout.ms
  4. Broker issue: Transaction coordinator broker down
Resolution:
JAVA(18 lines)
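
Several of the causes above come down to producer configuration. As a starting point, the sketch below builds a `Properties` object with the settings most relevant to a hanging transaction. The property keys are standard producer configs; the values, the `inventory-tx-` prefix, and the bootstrap address are assumptions to adapt to your environment.

```java
import java.util.Properties;

// Sketch of producer settings relevant to stuck or fenced transactions.
class TransactionalProducerConfigSketch {

    static Properties build(String instanceId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        // Unique per producer instance, but stable across restarts of that instance.
        props.setProperty("transactional.id", "inventory-tx-" + instanceId);
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        // Coordinator aborts a transaction left open longer than this (60000 is the client default).
        props.setProperty("transaction.timeout.ms", "60000");
        // Upper bound on how long send(), initTransactions(), and commitTransaction() may block.
        props.setProperty("max.block.ms", "10000");
        return props;
    }
}
```

Lowering `max.block.ms` turns a silent hang into a `TimeoutException` that shows up in logs, which makes an unavailable coordinator much easier to spot.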

Exercises

Exercise 1: Implement Consistent Hash Partitioner

Create a partitioner that uses consistent hashing:

  1. Handles partition count changes gracefully
  2. Minimizes message redistribution when partitions added
  3. Supports weighted partitions
JAVA(11 lines)

Exercise 2: Build a Transactional Saga

Implement an order saga with compensating transactions:

  1. Create order (Kafka)
  2. Reserve inventory (Kafka)
  3. Process payment (Kafka)
  4. If any step fails, send compensating events

Exercise 3: Create a Tracing Interceptor

Build a complete distributed tracing interceptor:

  1. Create spans for each send
  2. Propagate trace context in headers
  3. Record success/failure status
  4. Integrate with Jaeger or Zipkin

Exercise 4: Header-based Message Router

Implement a producer that:

  1. Adds routing headers based on message content
  2. Supports priority routing
  3. Adds TTL headers for message expiration
  4. Consumers can filter by headers

Exercise 5: Multi-tenant Partitioner

Build a partitioner for multi-tenant SaaS:

  1. Each tenant gets dedicated partitions
  2. Fair distribution among tenants
  3. Prevents one tenant from affecting others
  4. Supports tenant priorities

Interview Questions

Q1: When would you implement a custom partitioner?

A: Custom partitioners are needed when the default key-hash partitioning doesn't match your ordering or data-locality requirements.
Use cases:
  1. Ordering by different attribute: Default uses key, but you need ordering by a different field (warehouse example)
  2. Priority partitioning: High-priority messages to dedicated partitions for faster processing
  3. Geographic locality: Route messages to partitions that will be consumed by regional processors
  4. Tenant isolation: Multi-tenant systems where each tenant gets specific partitions
  5. Load balancing by size: Large messages to partitions with more capacity
Implementation considerations:
  • Must be deterministic (same input → same partition)
  • Should distribute evenly to avoid hot partitions
  • Must handle null keys gracefully
  • Consider partition count changes
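
The considerations above can be sketched for the priority use case. The class below is a hypothetical, dependency-free selection routine that reserves partition 0 for high-priority records and spreads everything else over the remaining partitions. In a real producer this logic would live inside `Partitioner.partition(topic, key, keyBytes, value, valueBytes, cluster)`; `Arrays.hashCode` is used here as an assumption in place of Kafka's murmur2.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical selection logic: partition 0 is the high-priority lane.
class PriorityPartitionChooser {
    private final AtomicInteger nullKeyCounter = new AtomicInteger();

    int choose(byte[] keyBytes, boolean highPriority, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (numPartitions == 1 || highPriority) {
            return 0; // dedicated lane, or the only partition available
        }
        int shared = numPartitions - 1; // partitions 1..n-1
        if (keyBytes == null) {
            // Null keys carry no ordering requirement, so rotate across the shared lanes.
            return 1 + Math.floorMod(nullKeyCounter.getAndIncrement(), shared);
        }
        // Deterministic for keyed records: same key bytes, same partition.
        return 1 + Math.floorMod(Arrays.hashCode(keyBytes), shared);
    }
}
```

Note that the result still depends on `numPartitions`, so adding partitions remaps existing keys; that is the "partition count changes" caveat in practice.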

Q2: Explain the difference between idempotent and transactional producers.

A:
Idempotent Producer:
  • Prevents duplicates within single partition
  • Per-session guarantee (new PID on restart)
  • Uses sequence numbers to detect duplicates
  • No atomic multi-partition writes
  • Lower overhead
JAVA(2 lines)
Transactional Producer:
  • Atomic writes across multiple partitions
  • Survives producer restarts (via transactional.id)
  • Supports exactly-once with consumers (read_committed)
  • Higher overhead (two-phase commit)
JAVA(2 lines)
When to use each:
  • Idempotent: Single-topic/partition writes, most applications
  • Transactional: Multi-topic atomic writes, read-process-write patterns, financial systems
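
The sequence-number check behind the idempotent producer can be modeled in a few lines. This is a toy sketch with hypothetical names, assuming one partition and one sequence number per record; the real broker tracks sequences per producer ID and partition, and per batch rather than per record.

```java
import java.util.HashMap;
import java.util.Map;

// Toy broker-side check for a single partition.
class ToySequenceDeduplicator {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    // Last accepted sequence number per producer ID.
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    Result append(long producerId, int sequence) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return Result.DUPLICATE; // a retry of something already written
        }
        if (sequence != last + 1) {
            return Result.OUT_OF_ORDER; // a gap: an earlier record is missing
        }
        lastSequence.put(producerId, sequence);
        return Result.APPENDED;
    }
}
```

Because the state is keyed by producer ID, a restarted producer that receives a new PID starts from sequence 0 again, which is why idempotence alone is only a per-session guarantee.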

Q3: How do you ensure message ordering when using multiple producer instances?

A: Ordering is per-partition in Kafka. Multiple producers can maintain order:
Strategy 1: Partition-aware producers
JAVA(6 lines)
Strategy 2: Key-based ordering
JAVA(4 lines)
Strategy 3: Single producer for ordered streams
JAVA(2 lines)
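Key insight: Kafka guarantees ordering within a partition. Multiple producers sending to different partitions is fine. Multiple producers writing to the same partition is also fine as long as no ordering is required between records from different producers: each producer's records keep their relative order, but how the producers' records interleave depends on arrival order at the broker. If all events for a key must be strictly ordered, make sure a single producer is responsible for that key.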

Q4: What are the best practices for using Kafka headers?

A: Headers are metadata attached to messages without affecting the key/value:
Do use headers for:
  • Correlation IDs for tracing
  • Source service identification
  • Schema version information
  • Routing hints
  • Audit metadata (user ID, timestamp)
Don't use headers for:
  • Large payloads (counts toward message size)
  • Business-critical data (not all tools support headers)
  • Data that needs indexing (headers aren't searchable)
Best practices:
  1. Keep headers small (< 1KB total)
  2. Use consistent naming convention (X-Custom-Header)
  3. Document header contracts between services
  4. Handle missing headers gracefully
  5. Copy relevant headers when forwarding messages
JAVA(5 lines)
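
Two of the practices above, the size budget and selective copying when forwarding, are easy to enforce in a small helper. The sketch below is hypothetical and models headers as a `Map<String, byte[]>` for simplicity; real Kafka headers are a list of key/value pairs and may repeat a key. The 1024-byte limit mirrors the guideline above rather than any broker setting.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for header hygiene.
class HeaderBudget {
    static final int MAX_TOTAL_BYTES = 1024; // guideline, not a Kafka limit

    // Sum of UTF-8 key bytes and value bytes across all headers.
    static int totalBytes(Map<String, byte[]> headers) {
        int total = 0;
        for (Map.Entry<String, byte[]> e : headers.entrySet()) {
            total += e.getKey().getBytes(StandardCharsets.UTF_8).length;
            total += e.getValue() == null ? 0 : e.getValue().length;
        }
        return total;
    }

    static boolean withinBudget(Map<String, byte[]> headers) {
        return totalBytes(headers) < MAX_TOTAL_BYTES;
    }

    // Copy only the headers a downstream service is expected to need.
    static Map<String, byte[]> copyForForwarding(Map<String, byte[]> incoming,
                                                 Set<String> allowed) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> e : incoming.entrySet()) {
            if (allowed.contains(e.getKey())) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }
}
```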

Q5: How do you handle producer failures in a transactional context?

A: Transactional failures require careful handling:
ProducerFencedException:
JAVA(7 lines)
TransactionAbortedException:
JAVA(5 lines)
TimeoutException in transaction:
JAVA(5 lines)
General error handling pattern:
JAVA(12 lines)

Summary

Key Takeaways

  1. Custom partitioners give you control over message distribution for ordering and locality requirements
  2. Transactional producers provide exactly-once semantics for atomic multi-partition writes
  3. Interceptors enable cross-cutting concerns (tracing, metrics) without changing business code
  4. Headers are perfect for metadata that needs to travel with messages
  5. Choose the right pattern: fire-and-forget for metrics, async for most cases, sync when confirmation needed
  6. transactional.id must be unique per producer instance, and stable across restarts of that instance, so that only stale "zombie" producers get fenced
  7. Keep interceptors fast - they run in the producer's hot path
  8. Align partitioning strategy with your processing requirements, not just key selection

Quick Reference

Custom Partitioner Template

JAVA(10 lines)

Transactional Template

JAVA(12 lines)

Common Headers

JAVA(6 lines)

Series Navigation

Previous  Part 6: Delivery Guarantees
Current   Part 7: Advanced Producer Patterns
Next      Part 8: Consumer Internals

Series Overview

  • Part 0: How to Use This Series
  • Parts 1-4: Fundamentals
  • Parts 5-7: Producers (Internals, Delivery Guarantees, Advanced Patterns)
  • Parts 8-11: Consumers
  • Parts 12-14: Operations
  • Parts 15-17: Kafka Streams
  • Parts 18-20: Patterns & Practices
  • Part 21: Cheatsheet & Decision Guide