Implementing a Kafka consumer in Java
In this post we will take a look at different ways in which messages can be read from Kafka. Our goal will be to find the simplest way to implement a Kafka consumer in Java, exposing potential traps and showing interesting intricacies.
The code samples will be provided in Java 11, but they could also easily be translated to other versions of Java (or even to other JVM languages, like Kotlin or Scala).
Naive implementation
Getting started with Kafka is incredibly straightforward. You just need to add the official Kafka client dependency to your preferred dependency management tool, e.g. add this to your pom.xml:
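For reference, the official Kafka client library is published as org.apache.kafka:kafka-clients; a minimal sketch of that dependency (the version below is only a placeholder):

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- placeholder version: use the one matching your broker and client requirements -->
    <version>2.6.0</version>
</dependency>
```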
Reading messages from Kafka is normally done in a long-running loop:
- poll for new messages: consumer.poll(...)
- process messages: records.forEach(...)
- repeat (go back to step 1)
The complete code of a naive implementation
Putting it all together, this is what the consumer's code in the subscribing variant can look like:
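A minimal sketch of such a naive consumer, assuming a local broker and placeholder topic/group names (my-topic, my-group), could look roughly like this:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NaiveConsumer {

    public static void main(String[] args) {
        var props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (var consumer = new KafkaConsumer<String, String>(props)) {
            // subscribing variant: partitions are assigned automatically within the consumer group
            consumer.subscribe(List.of("my-topic"));

            while (true) {
                // 1. poll for new messages
                var records = consumer.poll(Duration.ofSeconds(1));
                // 2. process messages
                records.forEach(record ->
                        System.out.println(record.offset() + " -> " + record.value()));
                // 3. repeat (go back to step 1)
            }
        }
    }
}
```

With auto-commit left at its default (enable.auto.commit=true), offsets are committed for us automatically at regular intervals during subsequent polls.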
If automatic committing does not fit your use case, you may decide to take fine-grained control over how and when messages are committed. You can disable auto-commit by setting enable.auto.commit=false and then commit manually by calling either commitSync() or commitAsync(), depending on your use-case.
If you just call the parameterless variant of commitSync() then it will commit all the records returned by the last poll:
```java
var records = consumer.poll(Duration.ofSeconds(1));
// ...
consumer.commitSync(); // commits all the records from the current poll
```
But if you wish to be more specific, you can also indicate exactly which offsets should be committed for which partitions. For instance, like this:
```java
consumer.commitSync(Collections.singletonMap(
        partition, new OffsetAndMetadata(lastOffset + 1)));
```
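The partition and lastOffset variables above come from walking through the polled records partition by partition; a sketch of that common pattern (reusing the records and consumer from the previous snippets) might look like this:

```java
for (var partition : records.partitions()) {
    var partitionRecords = records.records(partition);
    partitionRecords.forEach(record -> {
        // process the record
    });
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    // the committed offset should be the offset of the *next* message to read, hence the +1
    consumer.commitSync(Collections.singletonMap(
            partition, new OffsetAndMetadata(lastOffset + 1)));
}
```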
Single- vs multi-threaded
It is important to keep in mind that KafkaConsumer is not thread-safe. Therefore it is easier to correctly implement a consumer with the “one consumer per thread” pattern. In this pattern scaling boils down to just adding more partitions and more consumers.
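A rough sketch of that pattern, reusing the props and topic from the naive example above (the thread count is purely illustrative), could look like this:

```java
// each thread owns its private KafkaConsumer instance - instances are never shared between threads
int numberOfConsumers = 3;
var executor = Executors.newFixedThreadPool(numberOfConsumers);
for (int i = 0; i < numberOfConsumers; i++) {
    executor.execute(() -> {
        try (var consumer = new KafkaConsumer<String, String>(props)) {
            consumer.subscribe(List.of("my-topic"));
            while (true) {
                consumer.poll(Duration.ofSeconds(1))
                        .forEach(record -> System.out.println(record.value()));
            }
        }
    });
}
```

Keep in mind that consumers exceeding the number of partitions of the topic will simply stay idle.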
As an alternative, you can decouple consumption and processing, which should result in a reduced number of TCP connections to the cluster and increased throughput. On the downside, you will end up with much more complicated code, because not only will you need to coordinate threads, but also commit only specific offsets while preserving order between processed messages.
If you are not discouraged by potential code complexity then Igor Buzatovic recently wrote an article explaining how to write a multi-threaded consumer. Igor also provided a ready-to-use implementation if you would like to jump straight to the code.
Spring Kafka
So far we have mostly complicated the code and discussed what can go wrong. And it is good to be aware of the various gotchas you can run into.
But we have strayed from our original goal which was to come up with the simplest Kafka consumer possible.
When thinking about code simplification in Java, it is usually a good idea to check whether the Spring Framework offers an abstraction that can make our code more compact and less error-prone.
Actually, there is more than one way in which Spring can simplify our integration with Kafka. For example, you can use Spring Boot together with Spring Kafka:
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
Then we only need to configure connection details in application.yml (or application.properties):
```yaml
spring.kafka:
  bootstrap-servers: localhost:9092
  consumer:
    group-id: my-group
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```
…and add the @KafkaListener annotation to a method that will process messages:
```java
@Component
public class MyKafkaListener {

    @KafkaListener(topics = "my-topic")
    public void processMessage(ConsumerRecord<String, String> record) {
        System.out.println(record.offset() + " -> " + record.value());
    }
}
```
If you do not need to access messages’ metadata (like partition, offset or timestamp) then you can make the code even shorter:
```java
@KafkaListener(topics = "my-topic")
public void processMessage(String content) {
    System.out.println(content);
}
```
There is also an alternative way to access this kind of information using annotations:
```java
@KafkaListener(topics = "my-topic")
public void processMessage(
        @Header(KafkaHeaders.PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) int offset,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
        @Payload String content) {
    // ...
}
```
Commit modes in Spring Kafka
We have already discussed that you can safely rely on automatic committing for a wide range of use-cases. Surprisingly, Spring Kafka by default sets enable.auto.commit=false but then makes committing work in a very similar way. What Spring does instead is emulate auto-commit by explicitly committing after all the records from the poll are finally processed.
This acknowledgment mode is called BATCH, because Spring commits messages from the previous batch of records returned by poll(...).
However, if you wish to manually commit offsets you can switch to the MANUAL or MANUAL_IMMEDIATE acknowledgment mode. This can be accomplished by changing the Kafka listener ack-mode in your Spring Boot config:
```yaml
spring.kafka:
  # ...
  listener:
    ack-mode: MANUAL_IMMEDIATE
```
Then you are expected to use the Acknowledgment object to mark messages as consumed:
```java
@KafkaListener(topics = "my-topic")
public void processMessage(String content, Acknowledgment ack) {
    // ...
    ack.acknowledge();
}
```
There is a dedicated section about different acknowledgment modes in the official Spring Kafka documentation if you would like to read more about it.
Spring Cloud Stream
Another option is to use Spring Cloud Stream with Kafka Binder:
```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
```
Spring Cloud Stream can help you write even more generic code that you can quickly integrate with a variety of modern messaging systems (e.g. RabbitMQ, Apache Kafka, Amazon Kinesis, Google PubSub and more).
The basic concept is that you just provide implementations of Java functional interfaces:
- java.util.function.Supplier for sources
- java.util.function.Consumer for sinks
- java.util.function.Function for processors
…and then in your configuration you bind them to specific queues or topics in the messaging system(s) of your choice.
There is a very good talk by Soby Chacko and Oleg Zhurakousky from Pivotal about integrating Spring Cloud Stream with Kafka, explaining the approach with Java functional interfaces. Oleg Zhurakousky also wrote an interesting article explaining the motives behind the move to functional programming model in Spring Cloud Stream.
For example, you can quickly put together a Spring Boot application like this:
```java
// ...
import java.util.function.Consumer;

@SpringBootApplication
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

    @Bean
    public Consumer<String> myMessageConsumer() {
        return content -> System.out.println(content);
    }
}
```
Please note the java.util.function.Consumer implementation (returned from the myMessageConsumer() method), which you can replace with your own message-processing logic. After you have implemented this interface, you just need to bind it properly. For example, you can configure the consumer to read from my-topic using my-group as the consumer group:
```yaml
spring.cloud.stream:
  bindings:
    myMessageConsumer-in-0:
      destination: my-topic
      group: my-group
```
The name of the binding looks peculiar, but in most cases it will just be the name of your functional bean suffixed with -in-0 for inbound and -out-0 for outbound topics.
Please refer to documentation on binding names for more details (especially if you are curious about the -0 suffix representing the index of the binding).
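To make the naming scheme more concrete, here is a hypothetical processor: a java.util.function.Function bean named uppercase() (the bean name and topics are made up for illustration), together with the two bindings that follow from it:

```java
@Bean
public Function<String, String> uppercase() {
    // transforms every incoming message and publishes the result to the outbound binding
    return content -> content.toUpperCase();
}
```

```yaml
spring.cloud.stream:
  bindings:
    uppercase-in-0:
      destination: my-topic
    uppercase-out-0:
      destination: my-uppercased-topic
```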
You might also notice that we have not specified any bootstrap.servers. By default it connects to a Kafka cluster running on localhost:9092.
It can be easily changed to a different list of brokers:
```yaml
spring.cloud.stream:
  kafka.binder:
    brokers: my-node1:9090,my-node2:9090,my-node3:9090
```
Leveraging Spring Cloud Stream has completely decoupled our code from Kafka. It is now possible to switch to an entirely different message broker without a single code change: all of this can be done through configuration.
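As an illustration of that claim, moving the example above from Kafka to RabbitMQ should mostly boil down to swapping the binder dependency (plus adjusting any binder-specific configuration such as the brokers shown earlier):

```xml
<!-- replace spring-cloud-stream-binder-kafka with the RabbitMQ binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
```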
Parting thoughts
To sum up, Spring Kafka and Spring Cloud Stream with Kafka Binder offer quick ways to build incredibly succinct code for reading messages from Kafka.
That being said, even though the abstractions provided by Spring framework are usually very good, it can be beneficial to know what happens under the hood.
Here are some topics worth exploring: