Advanced Producer Patterns
At a Glance
| Aspect | Details |
|---|---|
| Topic | Custom partitioners, transactional producer, interceptors, headers |
| Complexity | Advanced |
| Prerequisites | Parts 5-6 (Producer Internals, Delivery Guarantees) |
| Time | 90 minutes |
| Spring Kafka | Transactional templates, custom partitioner beans |
What You'll Learn
After completing this article, you will be able to:
- Implement custom partitioners for domain-specific distribution
- Configure transactional producers for exactly-once semantics
- Use ProducerInterceptors for cross-cutting concerns
- Leverage headers for metadata propagation
- Choose between fire-and-forget, async, and sync patterns
Production Story: The Lost Ordering
The Incident
Our inventory system tracks stock changes per warehouse. Each warehouse must process its events in strict order: you can't sell an item before it has been received. But we started seeing "negative inventory" alerts, meaning items were sold before they were received.
The Investigation
The problem:
Expected partition assignment (by productId):
  P-123 → partition 7
  P-456 → partition 3
  P-789 → partition 7

But events need ordering by WAREHOUSE, not by product!

WH-NYC events (different products):
  T1: P-123 RECEIVE → partition 7
  T2: P-123 SELL    → partition 7  ✓ Same partition
  T3: P-456 RECEIVE → partition 3  ✗ DIFFERENT partition!
  T4: P-456 SELL    → partition 3  ✗ DIFFERENT partition!

Problem: warehouse WH-NYC has its events spread across partitions.
There is no ordering guarantee across partitions, so P-456 SELL might be processed before P-123 RECEIVE.
The Root Cause
We used productId as the key, but we needed ordering by warehouseId:

WRONG: Key by Product
  Partition 3: [P-456 RCV][P-456 SELL]
  Partition 7: [P-123 RCV][P-123 SELL][P-789 RCV][P-789 SELL]
  The consumer for WH-NYC gets its events from multiple partitions.
  No cross-partition ordering → events are processed out of order.

RIGHT: Key by Warehouse
  Partition 3: [WH-NYC: P-123 RCV][WH-NYC: P-456 RCV][WH-NYC: SELL]
  Partition 7: [WH-LAX: P-789 RCV][WH-LAX: P-789 SELL]
  All WH-NYC events are in the same partition → strict ordering.
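The following pure-Java simulation makes the root cause concrete. It uses `String.hashCode()` modulo the partition count as a stand-in for Kafka's real partitioner (the Java client hashes the serialized key bytes with murmur2), so the partition numbers will not match a real cluster. Only the shape of the result matters here. `InventoryEvent` and `PartitionSpread` are hypothetical names.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical event type for the inventory story.
final class InventoryEvent {
    final String warehouseId;
    final String productId;
    final String type; // "RECEIVE" or "SELL"

    InventoryEvent(String warehouseId, String productId, String type) {
        this.warehouseId = warehouseId;
        this.productId = productId;
        this.type = type;
    }
}

final class PartitionSpread {
    private PartitionSpread() {}

    // Stand-in partitioner: NOT Kafka's murmur2, just a deterministic hash.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns the distinct partitions that a group of events lands in for a given key choice.
    static Set<Integer> partitionsUsed(List<InventoryEvent> events,
                                       Function<InventoryEvent, String> keyOf,
                                       int numPartitions) {
        Set<Integer> used = new TreeSet<>();
        for (InventoryEvent e : events) {
            used.add(partitionFor(keyOf.apply(e), numPartitions));
        }
        return used;
    }
}
```

With 12 partitions, the four WH-NYC events (P-123 and P-456, RECEIVE and SELL) land in two partitions when keyed by `productId` and in exactly one when keyed by `warehouseId`.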
The Fix
After the fix: Zero ordering violations, inventory always accurate.
Mental Model: Producer Patterns
PRODUCER PATTERNS COMPARISON

1. FIRE AND FORGET
   Producer ──send() (no wait)──► Kafka, then continue immediately. The record may or may not arrive.
   Latency: ~0 ms (from the producer's perspective)
   Guarantee: none
   Use: metrics, logs, non-critical events

2. ASYNC WITH CALLBACK
   Producer ──send()──► Kafka, then keep working. Kafka's result comes back later through a callback.
   Latency: ~0 ms for send(); the callback is invoked asynchronously
   Guarantee: you know whether each send succeeded or failed
   Use: most production use cases

3. SYNCHRONOUS
   Producer ──send()──► Kafka, then stay BLOCKED until the ack arrives, then continue.
   Latency: full round-trip (~5-50 ms)
   Guarantee: the result is known before continuing
   Use: when you must confirm before proceeding

4. TRANSACTIONAL
   Producer ──beginTx()──► Kafka, then send() ──► Pending, send() ──► Pending, commit() ──► Commit!
   All or nothing.
   Latency: higher (transaction overhead)
   Guarantee: exactly-once, atomic multi-message writes
   Use: financial systems, cross-topic consistency
Deep Dive
1. Custom Partitioners
Default Partitioning Behavior
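For keyed records, the Java client's default partitioning hashes the serialized key bytes with murmur2, clears the sign bit, and takes the result modulo the partition count. The sketch below re-implements that computation in plain Java so it runs without the client library. Treat it as an illustration: real code should rely on the client (for example `org.apache.kafka.common.utils.Utils.murmur2` and `Utils.toPositive`), not on this copy. Null keys are not handled here. Recent clients spread null-key records with the sticky partitioner instead of hashing them.

```java
import java.nio.charset.StandardCharsets;

final class DefaultKeyPartitioning {
    private DefaultKeyPartitioning() {}

    // Plain-Java copy of the murmur2 variant used by the Kafka Java client.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clear the sign bit instead of Math.abs, which stays negative for Integer.MIN_VALUE.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // what StringSerializer produces by default
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }
}
```

The important properties are determinism (the same key bytes always give the same partition) and dependence on the partition count: adding partitions changes where existing keys go.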
Implementing Custom Partitioners
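In the Java client, a custom partitioner implements `org.apache.kafka.clients.producer.Partitioner`, whose `partition(topic, key, keyBytes, value, valueBytes, cluster)` method returns the target partition. Because the decision logic is the part worth testing, the sketch below isolates it in a plain function that needs no broker. It assumes a hypothetical priority scheme: the first `reserved` partitions are dedicated to high-priority messages, and the remaining partitions carry normal traffic. Both groups hash the key, so per-key ordering holds within a priority class as long as a key's priority never changes.

```java
import java.util.Objects;

final class PriorityPartitionLogic {
    private PriorityPartitionLogic() {}

    /**
     * Hypothetical scheme: partitions [0, reserved) are for high priority,
     * partitions [reserved, numPartitions) for everything else.
     */
    static int choosePartition(String key, boolean highPriority, int numPartitions, int reserved) {
        // A real partitioner must decide what to do with null keys; this sketch rejects them.
        Objects.requireNonNull(key, "this sketch requires a key");
        if (reserved <= 0 || reserved >= numPartitions) {
            throw new IllegalArgumentException("need 0 < reserved < numPartitions");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        int hash = key.hashCode();
        if (highPriority) {
            return Math.floorMod(hash, reserved);
        }
        return reserved + Math.floorMod(hash, numPartitions - reserved);
    }
}
```

Inside `partition(...)`, the number of partitions would come from `cluster.partitionsForTopic(topic).size()`, and the priority flag from the value or a header.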
Spring Kafka Custom Partitioner
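One common way to register a custom partitioner with Spring Kafka is through the producer configuration. The config map passed to `DefaultKafkaProducerFactory` (or, with Spring Boot, a `spring.kafka.producer.properties.partitioner.class` entry) names the class, and the Kafka client instantiates it. The sketch below builds that map in plain Java. `com.example.kafka.WarehousePartitioner` is a hypothetical class name.

```java
import java.util.HashMap;
import java.util.Map;

final class PartitionerProducerConfig {
    private PartitionerProducerConfig() {}

    // Hypothetical partitioner class name; replace with your implementation.
    static final String PARTITIONER = "com.example.kafka.WarehousePartitioner";

    // The map you would pass to Spring's DefaultKafkaProducerFactory constructor.
    static Map<String, Object> producerConfigs(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The client creates this class itself (no-arg constructor, then configure()).
        props.put("partitioner.class", PARTITIONER);
        return props;
    }
}
```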
2. Transactional Producer
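The settings a transactional producer needs can be expressed as plain producer properties. The sketch below uses real Java client config keys (`transactional.id`, `enable.idempotence`, `acks`, `transaction.timeout.ms`). The helper names, the `orders-tx` prefix and the idea of deriving the ID from an instance identifier are assumptions for illustration. The one rule it encodes is that each live producer instance gets its own stable `transactional.id`.

```java
import java.util.Properties;

final class TxProducerConfig {
    private TxProducerConfig() {}

    // Hypothetical helper: one stable transactional.id per producer instance.
    static String transactionalId(String prefix, String instanceId) {
        if (instanceId == null || instanceId.isEmpty()) {
            throw new IllegalArgumentException("instanceId must identify this instance");
        }
        return prefix + "-" + instanceId;
    }

    static Properties build(String bootstrapServers, String instanceId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Transactions require idempotence and acks=all.
        props.put("enable.idempotence", "true");
        props.put("acks", "all");
        props.put("transactional.id", transactionalId("orders-tx", instanceId));
        // The coordinator aborts transactions left open longer than this (client default: 60000 ms).
        props.put("transaction.timeout.ms", "60000");
        return props;
    }
}
```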
Using Transactional Producer
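Whichever wrapper you use (Spring's `KafkaTemplate.executeInTransaction` or hand-written code around `KafkaProducer`), the control flow is the same: begin, send, commit, and abort if anything throws. The sketch below captures that flow against a hypothetical `TxProducer` interface, so the behaviour can be checked without a broker. With the real client, the corresponding calls are `beginTransaction()`, `send(...)`, `commitTransaction()` and `abortTransaction()`, preceded by a one-time `initTransactions()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal interface mirroring the transactional calls.
interface TxProducer {
    void beginTransaction();
    void send(String topic, String key, String value);
    void commitTransaction();
    void abortTransaction();
}

final class Transactions {
    private Transactions() {}

    // Runs the body atomically: commit on success, abort and rethrow on failure.
    static void executeInTransaction(TxProducer producer, Consumer<TxProducer> body) {
        producer.beginTransaction();
        try {
            body.accept(producer);
            producer.commitTransaction();
        } catch (RuntimeException e) {
            producer.abortTransaction();
            throw e;
        }
    }
}

// Test double that records the calls it receives.
final class RecordingTxProducer implements TxProducer {
    final List<String> calls = new ArrayList<>();
    public void beginTransaction() { calls.add("begin"); }
    public void send(String topic, String key, String value) { calls.add("send:" + topic); }
    public void commitTransaction() { calls.add("commit"); }
    public void abortTransaction() { calls.add("abort"); }
}
```

With the real client, some errors (such as `ProducerFencedException`) mean the producer must be closed rather than aborted. The simple catch-all above does not make that distinction.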
Transactional Producer Internals
TRANSACTION LIFECYCLE

1. INIT TRANSACTIONS
   Producer ──InitProducerId──► Transaction Coordinator
   The coordinator registers the producer ID (PID) and epoch for the transactional.id and fences older producers that use the same transactional.id.
   "Fencing" ensures only ONE producer is active per transactional.id: a previous instance (if any) is prevented from completing transactions.

2. BEGIN TRANSACTION
   beginTransaction() only updates the producer's local state; no request is sent yet.
   The coordinator creates the transaction state (state=ONGOING, partitions=[]) when the first partition is added.

3. SEND MESSAGES
   For each new target partition:
   Producer ──AddPartitionsToTxn──► Coordinator, which tracks the partition as part of the transaction
   Producer ──send record──► Partition Leader, which writes it as part of a transactional batch
   Records are written but NOT visible to read_committed consumers until commit.

4. COMMIT/ABORT
   COMMIT: Producer ──commit──► Coordinator (state=PREPARE_COMMIT) ──► all partition leaders write a COMMIT marker ──► Coordinator (state=COMPLETE_COMMIT)
   The records are now VISIBLE to read_committed consumers.
   ABORT: Producer ──abort──► Coordinator (state=PREPARE_ABORT) ──► all partition leaders write an ABORT marker ──► Coordinator (state=COMPLETE_ABORT)
   read_committed consumers skip the aborted records (INVISIBLE).
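The coordinator-side states in this lifecycle can be written down as a small state machine. This is a simplified model for reasoning, not the broker's implementation. The enum constants mirror the coordinator's state names (`Empty`, `Ongoing`, `PrepareCommit`, `PrepareAbort`, `CompleteCommit`, `CompleteAbort`), and fencing and expiry states are left out.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Simplified model of coordinator-side transaction states (no fencing/expiry).
enum TxState { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

final class TxStateMachine {
    private static final Map<TxState, Set<TxState>> NEXT = new EnumMap<>(TxState.class);
    static {
        // The first AddPartitionsToTxn moves a transactional.id into ONGOING.
        NEXT.put(TxState.EMPTY, EnumSet.of(TxState.ONGOING));
        NEXT.put(TxState.ONGOING, EnumSet.of(TxState.PREPARE_COMMIT, TxState.PREPARE_ABORT));
        // Markers are written to every partition in the transaction, then the state completes.
        NEXT.put(TxState.PREPARE_COMMIT, EnumSet.of(TxState.COMPLETE_COMMIT));
        NEXT.put(TxState.PREPARE_ABORT, EnumSet.of(TxState.COMPLETE_ABORT));
        // A finished transaction lets the producer start the next one.
        NEXT.put(TxState.COMPLETE_COMMIT, EnumSet.of(TxState.ONGOING));
        NEXT.put(TxState.COMPLETE_ABORT, EnumSet.of(TxState.ONGOING));
    }

    private TxState state = TxState.EMPTY;

    TxState state() { return state; }

    void transitionTo(TxState target) {
        if (!NEXT.get(state).contains(target)) {
            throw new IllegalStateException(state + " -> " + target + " is not allowed");
        }
        state = target;
    }
}
```

The key property this model shows: once PREPARE_COMMIT is recorded, the only way forward is COMPLETE_COMMIT. A decided transaction cannot flip to an abort.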
3. Producer Interceptors
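The client's `ProducerInterceptor<K, V>` interface has two main hooks:

- `onSend(ProducerRecord)` may return a modified record before serialization.
- `onAcknowledgement(RecordMetadata, Exception)` runs when the send completes or fails. It generally runs on the producer's background I/O thread, so it must be cheap.

Interceptors are registered with the `interceptor.classes` setting and applied in the listed order. The sketch below mimics those hooks with a hypothetical, dependency-free `SendHooks` interface and a metrics interceptor that only bumps counters.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for ProducerInterceptor's two hooks, using String values.
interface SendHooks {
    String onSend(String value);                           // may return a modified value
    void onAcknowledgement(String value, Exception error); // error == null on success
}

final class CountingInterceptor implements SendHooks {
    final LongAdder sent = new LongAdder();
    final LongAdder acked = new LongAdder();
    final LongAdder failed = new LongAdder();

    @Override
    public String onSend(String value) {
        sent.increment(); // cheap, lock-free counter
        return value;     // pass the record through unchanged
    }

    @Override
    public void onAcknowledgement(String value, Exception error) {
        if (error == null) {
            acked.increment();
        } else {
            failed.increment();
        }
    }
}

// Applies interceptors in configured order, feeding each one the previous result.
final class InterceptorChain {
    private final List<SendHooks> hooks;

    InterceptorChain(List<SendHooks> hooks) { this.hooks = hooks; }

    String onSend(String value) {
        String current = value;
        for (SendHooks h : hooks) {
            try {
                current = h.onSend(current);
            } catch (RuntimeException e) {
                // Like the Java client, a failing interceptor is skipped rather than failing the send.
            }
        }
        return current;
    }

    void onAcknowledgement(String value, Exception error) {
        for (SendHooks h : hooks) {
            try {
                h.onAcknowledgement(value, error);
            } catch (RuntimeException e) {
                // Swallowed for the same reason as above.
            }
        }
    }
}
```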
4. Headers for Metadata Propagation
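Kafka headers are an ordered list of key/value pairs: the key is a `String`, the value is a `byte[]`, and keys may repeat. The sketch below models that shape in plain Java and applies a "copy relevant headers when forwarding" rule: a downstream service keeps tracing headers from the incoming record and stamps its own source. The header names (`X-Correlation-Id`, `X-Trace-Id`, `X-Source-Service`) and the `HeaderPropagation` helper are assumptions, not a standard.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class HeaderPropagation {
    private HeaderPropagation() {}

    // Hypothetical contract: headers that must survive every hop.
    static final Set<String> PROPAGATED = Set.of("X-Correlation-Id", "X-Trace-Id");

    static Map.Entry<String, byte[]> header(String key, String value) {
        return new SimpleImmutableEntry<>(key, value.getBytes(StandardCharsets.UTF_8));
    }

    // Builds headers for an outgoing record from the incoming record's headers.
    static List<Map.Entry<String, byte[]>> forward(List<Map.Entry<String, byte[]>> incoming,
                                                   String thisService) {
        List<Map.Entry<String, byte[]>> out = new ArrayList<>();
        for (Map.Entry<String, byte[]> h : incoming) {
            if (PROPAGATED.contains(h.getKey())) {
                out.add(h); // the byte[] is shared; copy it if either side may mutate it
            }
        }
        out.add(header("X-Source-Service", thisService));
        return out;
    }

    // Mirrors the idea of Headers.lastHeader(key): the last value wins.
    static String lastValue(List<Map.Entry<String, byte[]>> headers, String key) {
        String found = null;
        for (Map.Entry<String, byte[]> h : headers) {
            if (h.getKey().equals(key)) {
                found = new String(h.getValue(), StandardCharsets.UTF_8);
            }
        }
        return found; // null when the header is missing; callers must handle that
    }
}
```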
5. Choosing the Right Pattern
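`KafkaProducer.send` returns a `Future<RecordMetadata>` and optionally takes a `Callback`. The three non-transactional patterns differ only in what you do with those:

- Fire-and-forget ignores both.
- Async handles the outcome in the callback.
- Sync blocks on the future.

The sketch below reproduces the three styles with a `CompletableFuture`-based fake sender, so the difference in control flow is visible without a broker. `FakeSender` and `SendPatterns` are hypothetical. With the real client, callbacks run on the producer's I/O thread, so they should stay short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sender: completes each send asynchronously with an offset, or fails.
final class FakeSender {
    private final AtomicInteger nextOffset = new AtomicInteger();
    private final boolean fail;

    FakeSender(boolean fail) { this.fail = fail; }

    CompletableFuture<Integer> send(String value) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("broker unavailable");
            }
            return nextOffset.getAndIncrement();
        });
    }
}

final class SendPatterns {
    private SendPatterns() {}

    // 1. Fire and forget: the result is ignored, so failures go unnoticed.
    static void fireAndForget(FakeSender sender, String value) {
        sender.send(value);
    }

    // 2. Async with callback: keep working and handle the outcome when it arrives.
    static CompletableFuture<Void> sendAsync(FakeSender sender, String value, AtomicInteger failures) {
        return sender.send(value).handle((offset, error) -> {
            if (error != null) {
                failures.incrementAndGet(); // log, retry, or dead-letter here
            }
            return null;
        });
    }

    // 3. Synchronous: block until acknowledged, bounded by a timeout.
    static int sendSync(FakeSender sender, String value)
            throws InterruptedException, ExecutionException, TimeoutException {
        return sender.send(value).get(5, TimeUnit.SECONDS);
    }
}
```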
Common Mistakes
Mistake 1: Inconsistent Partitioning Strategy
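This mistake comes down to two producers writing the same topic with different partitioning logic. The same key then lands in different partitions, and per-key ordering silently breaks. The pure-Java check below makes that visible. Both partition functions are hypothetical stand-ins: one hashes the key with `String.hashCode()` (not Kafka's murmur2), and the other uses key length, as a second team's partitioner might.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntBiFunction;

final class PartitionConsistency {
    private PartitionConsistency() {}

    // Stand-in for producer A's partitioner (not Kafka's murmur2).
    static int byHashCode(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Stand-in for producer B's partitioner, written by another team.
    static int byLength(String key, int numPartitions) {
        return key.length() % numPartitions;
    }

    // Lists keys that the two partitioners would send to different partitions.
    static List<String> disagreements(List<String> keys, int numPartitions,
                                      ToIntBiFunction<String, Integer> a,
                                      ToIntBiFunction<String, Integer> b) {
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            if (a.applyAsInt(key, numPartitions) != b.applyAsInt(key, numPartitions)) {
                out.add(key);
            }
        }
        return out;
    }
}
```

A check like this could run in a shared test suite to confirm that every producer of a topic uses the same partitioning function.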
Mistake 2: Non-Unique Transactional ID
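Fencing works through a producer epoch. Each `initTransactions()` call for a given `transactional.id` bumps the epoch the coordinator stores, and requests carrying an older epoch are rejected (the old producer sees `ProducerFencedException`). The toy coordinator below models only that rule, to show why two live instances sharing one ID keep knocking each other out. It is a simplification, not broker code, and `ToyCoordinator` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of epoch-based fencing; not the broker implementation.
final class ToyCoordinator {
    private final Map<String, Integer> epochs = new HashMap<>();

    // Models initTransactions(): the first call stores epoch 0, each later call bumps it.
    int initTransactions(String transactionalId) {
        return epochs.merge(transactionalId, 0, (old, ignored) -> old + 1);
    }

    // A write is accepted only from the newest epoch for the ID.
    boolean accept(String transactionalId, int producerEpoch) {
        Integer current = epochs.get(transactionalId);
        return current != null && current == producerEpoch;
    }
}
```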
Mistake 3: Interceptor Blocks the Producer
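If an interceptor has to ship data somewhere slow (an audit service or a database, for example), a common fix is to hand the work off instead of doing it inline. The sketch below uses a bounded queue and `offer`, which never blocks. When the queue is full, the event is counted as dropped rather than stalling `send()` or the producer's I/O thread. The class name and drop policy are assumptions. A real version would drain the queue from its own background thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical audit hook that never blocks the caller.
final class NonBlockingAuditHook {
    private final BlockingQueue<String> pending;
    final LongAdder dropped = new LongAdder();

    NonBlockingAuditHook(int capacity) {
        this.pending = new ArrayBlockingQueue<>(capacity);
    }

    // Called from the send path: constant time, no I/O, no waiting.
    void onSend(String recordSummary) {
        if (!pending.offer(recordSummary)) {
            dropped.increment(); // shed load instead of stalling the producer
        }
    }

    // Called by a separate worker thread; returns null when nothing is queued.
    String pollForShipping() {
        return pending.poll();
    }
}
```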
Mistake 4: Headers Too Large
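Header keys and values count toward the record's size, and therefore toward limits such as the producer's `max.request.size` and the broker's `message.max.bytes`. A cheap guard is to estimate header bytes before sending. The sketch below counts UTF-8 key bytes plus value bytes against a 1 KB budget (the figure suggested in the interview answer on headers). It ignores the small per-header framing overhead, so treat the number as an approximation. A `Map` also cannot represent repeated header keys, which real headers allow.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

final class HeaderBudget {
    private HeaderBudget() {}

    static final int DEFAULT_BUDGET_BYTES = 1024;

    // Approximate size: key bytes (UTF-8) plus value bytes; per-header framing is ignored.
    static int approximateSize(Map<String, byte[]> headers) {
        int total = 0;
        for (Map.Entry<String, byte[]> h : headers.entrySet()) {
            total += h.getKey().getBytes(StandardCharsets.UTF_8).length;
            total += h.getValue() == null ? 0 : h.getValue().length;
        }
        return total;
    }

    static void check(Map<String, byte[]> headers, int budgetBytes) {
        int size = approximateSize(headers);
        if (size > budgetBytes) {
            throw new IllegalArgumentException("headers use ~" + size + " bytes, budget is "
                + budgetBytes + "; move large data into the payload or an external store");
        }
    }
}
```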
Mistake 5: Forgetting Transaction Timeout
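Two settings interact here: the producer's `transaction.timeout.ms` (client default 60000 ms) and the broker's `transaction.max.timeout.ms` (default 900000 ms, i.e. 15 minutes). They fail in different ways:

- A producer that requests more than the broker maximum fails at `initTransactions()`.
- A transaction that stays open longer than its timeout is aborted by the coordinator.

The helper below is a hypothetical pre-flight check that encodes both rules against an expected worst-case processing time.

```java
final class TxTimeoutCheck {
    private TxTimeoutCheck() {}

    static final long CLIENT_DEFAULT_TX_TIMEOUT_MS = 60_000L;      // transaction.timeout.ms
    static final long BROKER_DEFAULT_MAX_TX_TIMEOUT_MS = 900_000L; // transaction.max.timeout.ms

    /**
     * Hypothetical pre-flight check.
     * @param txTimeoutMs     producer transaction.timeout.ms
     * @param brokerMaxMs     broker transaction.max.timeout.ms
     * @param worstCaseWorkMs longest expected time between beginTransaction() and commit
     */
    static void validate(long txTimeoutMs, long brokerMaxMs, long worstCaseWorkMs) {
        if (txTimeoutMs > brokerMaxMs) {
            throw new IllegalArgumentException("transaction.timeout.ms " + txTimeoutMs
                + " exceeds broker transaction.max.timeout.ms " + brokerMaxMs);
        }
        if (worstCaseWorkMs >= txTimeoutMs) {
            throw new IllegalArgumentException("work can take " + worstCaseWorkMs
                + " ms but the coordinator aborts after " + txTimeoutMs + " ms");
        }
    }
}
```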
Debug This
Scenario: Transactional Producer Hangs
Symptoms:
- executeInTransaction never returns
- No errors in producer logs
- CPU usage low
- Consumer not seeing messages
Investigation:
Common Causes:
- Transaction coordinator unavailable: the __transaction_state topic is not ready
- Producer fenced: another producer with the same transactional.id started
- Timeout: the transaction took longer than transaction.timeout.ms
- Broker issue: the transaction coordinator's broker is down
Resolution:
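A call that "never returns" is much easier to diagnose once it is bounded. In the Java client, `max.block.ms` limits how long calls such as `initTransactions()` and `commitTransaction()` block before throwing a `TimeoutException`. For application-level steps, the same idea can be sketched with a plain `ExecutorService`: run the step, wait with a deadline, and surface a timeout instead of hanging. `Bounded.call` is a hypothetical helper.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class Bounded {
    private Bounded() {}

    // Runs a possibly-hanging step with a deadline and cancels it on timeout.
    static <T> T call(Callable<T> step, long timeoutMs) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(step);
            try {
                return future.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the stuck step
                throw new TimeoutException("step did not finish within " + timeoutMs + " ms");
            } catch (ExecutionException e) {
                // Unwrap so callers see the step's own exception.
                throw e.getCause() instanceof Exception ? (Exception) e.getCause() : e;
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Once a timeout surfaces, the causes listed above can be checked one at a time: coordinator availability, fencing by another instance, and the configured timeouts.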
Exercises
Exercise 1: Implement Consistent Hash Partitioner
Create a partitioner that uses consistent hashing:
- Handles partition count changes gracefully
- Minimizes message redistribution when partitions added
- Supports weighted partitions
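One possible starting point for this exercise is a hash ring with virtual nodes. Each partition is placed on the ring several times, and a key goes to the first partition clockwise from its hash. Adding a partition only claims the ring segments its new points cover, so most keys keep their partition. The sketch assumes a `TreeMap` ring and 32-bit FNV-1a as the hash, chosen for brevity. Weighting is handled by giving heavier partitions more virtual nodes. Kafka itself never moves existing records when partitions are added: the ring only limits how many keys change partition for new writes.

```java
import java.nio.charset.StandardCharsets;
import java.util.SortedMap;
import java.util.TreeMap;

final class ConsistentHashRing {
    private final TreeMap<Integer, Integer> ring = new TreeMap<>(); // ring point -> partition
    private final int virtualNodes;

    ConsistentHashRing(int virtualNodes) { this.virtualNodes = virtualNodes; }

    // FNV-1a (32-bit): small and deterministic; any well-mixed hash would do.
    static int fnv1a(String s) {
        int hash = 0x811C9DC5;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff);
            hash *= 0x01000193;
        }
        return hash;
    }

    // A weight above 1 gives the partition proportionally more of the ring.
    void addPartition(int partition, int weight) {
        for (int v = 0; v < virtualNodes * weight; v++) {
            ring.put(fnv1a("partition-" + partition + "#" + v), partition);
        }
    }

    int partitionFor(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no partitions on the ring");
        }
        SortedMap<Integer, Integer> tail = ring.tailMap(fnv1a(key));
        // Wrap around to the first point when the key hashes past the last one.
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }
}
```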
Exercise 2: Build a Transactional Saga
Implement an order saga with compensating transactions:
- Create order (Kafka)
- Reserve inventory (Kafka)
- Process payment (Kafka)
- If any step fails, send compensating events
Exercise 3: Create a Tracing Interceptor
Build a complete distributed tracing interceptor:
- Create spans for each send
- Propagate trace context in headers
- Record success/failure status
- Integrate with Jaeger or Zipkin
Exercise 4: Header-based Message Router
Implement a producer that:
- Adds routing headers based on message content
- Supports priority routing
- Adds TTL headers for message expiration
- Consumers can filter by headers
Exercise 5: Multi-tenant Partitioner
Build a partitioner for multi-tenant SaaS:
- Each tenant gets dedicated partitions
- Fair distribution among tenants
- Prevents one tenant from affecting others
- Supports tenant priorities
Interview Questions
Q1: When would you implement a custom partitioner?
A: Custom partitioners are needed when default hash-based partitioning doesn't match your data locality requirements:
Use cases:
- Ordering by a different attribute: the default uses the key, but you need ordering by another field (the warehouse example)
- Priority partitioning: high-priority messages go to dedicated partitions for faster processing
- Geographic locality: route messages to partitions that are consumed by regional processors
- Tenant isolation: in multi-tenant systems, each tenant gets specific partitions
- Load balancing by size: large messages go to partitions with more capacity
Implementation considerations:
- Must be deterministic (same input → same partition)
- Should distribute evenly to avoid hot partitions
- Must handle null keys gracefully
- Consider partition count changes
Q2: Explain the difference between idempotent and transactional producers.
A:
Idempotent Producer:
- Prevents duplicates within a single partition
- Per-session guarantee (new PID on restart)
- Uses sequence numbers to detect duplicates
- No atomic multi-partition writes
- Lower overhead
Transactional Producer:
- Atomic writes across multiple partitions
- Survives producer restarts (via transactional.id)
- Supports exactly-once with consumers (read_committed)
- Higher overhead (two-phase commit)
When to use each:
- Idempotent: Single-topic/partition writes, most applications
- Transactional: Multi-topic atomic writes, read-process-write patterns, financial systems
Q3: How do you ensure message ordering when using multiple producer instances?
A: Ordering is per-partition in Kafka. Multiple producers can maintain order:
Strategy 1: Partition-aware producers
Strategy 2: Key-based ordering
Strategy 3: Single producer for ordered streams
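Key insight: Kafka guarantees ordering only within a partition. Multiple producers sending to different partitions is fine. Multiple producers writing to the same partition is allowed, but there is no ordering guarantee between their records. Each producer's own records for a key keep their order, because a key always maps to the same partition. Records from different producers, however, interleave in whatever order the broker receives them. If a key's events must stay in order, make sure a single producer writes that key.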
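When several producer instances are running and each key's records must stay in order, a simple way to guarantee it is to give every partition exactly one writer. The sketch below assigns partitions to instances round-robin (`partition % numInstances`) and routes each key to the instance that owns its partition, so all records for a key come from one producer. The hash is a `String.hashCode()` stand-in rather than Kafka's murmur2. How a record is forwarded to its owning instance is left out. Both are assumptions of the sketch.

```java
final class KeyOwnership {
    private KeyOwnership() {}

    // Stand-in partition function (not Kafka's murmur2).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Each partition has exactly one owning producer instance.
    static int ownerOfPartition(int partition, int numInstances) {
        return partition % numInstances;
    }

    // The only instance allowed to produce this key.
    static int ownerOfKey(String key, int numPartitions, int numInstances) {
        return ownerOfPartition(partitionFor(key, numPartitions), numInstances);
    }
}
```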
Q4: What are the best practices for using Kafka headers?
A: Headers are metadata attached to messages without affecting the key/value:
Do use headers for:
- Correlation IDs for tracing
- Source service identification
- Schema version information
- Routing hints
- Audit metadata (user ID, timestamp)
Don't use headers for:
- Large payloads (counts toward message size)
- Business-critical data (not all tools support headers)
- Data that needs indexing (headers aren't searchable)
Best practices:
- Keep headers small (< 1KB total)
- Use consistent naming convention (X-Custom-Header)
- Document header contracts between services
- Handle missing headers gracefully
- Copy relevant headers when forwarding messages
Q5: How do you handle producer failures in a transactional context?
A: Transactional failures require careful handling:
ProducerFencedException:
TransactionAbortedException:
TimeoutException in transaction:
General error handling pattern:
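The general pattern separates two kinds of error:

- Fatal errors, after which the producer must be closed. The `KafkaProducer` Javadoc lists `ProducerFencedException`, `OutOfOrderSequenceException` and `AuthorizationException` in this group.
- Other errors, where aborting the transaction and retrying is safe.

The sketch below encodes that split with two hypothetical exception types so it runs without the client library. In real code, the `catch` clauses would name the Kafka exceptions instead, and `attempt` would contain `beginTransaction()`, the sends and `commitTransaction()`.

```java
// Hypothetical stand-ins for the client's fatal vs. abortable transaction errors.
class FatalTxException extends RuntimeException {
    FatalTxException(String msg) { super(msg); }
}

class AbortableTxException extends RuntimeException {
    AbortableTxException(String msg) { super(msg); }
}

final class TxRetry {
    private TxRetry() {}

    /**
     * Runs a transaction attempt up to maxAttempts times and returns the attempts used.
     * attempt returns normally only after commitTransaction() succeeds.
     */
    static int run(Runnable attempt, Runnable abort, Runnable closeProducer, int maxAttempts) {
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                attempt.run();
                return i;
            } catch (FatalTxException e) {
                closeProducer.run(); // this producer instance cannot continue
                throw e;
            } catch (AbortableTxException e) {
                abort.run(); // roll back this attempt, then retry
            }
        }
        throw new IllegalStateException("transaction failed after " + maxAttempts + " attempts");
    }
}
```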
Summary
Key Takeaways
- Custom partitioners give you control over message distribution for ordering and locality requirements
- Transactional producers provide exactly-once semantics for atomic multi-partition writes
- Interceptors enable cross-cutting concerns (tracing, metrics) without changing business code
- Headers are perfect for metadata that needs to travel with messages
- Choose the right pattern: fire-and-forget for metrics, async for most cases, sync when confirmation is needed
- Each producer instance needs a unique transactional.id to avoid fencing
- Keep interceptors fast: they run in the producer's hot path
- Align your partitioning strategy with your processing requirements, not just with key selection
Quick Reference
Custom Partitioner Template
Transactional Template
Common Headers
Series Navigation
| Previous | Current | Next |
|---|---|---|
| Part 6: Delivery Guarantees | Part 7: Advanced Producer Patterns | Part 8: Consumer Internals |
Series Overview
- Part 0: How to Use This Series
- Parts 1-4: Fundamentals
- Parts 5-7: Producers (Internals, Delivery Guarantees, Advanced Patterns)
- Parts 8-11: Consumers
- Parts 12-14: Operations
- Parts 15-17: Kafka Streams
- Parts 18-20: Patterns & Practices
- Part 21: Cheatsheet & Decision Guide