Java Producer — Consumer Examples Using BlockingQueue
In this tutorial, we are about to show you how to use Queue to solve real life problems in programming. Typically, queue is used to implement producer-consumer scenarios. The following kinds of program will need to use queue:
— Chat applications: Messages are put into a queue. When you are sending a message, you are the producer; and your friend who reads the message, is the consumer. Messages need to be kept in queue because of network latency. Imagine network connection dropped when you are trying to send a message. In this case, the message is still in the queue, awaiting the receiver to consume upon the connection becomes available.
— Online help desk applications: Imagine a company has 5 persons working as customer support staffs. They chat with clients through a help desk application. They can talk with maximum 5 clients at a time, so other clients will be queued up. When a staff finishes serving a client, the next client in the queue is served next.
— Real-time processing applications such as screen recorder. The logic behind this kind of application is there are two threads working concurrently: The producer thread captures screenshots constantly and puts the images into a queue; the consumer thread takes the images from the queue to process the video.
Above we name only few types of application in which we need to use queue. Remember using queue when you need to implement producer-consumer processing.
In Java, using a BlockingQueue implementation is a good choice, as its put(e) method let the producer thread waits for space to become available in the queue, and its take() method let the consumer thread waits for an element become available in the queue.
1. Producer — Consumer Pseudo Code
class Producer implements Runnable < private final BlockingQueue queue; Producer(BlockingQueue q) < queue = q; >public void run() < while (true) < queue.put(produce()); >> Object produce() < . >> class Consumer implements Runnable < private final BlockingQueue queue; Consumer(BlockingQueue q) < queue = q; >public void run() < while (true) < consume(queue.take()); >> void consume(Object x) < . >> class Program < void main() < BlockingQueue q = new SomeBlockingQueueImplementation(); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); >>
Here, the Producer class is a thread which constantly produces objects and put them into the queue. In practice, we should specify condition to exit the loop, such as closing/shutdown the program or a maximum number of objects reached.
The Consumer class is another thread which constantly takes objects from the queue to process. In practice, we should specify condition to stop this thread by checking the queue for a special object (null, false or special value), for example:
And in this case, the producer is responsible to put this special object into the queue to indicate there are no more elements to process.
Let’s look at real examples.
2. One Producer — One Consumer Example
* Producer:
import java.util.*; import java.util.concurrent.*; /** * Producer using BlockingQueue example * @author www.codejava.net * */ public class Producer implements Runnable < private BlockingQueuequeue; public Producer (BlockingQueue queue) < this.queue = queue; >public void run() < try < for (int i = 0; i < 10; i++) < queue.put(produce()); Thread.sleep(500); >queue.put(-1); // indicates end of producing System.out.println("Producer STOPPED."); > catch (InterruptedException ie) < ie.printStackTrace(); >> private Integer produce() < Integer number = new Integer((int) (Math.random() * 100)); System.out.println("Producing number =>" + number); return number; > >
* Consumer:
import java.util.*; import java.util.concurrent.*; /** * Consumer using BlockingQueue example * @author www.codejava.net * */ public class Consumer implements Runnable < private BlockingQueuequeue; public Consumer(BlockingQueue queue) < this.queue = queue; >public void run() < try < while (true) < Integer number = queue.take(); if (number == null || number == -1) < break; >consume(number); Thread.sleep(1000); > System.out.println("Consumer STOPPED."); > catch (InterruptedException ie) < ie.printStackTrace(); >> private void consume(Integer number) < System.out.println("Consuming number < brush:java">import java.util.*; import java.util.concurrent.*; /** * Producer-Consumer test example (1 producer thread - 1 consumer thread) * @author www.codejava.net * */ public class ProducerConsumerTest < public static void main(String[] args) < BlockingQueuequeue = new ArrayBlockingQueue<>(20); Thread producer = new Thread(new Producer(queue)); Thread consumer = new Thread(new Consumer(queue)); producer.start(); consumer.start(); > >
Producing number => 21 Consuming number 90 Producing number => 51 Consuming number 23 Producing number => 61 Consuming number 63 Producing number => 75 Consuming number 99 Consuming number 59 Producing number => 31 Producer STOPPED. Consuming number3. One Producer - Multiple Consumers Example
In case there are multiple consumer threads, we should use the poll(time, unit) method to take elements from the head of the queue. This method does not wait forever like the take() method. Instead, it just waits for a specified of time. The lock is released either when an element found or the time out period expires. This avoids deadlock among different consumer threads.
* Producer: same code as above.
We add some code to help identify which thread is running in the output, and replace the take() method by the poll(time, unit) method. Here’s the code:
import java.util.*; import java.util.concurrent.*; /** * Consumer using BlockingQueue example * @author www.codejava.net * */ public class Consumer implements Runnable < private BlockingQueuequeue; private String threadId; public Consumer(BlockingQueue queue) < this.queue = queue; >public void run() < threadId = "Consumer-" + Thread.currentThread().getId(); try < while (true) < Integer number = queue.poll(5, TimeUnit.SECONDS); if (number == null || number == -1) < break; >consume(number); Thread.sleep(1000); > System.out.println(threadId + " STOPPED."); > catch (InterruptedException ie) < ie.printStackTrace(); >> private void consume(Integer number) < System.out.println(threadId + ": Consuming number < brush:java">import java.util.*; import java.util.concurrent.*; /** * Producer-Consumer test example (1 producer thread - N consumer threads) * @author www.codejava.net * */ public class ProducerConsumerTest < public static void main(String[] args) < BlockingQueuequeue = new ArrayBlockingQueue<>(20); Thread producer = new Thread(new Producer(queue)); Thread consumer1 = new Thread(new Consumer(queue)); Thread consumer2 = new Thread(new Consumer(queue)); Thread consumer3 = new Thread(new Consumer(queue)); producer.start(); consumer1.start(); consumer2.start(); consumer3.start(); > >Producing number => 34 Consumer-11: Consuming number 79 Consumer-12: Consuming number 43 Consumer-13: Consuming number 44 Consumer-11: Consuming number 49 Consumer-12: Consuming number 28 Consumer-13: Consuming number 2 Consumer-11: Consuming number 92 Consumer-12: Consuming number 89 Consumer-13: Consuming number 85 Consumer-11: Consuming numberWe hope these examples give you the ideas and help you implement similar scenarios in your program. Feel free to help us improve the code by replying in the comments section.
Related Queue tutorials:
Other Java Collections Tutorials:
About the Author:
Nam Ha Minh is certified Java programmer (SCJP and SCWCD). He started programming with Java in the time of Java 1.4 and has been falling in love with Java since then. Make friend with him on Facebook and watch his Java videos you YouTube.
Interface BlockingQueueA Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
BlockingQueue methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future: one throws an exception, the second returns a special value (either null or false , depending on the operation), the third blocks the current thread indefinitely until the operation can succeed, and the fourth blocks for only a given maximum time limit before giving up. These methods are summarized in the following table:
Throws exception | Special value | Blocks | Times out | |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
A BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add , put or offer a null . A null is used as a sentinel value to indicate failure of poll operations.
A BlockingQueue may be capacity bounded. At any given time it may have a remainingCapacity beyond which no additional elements can be put without blocking. A BlockingQueue without any intrinsic capacity constraints always reports a remaining capacity of Integer.MAX_VALUE .
BlockingQueue implementations are designed to be used primarily for producer-consumer queues, but additionally support the Collection interface. So, for example, it is possible to remove an arbitrary element from a queue using remove(x) . However, such operations are in general not performed very efficiently, and are intended for only occasional use, such as when a queued message is cancelled.
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control. However, the bulk Collection operations addAll , containsAll , retainAll and removeAll are not necessarily performed atomically unless specified otherwise in an implementation. So it is possible, for example, for addAll(c) to fail (throwing an exception) after adding only some of the elements in c .
A BlockingQueue does not intrinsically support any kind of "close" or "shutdown" operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.
Usage example, based on a typical producer-consumer scenario. Note that a BlockingQueue can safely be used with multiple producers and multiple consumers.
class Producer implements Runnable < private final BlockingQueue queue; Producer(BlockingQueue q) < queue = q; >public void run() < try < while (true) < queue.put(produce()); >> catch (InterruptedException ex) < . handle . >> Object produce() < . >> class Consumer implements Runnable < private final BlockingQueue queue; Consumer(BlockingQueue q) < queue = q; >public void run() < try < while (true) < consume(queue.take()); >> catch (InterruptedException ex) < . handle . >> void consume(Object x) < . >> class Setup < void main() < BlockingQueue q = new SomeQueueImplementation(); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); >>
Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a BlockingQueue happen-before actions subsequent to the access or removal of that element from the BlockingQueue in another thread.
This interface is a member of the Java Collections Framework.
Interface BlockingQueue
A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
BlockingQueue methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future: one throws an exception, the second returns a special value (either null or false , depending on the operation), the third blocks the current thread indefinitely until the operation can succeed, and the fourth blocks for only a given maximum time limit before giving up. These methods are summarized in the following table:
Throws exception | Special value | Blocks | Times out | |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
A BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add , put or offer a null . A null is used as a sentinel value to indicate failure of poll operations.
A BlockingQueue may be capacity bounded. At any given time it may have a remainingCapacity beyond which no additional elements can be put without blocking. A BlockingQueue without any intrinsic capacity constraints always reports a remaining capacity of Integer.MAX_VALUE .
BlockingQueue implementations are designed to be used primarily for producer-consumer queues, but additionally support the Collection interface. So, for example, it is possible to remove an arbitrary element from a queue using remove(x) . However, such operations are in general not performed very efficiently, and are intended for only occasional use, such as when a queued message is cancelled.
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control. However, the bulk Collection operations addAll , containsAll , retainAll and removeAll are not necessarily performed atomically unless specified otherwise in an implementation. So it is possible, for example, for addAll(c) to fail (throwing an exception) after adding only some of the elements in c .
A BlockingQueue does not intrinsically support any kind of "close" or "shutdown" operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.
Usage example, based on a typical producer-consumer scenario. Note that a BlockingQueue can safely be used with multiple producers and multiple consumers.
class Producer implements Runnable < private final BlockingQueue queue; Producer(BlockingQueue q) < queue = q; >public void run() < try < while (true) < queue.put(produce()); >> catch (InterruptedException ex) < . handle . >> Object produce() < . >> class Consumer implements Runnable < private final BlockingQueue queue; Consumer(BlockingQueue q) < queue = q; >public void run() < try < while (true) < consume(queue.take()); >> catch (InterruptedException ex) < . handle . >> void consume(Object x) < . >> class Setup < void main() < BlockingQueue q = new SomeQueueImplementation(); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); >>
Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a BlockingQueue happen-before actions subsequent to the access or removal of that element from the BlockingQueue in another thread.
This interface is a member of the Java Collections Framework.