Documentation
Here is a description of a few of the popular use cases for Apache Kafka®. For an overview of a number of these areas in action, see this blog post.
Messaging
Kafka works well as a replacement for a more traditional message broker. Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, etc). In comparison to most messaging systems Kafka has better throughput, built-in partitioning, replication, and fault-tolerance which makes it a good solution for large scale message processing applications.
In our experience messaging uses are often comparatively low-throughput, but may require low end-to-end latency and often depend on the strong durability guarantees Kafka provides.
In this domain Kafka is comparable to traditional messaging systems such as ActiveMQ or RabbitMQ.
Website Activity Tracking
The original use case for Kafka was to be able to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. This means site activity (page views, searches, or other actions users may take) is published to central topics with one topic per activity type. These feeds are available for subscription for a range of use cases including real-time processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems for offline processing and reporting.
Activity tracking is often very high volume as many activity messages are generated for each user page view.
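To make the topic-per-activity-type pattern concrete, here is a minimal, hypothetical sketch using the plain Java producer client; the topic name page_views, the broker address, and the JSON payload are illustrative assumptions, not part of the original pipeline.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class PageViewTracker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // One topic per activity type: page views go to "page_views", searches would go to "searches", etc.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keying by user ID keeps each user's events ordered within a partition.
            producer.send(new ProducerRecord<>("page_views", "user-42",
                    "{\"page\": \"/pricing\", \"ts\": 1700000000}"));
        }
    }
}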
Metrics
Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data.
Log Aggregation
Many people use Kafka as a replacement for a log aggregation solution. Log aggregation typically collects physical log files off servers and puts them in a central place (a file server or HDFS perhaps) for processing. Kafka abstracts away the details of files and gives a cleaner abstraction of log or event data as a stream of messages. This allows for lower-latency processing and easier support for multiple data sources and distributed data consumption. In comparison to log-centric systems like Scribe or Flume, Kafka offers equally good performance, stronger durability guarantees due to replication, and much lower end-to-end latency.
Stream Processing
Many users of Kafka process data in processing pipelines consisting of multiple stages, where raw input data is consumed from Kafka topics and then aggregated, enriched, or otherwise transformed into new topics for further consumption or follow-up processing. For example, a processing pipeline for recommending news articles might crawl article content from RSS feeds and publish it to an "articles" topic; further processing might normalize or deduplicate this content and publish the cleansed article content to a new topic; a final processing stage might attempt to recommend this content to users. Such processing pipelines create graphs of real-time data flows based on the individual topics. Starting in 0.10.0.0, a light-weight but powerful stream processing library called Kafka Streams is available in Apache Kafka to perform such data processing as described above. Apart from Kafka Streams, alternative open source stream processing tools include Apache Storm and Apache Samza.
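As a rough illustration of such a pipeline, the sketch below uses Kafka Streams to read the "articles" topic, apply a stand-in normalization step, and publish the result to a second topic. The topic names, the application id, and the normalization logic are assumptions for the example, not taken from the text.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class ArticlePipeline {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "article-pipeline");   // hypothetical application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> rawArticles = builder.stream("articles");
        rawArticles
                .mapValues(content -> content.trim().toLowerCase())  // stand-in for real normalization/deduplication
                .to("articles-cleansed");                            // cleansed content for the next stage

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}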
Event Sourcing
Event sourcing is a style of application design where state changes are logged as a time-ordered sequence of records. Kafka’s support for very large stored log data makes it an excellent backend for an application built in this style.
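A minimal sketch of the read side of this pattern, assuming a hypothetical account-events topic whose records are state changes keyed by account ID: replaying the log from the earliest offset and keeping the latest value per key rebuilds current state.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class AccountStateRebuilder {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        props.put("group.id", "account-state-rebuilder");  // hypothetical consumer group
        props.put("auto.offset.reset", "earliest");         // start from the first recorded event
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        Map<String, String> currentStateByAccount = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("account-events"));
            // A single poll is enough for a sketch; a real rebuild would loop until caught up.
            ConsumerRecords<String, String> events = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> event : events) {
                // Events are time-ordered per key, so later values overwrite earlier ones.
                currentStateByAccount.put(event.key(), event.value());
            }
        }
        System.out.println("Rebuilt state for " + currentStateByAccount.size() + " accounts");
    }
}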
Commit Log
Kafka can serve as a kind of external commit-log for a distributed system. The log helps replicate data between nodes and acts as a re-syncing mechanism for failed nodes to restore their data. The log compaction feature in Kafka helps support this usage. In this usage Kafka is similar to the Apache BookKeeper project.
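For example, a compacted topic can be created up front with the Kafka AdminClient so that it retains only the latest record per key. The topic name, partition count, and replication factor below are assumptions for the sketch.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Collections;
import java.util.Properties;

public class CommitLogTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // cleanup.policy=compact keeps the latest value per key, which is exactly what a
            // failed node needs in order to re-sync its state from the log.
            NewTopic topic = new NewTopic("node-state-log", 3, (short) 3)
                    .configs(Collections.singletonMap(
                            TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}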
1.3 Quick Start
1.4 Ecosystem
A plethora of tools integrate with Kafka outside the main distribution. The ecosystem page lists many of these, including stream processing systems, Hadoop integration, monitoring, and deployment tools.
Kafka Java Client and Streaming Quickstart
This quickstart shows you how to use the Kafka Java client with Oracle Cloud Infrastructure Streaming to publish and consume messages.
See Using Streaming with Apache Kafka for more information. Refer to the Overview of Streaming for key concepts and more Streaming details.
Prerequisites
- To use the Kafka Java client with Streaming, you must have the following:
- An Oracle Cloud Infrastructure account.
- A user created in that account, in a group with a policy that grants the required permissions. For an example of how to set up a new user, group, compartment, and policy, see Adding Users. For a list of typical policies you may want to use, see Common Policies.
- Collect the following details:
- Stream OCID
- Messages endpoint
- Stream pool OCID
- Stream pool FQDN
- Kafka connection settings:
- Bootstrap servers
- SASL connection strings
- Security protocol
See Listing Streams and Stream Pools for instructions on viewing stream details. Refer to Creating a Stream and Creating a Stream Pool if you do not have an existing stream. A stream corresponds to a Kafka topic.
Add the kafka-clients dependency to your Maven project's pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

A complete pom.xml for the example project looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>oci.example</groupId>
    <artifactId>StreamsExampleWithKafkaApis</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
    </dependencies>
</project>
OCI user auth tokens are visible only at the time of creation. Copy your auth token and store it somewhere safe for future use.
Producing Messages
- Open your favorite editor, such as Visual Studio Code, from the directory wd. You should already have the Kafka SDK dependencies for Java in the pom.xml of your Maven Java project after you've met the prerequisites.
- Create a new file named Producer.java in directory wd under the path /src/main/java/kafka/sdk/oss/example/ with the following code. Replace the values of the variables in the code as directed by the code comments, namely bootstrapServers through streamOrKafkaTopicName. These variables hold the Kafka connection settings you gathered in the prerequisites.
package kafka.sdk.oss.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class Producer {

    static String bootstrapServers = "<bootstrap_servers_endpoint>"; // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092
    static String tenancyName = "<OCI_tenancy_name>";
    static String username = "<your_OCI_username>";
    static String streamPoolId = "<stream_pool_OCID>";
    static String authToken = "<your_OCI_user_auth_token>"; // see the auth token note in the Prerequisites section
    static String streamOrKafkaTopicName = "<stream_name>"; // see the stream details in the Prerequisites section

    private static Properties getKafkaProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/" + username + "/" + streamPoolId + "\" "
                + "password=\"" + authToken + "\";";
        properties.put("sasl.jaas.config", value);
        properties.put("retries", 3); // retries on transient errors and load balancing disconnection
        properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB
        return properties;
    }

    public static void main(String[] args) {
        try {
            Properties properties = getKafkaProperties();
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i);
                producer.send(record, (md, ex) -> {
                    if (ex != null) {
                        System.err.println("exception occurred in producer for review :" + record.value()
                                + ", exception is " + ex);
                        ex.printStackTrace();
                    } else {
                        System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset()
                                + " at " + md.timestamp());
                    }
                });
            }
            // producer.send() is async; to make sure all messages are sent we use producer.flush()
            producer.flush();
            producer.close();
        } catch (Exception e) {
            System.err.println("Error: exception " + e);
        }
    }
}
From the directory wd, build and run the producer:

mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer
Consuming Messages
- First, ensure that the stream you want to consume messages from contains messages. You could use the Console to produce a test message, or use the stream and messages we created in this quickstart.
- Open your favorite editor, such as Visual Studio Code, from the directory wd. You should already have the Kafka SDK dependencies for Java in the pom.xml of your Maven Java project after you've met the prerequisites.
- Create a new file named Consumer.java in directory wd under the path /src/main/java/kafka/sdk/oss/example/ with the following code. Replace the values of the variables in the code as directed by the code comments, namely bootstrapServers through consumerGroupName. These variables hold the Kafka connection settings you gathered in the prerequisites.
package kafka.sdk.oss.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {

    static String bootstrapServers = "<bootstrap_servers_endpoint>"; // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092
    static String tenancyName = "<OCI_tenancy_name>";
    static String username = "<your_OCI_username>";
    static String streamPoolId = "<stream_pool_OCID>";
    static String authToken = "<your_OCI_user_auth_token>"; // see the auth token note in the Prerequisites section
    static String streamOrKafkaTopicName = "<stream_name>"; // see the stream details in the Prerequisites section
    static String consumerGroupName = "<consumer_group_name>";

    private static Properties getKafkaProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", consumerGroupName);
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("auto.offset.reset", "earliest");
        final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + tenancyName + "/" + username + "/" + streamPoolId + "\" "
                + "password=\"" + authToken + "\";";
        props.put("sasl.jaas.config", value);
        return props;
    }

    public static void main(String[] args) {
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getKafkaProperties());
        consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        System.out.println("size of records polled is " + records.count());
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Received message: (" + record.key() + ", " + record.value()
                    + ") at offset " + record.offset());
        }
        consumer.commitSync();
        consumer.close();
    }
}
From the directory wd, build and run the consumer:

mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer
You should see messages similar to the following:

[INFO related maven compiling and building the Java code]
size of records polled is 3
Received message: (messageKey0, message value) at offset 1284
Received message: (messageKey0, message value) at offset 1285
Received message: (null, message produced using oci console) at offset 1286
If you used the Console to produce a test message, the key for each of those messages is null.
Next Steps
See the following resources for more information: