
Kafka queues in Apache Kafka 4.0 via Share Groups

Sion Smith, 30 April 2025

For years, Apache Kafka’s consumer group mechanism has been the backbone of how applications consume data from Kafka topics. This model, while powerful and well-suited for many use cases, imposes certain constraints that have led engineers to develop various workarounds. But what if you need to scale your consumers beyond your partition count? What if your messages require variable processing times? What if your workload experiences dramatic peaks and troughs that demand elastic scaling?

The recent release of Apache Kafka 4.0 introduces a groundbreaking feature that addresses these challenges head-on. “Queues for Kafka,” officially known as Share Groups, represents a fundamental shift in how applications can consume data from Kafka topics. The name itself is intentionally provocative – it acknowledges that many organisations have been trying to use Kafka as a queue, despite its design as a distributed log.

Share Groups solve critical limitations of the traditional consumer group model by enabling multiple consumers to cooperatively process messages from the same partition. This seemingly simple change unlocks entirely new patterns for building data processing applications on Kafka, expanding its utility into domains previously dominated by traditional message queues.

For enterprises dealing with unpredictable workloads, variable processing requirements, or those simply seeking more flexible scaling options, Share Groups offer a compelling new approach worth serious consideration. This article explores this innovative feature in depth, examining how it works, when to use it, and how it might transform your Kafka architecture.

Understanding the Limitations of Traditional Consumer Groups

To appreciate the significance of Share Groups, we must first understand the constraints of Kafka’s traditional consumption model and why these constraints have become problematic for certain use cases.

Consumer groups in Kafka follow a strict partition-consumer coupling model. When a consumer joins a group, the group coordinator assigns specific partitions to that consumer. Once assigned, the consumer has exclusive ownership of those partitions within its group. This means that at any given time, each partition can only be processed by a single consumer in a group.

The consequences of this design become apparent when we examine scalability. If your topic has five partitions, you can have at most five active consumers in a group – any additional consumers would remain idle. This creates a hard upper limit on parallelism tied directly to your partition count. When processing demands increase, you cannot simply add more consumers to handle the load unless you have unassigned partitions.

This limitation has led many organisations to adopt an “over-partitioning” strategy – creating far more partitions than initially needed to accommodate potential future growth. A system architect might reason: “We currently need five consumers, but during peak hours, we might need twenty, so let’s create twenty partitions.” While this approach works, it introduces its own set of problems.

Over-partitioning increases resource consumption on brokers, can impact performance, complicates rebalancing operations, and may lead to smaller, less efficient batches for producers and consumers. Moreover, once you’ve created a topic, changing the partition count later is possible but can be disruptive, especially if you’re using keyed messages or need to maintain ordering guarantees.

Another significant challenge emerges when dealing with messages that require variable processing times. Consider a system where most messages take milliseconds to process, but occasional messages might take seconds or even minutes due to complex calculations, external service calls, or resource-intensive operations.

In the consumer group model, a consumer cannot move to the next message in a partition until it finishes with the current one. If a single message takes significantly longer to process, it blocks progress on the entire partition. This can lead to inefficient resource utilisation, with consumers frequently waiting on a few slow messages while other consumers have processed their assigned partitions and sit idle.

These limitations have driven organisations to implement various workarounds, from over-partitioning to creating intermediate topics or implementing custom message-level commit tracking. Share Groups provide a more elegant solution built directly into Kafka itself.

Share Groups: A New Consumption Paradigm

Share Groups represent a fundamentally different approach to consuming data from Kafka topics. Rather than assigning exclusive ownership of partitions to consumers, Share Groups enable cooperative consumption where multiple consumers can process messages from the same partition concurrently.

When Andrew Schofield, the engineer leading this work, describes Share Groups, he introduces an important conceptual distinction: the difference between a topic partition and a share partition. A topic partition is the familiar concept in Kafka – a segment of a topic that serves as the unit of parallelism. A share partition, on the other hand, is the view of a topic partition from the perspective of a specific Share Group.

This distinction is crucial because it highlights a significant shift in responsibility. In consumer groups, consumers control their position in the log through offset management. In Share Groups, this control moves to the broker side. The broker, not the consumer, decides which records to deliver next, allowing it to distribute messages among multiple consumers accessing the same partition.

Several key characteristics distinguish Share Groups from traditional consumer groups:

First, consumers cooperatively process records without exclusive partition ownership. Multiple consumers in a Share Group can simultaneously be assigned the same partition and receive different messages from it. This cooperative model fundamentally breaks the tight coupling between partition count and maximum consumer count.

Second, Share Groups enable horizontal scaling beyond partition count. If your topic has three partitions, you can now have ten or more active consumers in a Share Group, all productively processing messages. This is particularly valuable for workloads that experience significant variation in volume or have messages with highly variable processing times.

Third, Share Groups implement individual message acknowledgement rather than offset commitment. Instead of periodically committing the highest processed offset, consumers in a Share Group acknowledge each message individually, indicating whether it was successfully processed, should be retried, or should be rejected. This granular control allows for more sophisticated error handling and prevents a single problematic message from blocking an entire partition.

Under the hood, Share Groups implement a sophisticated state machine to track message delivery. When a message is first eligible for delivery, it enters the “available” state. Once sent to a consumer, it moves to the “acquired” state and an acquisition timer starts (defaulting to 30 seconds). The consumer must then explicitly acknowledge the message: accepting it (moving it to the “acknowledged” state), releasing it for another delivery attempt, or rejecting it.

If a consumer fails to respond within the timer period (perhaps due to a crash), the message automatically returns to the available state and its delivery count increments. After a configurable number of delivery attempts (default: 5), unprocessable messages transition to the “archived” state, effectively removing them from further delivery attempts.
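
To make this lifecycle concrete, here is a deliberately simplified Java sketch of the per-record state machine. It is purely illustrative: the real bookkeeping happens inside the broker’s share-partition code, and the class and method names below are invented for this example, but it mirrors the transitions described above.

// Illustrative only: a simplified model of the delivery state machine that a
// share partition maintains for each in-flight record inside the broker.
enum DeliveryState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

class InFlightRecord {
    private final int deliveryCountLimit;      // broker default: 5 attempts
    private DeliveryState state = DeliveryState.AVAILABLE;
    private int deliveryCount = 0;

    InFlightRecord(int deliveryCountLimit) {
        this.deliveryCountLimit = deliveryCountLimit;
    }

    // A consumer fetches the record: it becomes acquired and the acquisition timer starts
    void acquire() {
        state = DeliveryState.ACQUIRED;
        deliveryCount++;
    }

    // The consumer accepted the record: it will not be delivered again
    void accept() {
        state = DeliveryState.ACKNOWLEDGED;
    }

    // The consumer released the record, or the acquisition timer expired: it becomes
    // available again unless the delivery attempt limit has been reached
    void release() {
        state = (deliveryCount >= deliveryCountLimit)
                ? DeliveryState.ARCHIVED
                : DeliveryState.AVAILABLE;
    }

    // The consumer rejected the record as unprocessable: it is archived immediately
    void reject() {
        state = DeliveryState.ARCHIVED;
    }
}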

The broker plays a central role in this process, coordinating message distribution and tracking the status of each message within the Share Group. The share partition manages a set of “in-flight” records, coalesces fetch requests from multiple consumers for efficiency, and ensures that each message is only acquired by one consumer at a time.

This architecture maintains Kafka’s core publish-subscribe model while providing queue-like semantics for consumption. Messages still reside on durable Kafka topics with all the reliability and retention benefits that entails, but can now be consumed in patterns previously reserved for traditional message queues.

Practical Applications and Use Cases

The introduction of Share Groups unlocks several valuable consumption patterns that were previously difficult or inefficient to implement with traditional consumer groups. Let’s explore some of the most compelling use cases.

Perhaps the most straightforward application is handling unpredictable workload patterns. Many real-world systems experience dramatic variations in traffic – think of an e-commerce platform during a flash sale, a payment processor during peak shopping hours, or an analytics system that receives batch uploads at specific times.

With consumer groups, accommodating these peaks often required over-partitioning, as discussed earlier. Share Groups offer a more elegant solution: maintain a base number of consumers for average load, then dynamically scale up during peak periods by adding more consumers to the same Share Group. Since multiple consumers can process messages from the same partition, you can achieve much finer-grained scaling without changing your topic configuration.

Consider a concrete example: you have a Kafka topic with 5 partitions that normally requires 5 consumers to process at an acceptable rate. During peak hours, processing demand triples. With consumer groups, you would need to create 15 partitions upfront to accommodate this peak, leaving two-thirds of your partitions underutilised during normal operations. With Share Groups, you can maintain 5 partitions and simply scale from 5 to 15 consumers during peak times, then scale back down when demand subsides.

Another compelling use case involves implementing work queue patterns for messages that require substantial processing time. Imagine a system where each message represents an independent task that might take anywhere from seconds to minutes to complete – such as image processing, complex calculations, or workflows involving multiple external service calls.

In this scenario, the traditional consumer group model is highly inefficient. If one consumer receives a message that takes several minutes to process, that entire partition is blocked until processing completes. Share Groups solve this elegantly by allowing multiple consumers to work on different messages from the same partition concurrently.

For example, a financial services company might use Kafka to distribute risk calculations that vary significantly in complexity. Some calculations complete in milliseconds, while others might take minutes depending on the specific instruments involved. With Share Groups, they can implement a pool of workers that dynamically pull the next available calculation regardless of which partition it comes from, maximising throughput and resource utilisation.

Share Groups also excel at automatically handling problematic messages. In any real-world system, unexpected data issues can arise – malformed messages, missing fields, or edge cases that cause processing errors. With consumer groups, a single poisonous message can effectively block an entire partition until manual intervention occurs or custom error-handling logic skips the message.

Share Groups address this through built-in delivery attempt tracking. If a message consistently fails processing across multiple delivery attempts, it automatically transitions to the “archived” state, allowing processing to continue with subsequent messages. This built-in circuit breaker prevents individual problematic messages from causing system-wide disruptions.

A real-world example might be a log processing system that occasionally encounters malformed log entries. Instead of having these entries block processing for valid logs in the same partition, Share Groups would attempt delivery a configurable number of times before archiving the problematic entry. Future enhancements will likely include dead letter queues, allowing these problematic messages to be redirected for separate analysis rather than being discarded.

These use cases highlight how Share Groups extend Kafka’s applicability into domains that previously might have required separate queue technologies alongside Kafka, simplifying architecture and reducing operational complexity.

Getting Started with Share Groups

If you’re interested in exploring Share Groups in your environment, here’s a practical guide to getting started with this new feature in Apache Kafka 4.0.

First, it’s important to note that Share Groups are currently in early access in Kafka 4.0. As the presenter emphasized, this means the feature is functional enough for experimentation and evaluation but not yet recommended for production use. The API may evolve in upcoming releases, with Kafka 4.1 expected to move it to preview status and Kafka 4.2 targeting general availability.

To enable Share Groups in your Kafka 4.0 environment, you’ll need to modify your server.properties configuration with two specific settings:

group.coordinator.rebalance.protocols=classic,consumer,share
unstable.api.versions.enable=true

The first setting adds the new share rebalance protocol to the group coordinator alongside the existing classic and consumer protocols, while the second allows the broker to use protocol elements that are still considered unstable and subject to change. These configuration flags ensure that the feature is opt-in during its early access phase.

Creating your first Share Group consumer is remarkably straightforward, especially if you’re already familiar with Kafka’s consumer API. The coding pattern is intentionally similar to traditional consumers, with a few key differences. Let’s explore several implementation patterns with detailed code examples.

Basic Share Group Consumer with Implicit Acknowledgement

Here’s a complete example of a basic Share Group consumer that processes messages with implicit acknowledgement:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class BasicShareConsumer {
    public static void main(String[] args) {
        // Configure the consumer
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-share-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        // Create a KafkaShareConsumer instead of KafkaConsumer
        KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
        
        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            // Basic consumption loop with implicit acknowledgement
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    // Process the record
                    System.out.printf("Partition = %d, Offset = %d, Key = %s, Value = %s%n", 
                                     record.partition(), record.offset(), record.key(), record.value());
                    
                    // Perform business logic on the record
                    processRecord(record);
                }
                
                // With implicit acknowledgement, all records are automatically
                // accepted when the next poll occurs
                
                // Sleep briefly to avoid tight polling loops in this example
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            System.out.println("Consumer interrupted");
        } finally {
            // Close the consumer to leave the group cleanly
            consumer.close();
        }
    }
    
    private static void processRecord(ConsumerRecord<String, String> record) {
        // Simulate processing delay
        try {
            long processingTime = (long) (Math.random() * 50);
            Thread.sleep(processingTime);
            System.out.println("Processed record with value: " + record.value() + 
                              " (took " + processingTime + "ms)");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Share Group Consumer with Explicit Acknowledgement

For more control over message handling, most production scenarios will benefit from explicit acknowledgement. Here’s a complete example:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.AcknowledgeType;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

public class ExplicitAcknowledgementShareConsumer {
    
    // Counter for demonstration purposes
    private static final AtomicInteger processed = new AtomicInteger(0);
    private static final AtomicInteger failures = new AtomicInteger(0);
    
    public static void main(String[] args) {
        // Configure the consumer
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "explicit-ack-share-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("share.acknowledgement.mode", "explicit"); // For Kafka 4.1+
        
        // Create a share consumer
        KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
        
        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList("orders-topic"));

        try {
            // Consumption loop with explicit acknowledgement
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Process the record
                        System.out.printf("Processing: Partition = %d, Offset = %d, Value = %s%n",
                                         record.partition(), record.offset(), record.value());
                        
                        // Simulate business logic
                        boolean success = processOrderRecord(record);
                        
                        if (success) {
                            // Explicitly acknowledge successful processing
                            consumer.acknowledge(record, AcknowledgeType.ACCEPT);
                            processed.incrementAndGet();
                            System.out.println("Successfully processed order: " + record.value());
                        } else {
                            // Temporary failure, release for retry
                            consumer.acknowledge(record, AcknowledgeType.RELEASE);
                            System.out.println("Temporarily failed to process order, releasing for retry: " + record.value());
                        }
                    } catch (FatalProcessingException e) {
                        // Permanent failure, reject the record
                        consumer.acknowledge(record, AcknowledgeType.REJECT);
                        failures.incrementAndGet();
                        System.err.println("Permanently failed to process order: " + record.value() + 
                                         " - Reason: " + e.getMessage());
                    } catch (Exception e) {
                        // Unexpected exception, release for retry
                        consumer.acknowledge(record, AcknowledgeType.RELEASE);
                        System.err.println("Unexpected error while processing order: " + e.getMessage());
                    }
                }
                
                // Commit acknowledgements to the broker
                consumer.commitSync();
                
                // Print statistics every 100 records
                int total = processed.get();
                if (total > 0 && total % 100 == 0) {
                    System.out.printf("Progress: Processed %d orders, Failed %d orders%n", 
                                     processed.get(), failures.get());
                }
            }
        } catch (Exception e) {
            System.err.println("Consumer error: " + e.getMessage());
        } finally {
            consumer.close();
            System.out.printf("Final count: Processed %d orders, Failed %d orders%n", 
                             processed.get(), failures.get());
        }
    }
    
    private static boolean processOrderRecord(ConsumerRecord<String, String> record) throws FatalProcessingException {
        String orderValue = record.value();
        
        // Simulate processing with occasional failures
        try {
            // Simulate processing time
            Thread.sleep((long) (Math.random() * 200));
            
            // Simulate occasional temporary failures (25% chance)
            if (Math.random() < 0.25) {
                return false;
            }
            
            // Simulate occasional permanent failures (5% chance)
            if (Math.random() < 0.05) {
                throw new FatalProcessingException("Invalid order format");
            }
            
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
    
    // Custom exception for permanent failures
    static class FatalProcessingException extends Exception {
        public FatalProcessingException(String message) {
            super(message);
        }
    }
}

This pattern provides finer control over message handling, allowing you to specify different outcomes based on processing results:

  • ACCEPT indicates successful processing and removes the record from further delivery
  • RELEASE returns the record to the available state for redelivery (useful for temporary failures)
  • REJECT indicates a permanent failure; the record is archived immediately and will not be redelivered

Advanced Share Group Consumer with Batched Processing and Error Handling

For production systems dealing with high throughput, you might want to optimize processing with batched operations while still maintaining the ability to handle errors at the individual record level. Here’s a more sophisticated example:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.AcknowledgeType;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BatchProcessingShareConsumer {

    // Number of worker threads for parallel processing
    private static final int WORKER_THREADS = 4;
    
    public static void main(String[] args) {
        // Configure the consumer
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "batch-share-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("share.acknowledgement.mode", "explicit");
        // Delivery-attempt limits and the acquisition lock duration are broker/group
        // level settings (group.share.delivery.count.limit and
        // group.share.record.lock.duration.ms) rather than consumer properties; this
        // example assumes the group's acquisition lock has been raised to 60 seconds
        // to give workers a longer processing window.
        
        // Create a thread pool for parallel processing
        ExecutorService executor = Executors.newFixedThreadPool(WORKER_THREADS);
        
        // Create a share consumer
        KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
        
        // Subscribe to the high-volume topic
        consumer.subscribe(Collections.singletonList("high-volume-topic"));

        try {
            // Track metrics
            long startTime = System.currentTimeMillis();
            long recordsProcessed = 0;
            
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                
                if (records.count() > 0) {
                    System.out.printf("Received batch of %d records%n", records.count());
                    
                    // Process records in parallel, keeping the submitted records in a
                    // parallel list so each result (or failure) can be matched back to
                    // the exact record it belongs to
                    List<ConsumerRecord<String, String>> submitted = new ArrayList<>();
                    List<Future<RecordResult>> futures = new ArrayList<>();
                    
                    for (ConsumerRecord<String, String> record : records) {
                        // Submit each record for processing
                        submitted.add(record);
                        futures.add(executor.submit(() -> processRecord(record)));
                    }
                    
                    // Wait for all processing to complete and handle results
                    for (int i = 0; i < futures.size(); i++) {
                        Future<RecordResult> future = futures.get(i);
                        try {
                            RecordResult result = future.get(55, TimeUnit.SECONDS); // Leave 5s margin
                            ConsumerRecord<String, String> record = result.getRecord();
                            
                            // Acknowledge based on processing result
                            switch (result.getStatus()) {
                                case SUCCESS:
                                    consumer.acknowledge(record, AcknowledgeType.ACCEPT);
                                    recordsProcessed++;
                                    break;
                                case TEMPORARY_FAILURE:
                                    consumer.acknowledge(record, AcknowledgeType.RELEASE);
                                    System.out.println("Released record for retry: " + record.key());
                                    break;
                                case PERMANENT_FAILURE:
                                    consumer.acknowledge(record, AcknowledgeType.REJECT);
                                    System.err.println("Rejected record: " + record.key());
                                    break;
                            }
                        } catch (Exception e) {
                            // Timed out or execution exception: release the exact record
                            // submitted at this position so the broker can redeliver it
                            ConsumerRecord<String, String> record = submitted.get(i);
                            consumer.acknowledge(record, AcknowledgeType.RELEASE);
                            System.err.println("Failed to process record: " + e.getMessage());
                        }
                    }
                    
                    // Commit all acknowledgements
                    consumer.commitSync();
                    
                    // Log throughput statistics every 10000 records
                    if (recordsProcessed % 10000 == 0) {
                        long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
                        double recordsPerSecond = elapsedSeconds > 0 ? 
                            (double) recordsProcessed / elapsedSeconds : 0;
                        System.out.printf("Throughput: %.2f records/second (%d total)%n", 
                                          recordsPerSecond, recordsProcessed);
                    }
                }
            }
        } catch (Exception e) {
            System.err.println("Consumer error: " + e.getMessage());
        } finally {
            consumer.close();
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    System.err.println("Executor did not terminate in the specified time.");
                    List<Runnable> droppedTasks = executor.shutdownNow();
                    System.err.println("Executor was abruptly shut down. " + 
                                      droppedTasks.size() + " tasks will not be executed.");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    private static RecordResult processRecord(ConsumerRecord<String, String> record) {
        try {
            // Simulate complex processing that takes variable time
            long processingTime = (long) (Math.random() * 500);
            Thread.sleep(processingTime);
            
            // Simulate occasional failures
            double random = Math.random();
            if (random < 0.05) {
                return new RecordResult(record, ProcessingStatus.PERMANENT_FAILURE);
            } else if (random < 0.15) {
                return new RecordResult(record, ProcessingStatus.TEMPORARY_FAILURE);
            }
            
            return new RecordResult(record, ProcessingStatus.SUCCESS);
        } catch (Exception e) {
            return new RecordResult(record, ProcessingStatus.TEMPORARY_FAILURE);
        }
    }
    
    // Helper class to track processing results
    static class RecordResult {
        private final ConsumerRecord<String, String> record;
        private final ProcessingStatus status;
        
        public RecordResult(ConsumerRecord<String, String> record, ProcessingStatus status) {
            this.record = record;
            this.status = status;
        }
        
        public ConsumerRecord<String, String> getRecord() {
            return record;
        }
        
        public ProcessingStatus getStatus() {
            return status;
        }
    }
    
    // Possible processing outcomes
    enum ProcessingStatus {
        SUCCESS,
        TEMPORARY_FAILURE,
        PERMANENT_FAILURE
    }
}

Implementing a Work Queue Pattern with Share Groups

One of the key use cases for Share Groups is implementing a work queue pattern, where each message represents an independent task that might take significant time to process. Here’s a concrete example:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

public class WorkQueueShareConsumer {
    
    // Flag for graceful shutdown
    private static final AtomicBoolean running = new AtomicBoolean(true);
    
    public static void main(String[] args) {
        // Consumer ID for logging
        final String consumerId = UUID.randomUUID().toString().substring(0, 8);
        
        // Configure the consumer
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "work-queue-share-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("share.acknowledgement.mode", "explicit");
        consumerProps.put("max.poll.records", "1"); // pull a single task per poll for this worker
        // The acquisition lock duration is a broker/group-level setting
        // (group.share.record.lock.duration.ms); this example assumes it has been raised
        // to around 5 minutes for this share group to accommodate long-running tasks.
        
        // Configure the producer for results
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        // Create consumer and producer
        KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(consumerProps);
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
        
        // Register shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down consumer " + consumerId + "...");
            running.set(false);
        }));
        
        // Subscribe to the tasks topic
        consumer.subscribe(Collections.singletonList("tasks-topic"));
        
        System.out.println("Worker " + consumerId + " started and waiting for tasks...");
        
        try {
            // Work queue consumption pattern - process one message at a time
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                // Process at most one record per poll
                if (!records.isEmpty()) {
                    ConsumerRecord<String, String> task = records.iterator().next();
                    
                    System.out.println("Worker " + consumerId + " received task: " + task.key());
                    
                    try {
                        // Process the task - this could take significant time
                        String result = processLongRunningTask(task);
                        
                        // Send result to results topic
                        producer.send(new ProducerRecord<>("tasks-results", task.key(), result));
                        
                        // Acknowledge successful processing
                        consumer.acknowledge(task, AcknowledgeType.ACCEPT);
                        System.out.println("Worker " + consumerId + " completed task: " + task.key());
                    } catch (Exception e) {
                        System.err.println("Worker " + consumerId + " failed on task: " + task.key() +
                                         " - " + e.getMessage());
                        
                        // Release for retry if it's a recoverable error
                        if (e instanceof RecoverableTaskException) {
                            consumer.acknowledge(task, AcknowledgeType.RELEASE);
                            System.out.println("Task released for retry");
                        } else {
                            // Reject permanently failed tasks
                            consumer.acknowledge(task, AcknowledgeType.REJECT);
                            System.out.println("Task rejected permanently");
                            
                            // Send failure notification
                            producer.send(new ProducerRecord<>("tasks-failures", 
                                         task.key(), "Failed: " + e.getMessage()));
                        }
                    }
                    
                    // Commit after processing each task
                    consumer.commitSync();
                }
            }
        } catch (Exception e) {
            System.err.println("Consumer error: " + e.getMessage());
            e.printStackTrace();
        } finally {
            consumer.close();
            producer.close();
            System.out.println("Worker " + consumerId + " shut down");
        }
    }
    
    private static String processLongRunningTask(ConsumerRecord<String, String> task) throws Exception {
        System.out.println("Processing task: " + task.key());
        
        // Simulate a long-running process
        try {
            // Simulate variable processing time (20 seconds to 2 minutes)
            long processingTime = 20000 + (long)(Math.random() * 100000);
            System.out.println("Task will take approximately " + (processingTime / 1000) + " seconds");
            
            // Simulate work with progress updates
            long startTime = System.currentTimeMillis();
            int steps = 10;
            for (int i = 1; i <= steps; i++) {
                // Sleep for a portion of the total time
                Thread.sleep(processingTime / steps);
                
                // Report progress
                System.out.printf("Task %s: %d%% complete%n", task.key(), i * 100 / steps);
                
                // Simulate occasional recoverable errors
                if (Math.random() < 0.05) {
                    throw new RecoverableTaskException("Temporary resource unavailable");
                }
                
                // Simulate occasional permanent failures
                if (Math.random() < 0.02) {
                    throw new PermanentTaskException("Invalid task parameters");
                }
            }
            
            long actualTime = System.currentTimeMillis() - startTime;
            return "Task completed successfully in " + (actualTime / 1000) + " seconds";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RecoverableTaskException("Task was interrupted");
        }
    }
    
    // Custom exceptions for different error types
    static class RecoverableTaskException extends Exception {
        public RecoverableTaskException(String message) {
            super(message);
        }
    }
    
    static class PermanentTaskException extends Exception {
        public PermanentTaskException(String message) {
            super(message);
        }
    }
}

Several key broker-level configuration parameters affect Share Group behaviour (these are set on the brokers rather than on individual consumers):

  • group.share.delivery.count.limit: Controls how many times a record will be delivered before being archived (default: 5)
  • group.share.record.lock.duration.ms: The time a consumer has to acknowledge an acquired record before it is automatically released for redelivery (default: 30000 ms)
  • group.share.partition.max.record.locks: The maximum number of acquired, unacknowledged records allowed per share partition (default: 200, but likely to increase in future releases)
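
For example, to raise these limits cluster-wide you would set them in each broker’s server.properties. This is only a sketch using the configuration names listed above; because the feature is still in early access, verify the exact names and defaults against the documentation for your Kafka version:

group.share.delivery.count.limit=3
group.share.record.lock.duration.ms=60000
group.share.partition.max.record.locks=500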

When monitoring and administering Share Groups, Kafka 4.0 introduces several new command-line tools:

  • kafka-console-share-consumer: A command-line consumer that uses Share Groups, useful for quickly testing a Share Group from the shell
  • kafka-share-consumer-perf-test: Performance testing tool for Share Groups
  • kafka-share-groups: Administrative tool for viewing and managing Share Groups

For example, to view all existing Share Groups and their current state:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --list

And to see detailed information about a specific Share Group:

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --describe --group my-share-group

This will show information such as which consumers are in the group, which partitions they’re assigned to, and the current share partition start offset.

As you experiment with Share Groups, be aware of a few common pitfalls. The most significant one highlighted in the presentation involves acknowledgement behavior. In the current early access implementation, failing to explicitly acknowledge all records in a batch can lead to infinite redelivery of the same records. This behavior is being improved in Kafka 4.1 with the introduction of explicit acknowledgement modes.
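
A simple defensive pattern while the feature matures is to funnel every polled record through a path that always records an explicit acknowledgement, whatever the processing outcome. The sketch below is our own helper, not part of Kafka’s API, and assumes the same KafkaShareConsumer and AcknowledgeType classes used in the examples above:

import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;

import java.time.Duration;

public final class AcknowledgeEveryRecord {

    // Polls one batch and guarantees that every returned record receives an explicit
    // acknowledgement before the acknowledgements are committed, so no record is left
    // unacknowledged and redelivered endlessly
    static <K, V> void pollAndAcknowledgeAll(KafkaShareConsumer<K, V> consumer,
                                             RecordHandler<K, V> handler) {
        ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<K, V> record : records) {
            try {
                handler.handle(record);                                 // business logic
                consumer.acknowledge(record, AcknowledgeType.ACCEPT);   // processed successfully
            } catch (Exception e) {
                consumer.acknowledge(record, AcknowledgeType.RELEASE);  // let the broker redeliver
            }
        }
        consumer.commitSync(); // every record polled above now has a recorded outcome
    }

    // Minimal functional interface so business logic can throw checked exceptions
    interface RecordHandler<K, V> {
        void handle(ConsumerRecord<K, V> record) throws Exception;
    }
}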

Adopting Share Groups in Your Architecture

Share Groups represent one of the most significant enhancements to Kafka’s consumption model since its inception. By breaking the strict one-to-one relationship between partitions and consumers, they extend Kafka’s applicability into domains previously served by separate messaging technologies.

The key benefits Share Groups bring to Kafka architectures are compelling:

Elastic scaling allows consumer count to vary independently of partition count, enabling more efficient resource utilisation and better handling of workload variations. The ability to scale consumers up during peak periods and down during quiet times can lead to significant cost savings in cloud environments where resources are paid for by usage.

Improved resilience comes from built-in handling of problematic messages and finer-grained recovery mechanisms. The automatic tracking of delivery attempts prevents individual malformed messages from blocking entire partitions, increasing overall system stability.

Simplified architecture results from being able to implement queue-like patterns directly in Kafka without requiring additional technologies. Organisations that previously maintained separate queue systems alongside Kafka for certain workloads may now be able to consolidate on a single platform, reducing operational complexity.

When considering whether to adopt Share Groups in your architecture, several strategic questions can guide your decision:

Do your workloads experience significant variations in volume or processing requirements? Share Groups excel at handling elastic scaling needs.

Are your messages truly independent or do they require strict ordering? While Share Groups don’t guarantee ordering within a partition, they provide flexibility for workloads where messages can be processed independently.

How mature is your Kafka deployment and operational expertise? Early adoption of Share Groups requires comfort with evolving APIs and potential behavioral changes in future releases.

For teams looking to evaluate this new capability, consider starting with non-critical workloads that specifically benefit from the queue-like behavior that Share Groups provide. Particularly good candidates include:

  • Backend processing tasks with variable completion times
  • Systems that experience dramatic peaks and troughs in workload
  • Scenarios where messages occasionally require retry or special handling

As you implement proof-of-concept applications, pay close attention to acknowledgement patterns and error handling, as these differ most significantly from traditional consumer groups.

Looking forward, Share Groups expand Kafka’s applicability into new domains that were previously challenging to implement within its architecture. The ability to combine the durable log semantics of Kafka with queue-like consumption patterns offers architects new options for designing resilient, scalable data processing systems.

While still in early access, this feature represents a significant evolution in Kafka’s consumption model – one that acknowledges and embraces the real-world patterns that many organisations have been trying to implement through various workarounds. By formalising these patterns within the broker itself, Share Groups make Kafka more flexible, more efficient, and ultimately more valuable as a central nervous system for data-intensive applications.

Need help migrating to Apache Kafka 4.0?

Let us help you assess your migration journey and plan how you can start using Kafka Queues in your architecture.
