Kafka: java.lang.OutOfMemoryError: Java heap space


KafkaConsumer can hit OutOfMemoryException. #91


With large messages, and when the consumer polls faster than downstream processing can keep up (or there is a spike in messages), the internal message queue can start to fill up, consuming memory.

With a test case of ~12KB messages and the default settings for the operator and JVM, an OutOfMemoryException is hit at around 22,000 messages. The default queue capacity is 50,000 messages.

While this can be worked around by raising the maximum JVM heap or lowering the maximum polled records, it is also possible for the operator to detect low memory and react.

I have code that detects this situation and pauses the operator, similar to the way it pauses today when the queue is full. In this case the queue stabilized at about 18,000 messages with no OutOfMemoryException.
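The pause-on-low-memory approach described above can be sketched as follows. This is a hypothetical illustration, not the toolkit's actual code; the class name, the threshold parameter, and the use of `Runtime` for the heap check are my assumptions:

```java
// Hypothetical sketch of a low-memory check (not the toolkit's actual code).
// The consumer would call isLowMemory() before enqueuing polled messages and
// pause its assigned partitions while it returns true, just as it pauses
// today when the queue reaches capacity.
public final class LowMemoryGuard {
    private final double minFreeRatio; // e.g. 0.1 = keep 10% of max heap free

    public LowMemoryGuard(double minFreeRatio) {
        this.minFreeRatio = minFreeRatio;
    }

    /** Returns true when the free fraction of the max heap is below the threshold. */
    public boolean isLowMemory() {
        Runtime rt = Runtime.getRuntime();
        long used = rt.totalMemory() - rt.freeMemory();
        double freeRatio = (double) (rt.maxMemory() - used) / rt.maxMemory();
        return freeRatio < minFreeRatio;
    }
}
```

Pausing on this signal is cheaper than catching `OutOfMemoryError`, because by the time the error is thrown the heap is already exhausted and even the recovery path may fail to allocate.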


I also added a couple of metrics:

(screenshot of the new metrics not reproduced here)


Fix #91 - Check for low memory when adding messages to queue

Re-opened because this fix is not in any release yet; it is only in the develop branch.

Resolved with toolkit version 1.4.0.

With a large max.partition.fetch.bytes consumer configuration, the memory monitoring performed before fetching messages is not sufficient.
Tested with max.partition.fetch.bytes = 20971520 and message blobs of 1 MB.
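A rough back-of-envelope estimate shows why a check before the fetch is not enough: the consumer can buffer one in-flight fetch response per broker, each capped by fetch.max.bytes and by max.partition.fetch.bytes times the partitions that broker serves. The formula and the partition/broker counts below are illustrative assumptions, not measurements from this report:

```java
public final class FetchMemoryEstimate {
    /**
     * Rough worst-case heap needed for in-flight fetch response buffers:
     * one response per broker, each bounded both by fetch.max.bytes and by
     * max.partition.fetch.bytes times the partitions served by that broker.
     */
    public static long worstCaseBytes(long maxPartitionFetchBytes,
                                      long fetchMaxBytes,
                                      int partitionsPerBroker,
                                      int brokers) {
        long perBroker = Math.min(fetchMaxBytes,
                maxPartitionFetchBytes * partitionsPerBroker);
        return perBroker * brokers;
    }
}
```

With max.partition.fetch.bytes = 20971520 as in the report, the default fetch.max.bytes of 52428800, and an assumed four partitions per broker across three brokers, this comes to roughly 150 MB of buffer space on top of the queued messages, which a small default heap cannot absorb.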

[com.ibm.streams.operator.internal.jni.JNIBridge.complete:-1] - Java heap space
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1123)
    at com.ibm.streamsx.kafka.clients.consumer.AbstractNonCrKafkaConsumerClient.pollAndEnqueue(AbstractNonCrKafkaConsumerClient.java:378)

The stack trace shows that the OOM occurs when the buffer for the fetch result is allocated.
Shortly before, the Heartbeat thread dies with OOM:

java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)


Issue

I’m using Spring-Kafka 2.7.1 in a spring boot project.

When I connect it to an SSL-configured Kafka broker, it throws an "OutOfMemoryError" as below, even though I have increased the heap size multiple times to no avail.

java.lang.OutOfMemoryError: Java heap space
    at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61) ~[na:na]
    at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348) ~[na:na]
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:563) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226) ~[kafka-clients-2.7.1.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[kafka-clients-2.7.1.jar!/:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1414) ~[spring-kafka-2.7.7.jar!/:2.7.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1251) ~[spring-kafka-2.7.7.jar!/:2.7.7]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163) ~[spring-kafka-2.7.7.jar!/:2.7.7]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

My Current YAML configuration is as below:

spring:
  kafka:
    bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
    security:
      protocol: "SSL"
    consumer:
      auto-offset-reset: earliest
producer:
  topic: TOPIC
  bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
consumer:
  topic: TOPIC
  bootstrap-servers: KAFKA_BOOTSTRAP_SERVERS

It works as expected when connected to a non-SSL Kafka broker.

I have tested all other possibilities and determined that the problem is related to the SSL configuration of the client.

Solution

It is possible to run into out-of-memory errors when trying to use a secured Kafka endpoint in a non-secure way. This is a known issue when the wrong security protocol is used or the required authentication properties are not passed; the OOM error is totally unrelated to actual heap exhaustion, but it is what it is.
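The mechanism behind the bogus OOM can be demonstrated. Kafka's wire protocol prefixes every response with a 4-byte big-endian size, so a plaintext client that receives a TLS handshake record instead interprets the leading handshake bytes as a size and attempts a huge allocation. The helper below is an illustration, and the example bytes are a typical TLS record header rather than ones captured from a real handshake:

```java
import java.nio.ByteBuffer;

public final class SizePrefixMisread {
    /** Interpret four raw bytes as Kafka's int32 big-endian size prefix. */
    public static int misreadSize(byte[] firstFour) {
        return ByteBuffer.wrap(firstFour).getInt();
    }
}
```

A TLS handshake record starts with 0x16 (the handshake content type) followed by the protocol version, so bytes like {0x16, 0x03, 0x03, 0x00} are misread as a size of 369296128, i.e. an attempted allocation of roughly 352 MB, which is why raising the heap does not help.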

For Kafka CLI commands, a property file containing the security-related properties is usually passed with the command:

kafka-topics --command-config <properties-file>
kafka-console-producer --producer.config <properties-file>
kafka-console-consumer --consumer.config <properties-file>

where the properties file contains:

security.protocol=
ssl.truststore.location=
ssl.truststore.password=
ssl.keystore.location=
ssl.keystore.password=
ssl.key.password=

From the question, I assume both producer and consumer components connect to the same broker(s), so the following example declares all properties required to connect to a secured broker under the spring.kafka section.

spring:
  kafka:
    bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
    security:
      protocol: "SSL"
    ssl:
      trust-store-location: "truststore.jks"
      trust-store-password: ""
      key-store-location: "keystore.jks"
      key-store-password: ""
      key-password: ""

If the producer and consumer are connecting to different broker(s), these properties should be specified under spring.kafka.producer and spring.kafka.consumer sections respectively.

spring:
  kafka:
    bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
    security:
      protocol: "SSL"
    producer:
      topic: TOPIC
      bootstrap-servers: KAFKA_BOOTSTRAP_SERVER
      ssl.protocol: "SSL"
      ssl.endpoint.identification.algorithm: "https"
      ssl:
        keystore-location: ""
        keystore-password: ""
    consumer:
      topic: TOPIC
      auto-offset-reset: "earliest"
      bootstrap-servers: KAFKA_BOOTSTRAP_SERVERS
      ssl.protocol: "SSL"
      ssl.endpoint.identification.algorithm: "https"
      ssl:
        keystore-location: ""
        keystore-password: ""

If there is no client authentication required from the broker side, then the following is a minimal configuration example:

security.protocol=SSL ssl.truststore.location= ssl.truststore.password=

If client authentication is required, the following properties also need to be included.

ssl.keystore.location=
ssl.keystore.password=
ssl.key.password=

Please note that the property naming convention differs in Spring Kafka configuration (for example, ssl.truststore.location becomes spring.kafka.ssl.trust-store-location).
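For comparison, the plain client keys (usable outside Spring, e.g. with the raw kafka-clients consumer) can be assembled as shown below; the truststore path and password are placeholders you would substitute:

```java
import java.util.Properties;

public final class SslClientProps {
    /** Plain Kafka client SSL keys; path and password are placeholders. */
    public static Properties sslProps(String truststorePath, String truststorePassword) {
        Properties p = new Properties();
        p.setProperty("security.protocol", "SSL");
        p.setProperty("ssl.truststore.location", truststorePath);
        p.setProperty("ssl.truststore.password", truststorePassword);
        return p;
    }
}
```

In Spring Boot the same settings are expressed as spring.kafka.security.protocol, spring.kafka.ssl.trust-store-location, and spring.kafka.ssl.trust-store-password.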

More details on Kafka security are in the official documentation.
