Kafka Java Client and Streaming Quickstart

This quickstart shows you how to use the Kafka Java client with Oracle Cloud Infrastructure Streaming to publish and consume messages.

See Using Streaming with Apache Kafka for more information. Refer to the Overview of Streaming for key concepts and more Streaming details.


  1. To use the Kafka Java client with Streaming , you must have the following:
    • An Oracle Cloud Infrastructure account.
    • A user created in that account, in a group with a policy that grants the required permissions. For an example of how to set up a new user, group, compartment, and policy, see Adding Users. For a list of typical policies you may want to use, see Common Policies.
  2. Collect the following details:
    • Stream OCID
    • Messages endpoint
    • Stream pool OCID
    • Stream pool FQDN
    • Kafka connection settings:
      • Bootstrap servers
      • SASL connection strings
      • Security protocol

See Listing Streams and Stream Pools for instructions on viewing stream details. Refer to Creating a Stream and Creating a Stream Pool if you do not have an existing stream. Streams correspond to a Kafka topic.

  org.apache.kafka kafka-clients 2.8.0  
  4.0.0 oci.example StreamsExampleWithKafkaApis 1.0-SNAPSHOT 8 8   org.apache.kafka kafka-clients 2.8.0    

OCI user auth tokens are visible only at the time of creation. Copy it and keep it somewhere safe for future use.

Producing Messages

  1. Open your favorite editor, such as Visual Studio Code, from the directory wd . You should already have the Kafka SDK dependencies for Java as part of the pom.xml of your Maven Java project after you’ve met the prerequisites.
  2. Create a new file named in directory wd under the path /src/main/java/kafka/sdk/oss/example/ with following code. Replace the values of variables in the code as directed by the code comments, namely bootstrapServers through streamOrKafkaTopicName . These variables are for Kafka connection settings which you gathered in the prerequisites.
package kafka.sdk.oss.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class Producer < static String bootstrapServers keyword varname">", // usually of the form ; static String tenancyName keyword varname"> "; static String username keyword varname"> "; static String streamPoolId keyword varname"> "; static String authToken keyword varname"> "; // from step 8 of Prerequisites section static String streamOrKafkaTopicName keyword varname"> "; // from step 2 of Prerequisites section private static Properties getKafkaProperties() < Properties properties = new Properties(); properties.put("bootstrap.servers", bootstrapServers); properties.put("security.protocol", "SASL_SSL"); properties.put("sasl.mechanism", "PLAIN"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); final String value = " required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; properties.put("sasl.jaas.config", value); properties.put("retries", 3); // retries on transient errors and load balancing disconnection properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB return properties; >public static void main(String args[]) < try < Properties properties = getKafkaProperties(); KafkaProducer producer = new KafkaProducer<>(properties); for(int i=0;i <10;i++) < ProducerRecordrecord = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i); producer.send(record, (md, ex) -> < if (ex != null) < System.err.println("exception occurred in producer for review :" + record.value() + ", exception is " + ex); ex.printStackTrace(); >else < System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp()); >>); > // producer.send() is async, to make sure all messages are sent we use producer.flush() producer.flush(); producer.close(); > catch (Exception e) < System.err.println("Error: exception " + e); >> > 
mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer

Consuming Messages

  1. First, ensure that the stream you want to consume messages from contains messages. You could use the Console to produce a test message, or use the stream and messages we created in this quickstart.
  2. Open your favorite editor, such as Visual Studio Code, from the directory wd under the path /src/main/java/kafka/sdk/oss/example/ . You should already have the Kafka SDK dependencies for Java as part of the pom.xml of your Maven Java project after you’ve met the prerequisites.
  3. Create a new file named in directory wd with following code. Replace the values of variables in the code as directed by the code comments, namely bootstrapServers through consumerGroupName . These variables are for Kafka connection settings which you gathered in the prerequisites.
package kafka.sdk.oss.example; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class Consumer < static String bootstrapServers keyword varname">", // usually of the form ; static String tenancyName keyword varname"> "; static String username keyword varname"> "; static String streamPoolId keyword varname"> "; static String authToken keyword varname"> "; // from step 8 of Prerequisites section static String streamOrKafkaTopicName keyword varname"> "; // from step 2 of Prerequisites section static String consumerGroupName keyword varname"> "; private static Properties getKafkaProperties() < Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("", consumerGroupName); props.put("", "false"); props.put("", "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put("security.protocol", "SASL_SSL"); props.put("sasl.mechanism", "PLAIN"); props.put("auto.offset.reset", "earliest"); final String value = " required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; props.put("sasl.jaas.config", value); return props; >public static void main(String[] args) < final KafkaConsumerconsumer = new KafkaConsumer<>(getKafkaProperties());; consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName)); ConsumerRecords records = consumer.poll(10000); System.out.println("size of records polled is "+ records.count()); for (ConsumerRecord record : records) < System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); >consumer.commitSync(); consumer.close(); > > 
mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer 
[INFO related maven compiling and building the Java code] size of records polled is 3 Received message: (messageKey0, message value) at offset 1284 Received message: (messageKey0, message value) at offset 1285 Received message: (null, message produced using oci console) at offset 1286

If you used the Console to produce a test message, the key for each message is Null

Next Steps

See the following resources for more information:


