BlockingQueue

The Java BlockingQueue interface in the java.util.concurrent package represents a queue that is thread-safe to put elements into and take elements from. A BlockingQueue is typically used to have one thread produce objects that another thread consumes. Below is a diagram of a BlockingQueue.



The producing thread keeps producing new objects and inserting them into the queue until the queue reaches its upper bound, that is, the maximum number of elements it can contain. If the blocking queue has reached that limit, the producing thread is blocked when it tries to insert a new object. It remains blocked until a consuming thread takes an object out of the queue.

The consuming thread keeps taking objects out of the blocking queue, and processes them. If the consuming thread tries to take an object out of an empty queue, the consuming thread is blocked until a producing thread puts an object into the queue.
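The blocking behaviour described above can be observed without hanging the program by using the timed variants offer(E, long, TimeUnit) and poll(long, TimeUnit), which wait only for a limited time. The following is a minimal sketch; the class name BoundedQueueDemo and the 100 ms timeout are assumptions chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original example.
class BoundedQueueDemo {

    // Returns { did the offer on a full queue succeed, did the poll on an empty queue return null }.
    static boolean[] run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1); // upper bound of 1

        queue.put("first"); // the queue is now full

        // A plain put() would block here. offer() with a timeout waits
        // up to 100 ms for free space and then gives up, returning false.
        boolean inserted = queue.offer("second", 100, TimeUnit.MILLISECONDS);

        queue.take(); // the queue is now empty

        // A plain take() would block here. poll() with a timeout waits
        // up to 100 ms for an element and then returns null.
        String polled = queue.poll(100, TimeUnit.MILLISECONDS);

        return new boolean[] { inserted, polled == null };
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = run();
        System.out.println("offer on full queue succeeded: " + result[0]);
        System.out.println("poll on empty queue returned null: " + result[1]);
    }
}
```

With put() and take() in place of the timed calls, the same two situations would block the calling thread until another thread frees space or adds an element.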

Some of the methods provided by the BlockingQueue API :

  • remove(Object)
  • add(Object)
  • peek()
  • put(Object)
  • take() 
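To make the difference between these methods concrete, here is a small sketch that calls each of them on a queue with room for two elements. The class name QueueMethodsDemo and the string values are assumptions for illustration only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original example.
class QueueMethodsDemo {

    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        StringBuilder log = new StringBuilder();

        queue.add("a"); // inserts, or throws IllegalStateException if the queue is full
        queue.put("b"); // inserts, or blocks while the queue is full

        try {
            queue.add("c"); // the queue already holds 2 elements, so add() throws
        } catch (IllegalStateException e) {
            log.append("add failed;");
        }

        // peek() returns the head without removing it.
        log.append("peek=").append(queue.peek()).append(';');
        // remove(Object) removes one occurrence of the given element, if present.
        log.append("removed=").append(queue.remove("b")).append(';');
        // take() removes and returns the head, blocking while the queue is empty.
        log.append("take=").append(queue.take()).append(';');
        log.append("size=").append(queue.size());
        return log.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Only put() and take() block. add(), peek() and remove(Object) return or throw immediately.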

It is not possible to insert null into a BlockingQueue. If you try to insert null, the BlockingQueue will throw a NullPointerException.
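A quick way to check this rule is to attempt the insertion and catch the exception. The sketch below uses a LinkedBlockingQueue and a hypothetical class name NullInsertDemo.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original example.
class NullInsertDemo {

    // Returns true if the queue rejected the null element.
    static boolean rejectsNull() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        try {
            queue.offer(null); // add(null) and put(null) are rejected the same way
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("null rejected: " + rejectsNull());
    }
}
```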

BlockingQueue interface implementations :

1) ArrayBlockingQueue : ArrayBlockingQueue is a bounded, blocking queue that stores the elements internally in an array. That it is bounded means that it cannot store unlimited amounts of elements. There is an upper bound on the number of elements it can store at the same time. You set the upper bound at instantiation time, and after that it cannot be changed.

The ArrayBlockingQueue stores the elements internally in FIFO (First In, First Out) order. The head of the queue is the element which has been in queue the longest time, and the tail of the queue is the element which has been in the queue the shortest time.
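The fixed bound and the FIFO order can both be checked with a few non-blocking calls. This is only a sketch: the class name ArrayQueueOrderDemo and the values are assumptions, and drainTo() is used simply to empty the queue into a list in head-to-tail order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original example.
class ArrayQueueOrderDemo {

    // Fills a queue of capacity 3 and returns its contents in removal order.
    static List<Integer> run() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
        queue.offer(10);
        queue.offer(20);
        queue.offer(30);

        // The bound was fixed at construction time, so a fourth element is refused.
        boolean accepted = queue.offer(40);
        if (accepted || queue.remainingCapacity() != 0) {
            throw new IllegalStateException("queue accepted more than its capacity");
        }

        // drainTo() removes the elements starting at the head (the oldest element).
        List<Integer> drained = new ArrayList<>();
        queue.drainTo(drained);
        return drained;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```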

How to use ArrayBlockingQueue :

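// put() and take() throw InterruptedException, so the calling method must handle or declare it.
BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024); // capacity is fixed at 1024
queue.put("1");               // blocks while the queue is full
String value = queue.take();  // blocks while the queue is empty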

2) LinkedBlockingQueue : LinkedBlockingQueue keeps the elements internally in a linked structure (linked nodes). This linked structure can optionally have an upper bound if desired. If no upper bound is specified, Integer.MAX_VALUE is used as the upper bound.

The LinkedBlockingQueue stores the elements internally in FIFO (First In, First Out) order. The head of the queue is the element which has been in queue the longest time, and the tail of the queue is the element which has been in the queue the shortest time.
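The default bound can be verified with remainingCapacity(), which reports how many more elements an empty queue will accept. The class and method names in this sketch are assumptions for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original example.
class LinkedQueueCapacityDemo {

    // Capacity of a queue created without an explicit bound.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // Capacity of a queue created with the given bound.
    static int boundedCapacity(int limit) {
        return new LinkedBlockingQueue<String>(limit).remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println("default bound is Integer.MAX_VALUE: "
                + (defaultCapacity() == Integer.MAX_VALUE));
        System.out.println("explicit bound: " + boundedCapacity(1024));
    }
}
```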

How to use LinkedBlockingQueue :

BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();     // no explicit bound: capacity is Integer.MAX_VALUE
BlockingQueue<String> bounded   = new LinkedBlockingQueue<>(1024); // bounded queue
bounded.put("Value");
String value = bounded.take();

The two above are the most commonly used implementations of the BlockingQueue interface. Other implementations of BlockingQueue are :

  • DelayQueue
  • PriorityBlockingQueue
  • SynchronousQueue
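These implementations differ mainly in the order and timing with which elements become available. As one hedged illustration, a PriorityBlockingQueue hands out elements by priority (natural ordering by default) rather than by insertion order. The class name PriorityOrderDemo is an assumption.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the original example.
class PriorityOrderDemo {

    static String run() throws InterruptedException {
        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.put(30);
        queue.put(10);
        queue.put(20);

        // take() returns the smallest element first, not the oldest one.
        return queue.take() + "," + queue.take() + "," + queue.take();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```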

Now let's look at an example to make the concept clearer. We will solve the Producer-Consumer problem using a BlockingQueue.

ProducerConsumerExample.java

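import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerExample {

    public static void main(String[] args) {
        // Shared bounded queue; the producer and the consumer both hold a reference to it.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}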

Producer.java

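import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private final BlockingQueue<Integer> queue;

    Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            // Log before each put() so the message is always printed
            // before the consumer reports the same element.
            System.out.println("putting element 1 to queue ");
            queue.put(1);
            Thread.sleep(1000);
            System.out.println("putting element 2 to queue ");
            queue.put(2);
            Thread.sleep(1000);
            System.out.println("putting element 3 to queue ");
            queue.put(3);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing the exception.
            Thread.currentThread().interrupt();
        }
    }
}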

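Consumer.java

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private final BlockingQueue<Integer> queue;

    Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            // Each take() blocks until the producer has put an element into the queue.
            System.out.println("Fetching element from queue: " + queue.take());
            System.out.println("Fetching element from queue: " + queue.take());
            System.out.println("Fetching element from queue: " + queue.take());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
        }
    }
}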

Output :

putting element 1 to queue 
Fetching element from queue: 1
putting element 2 to queue 
Fetching element from queue: 2
putting element 3 to queue 
Fetching element from queue: 3

As the output shows, as soon as the producer thread puts an element into the BlockingQueue, the consumer thread consumes it. In between, the consumer thread is blocked in take(), waiting for the producer thread to put the next element into the queue.
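The example uses three hard-coded elements. A more general variation, sketched below, produces elements in a loop and tells the consumer when to stop by sending a sentinel value. The class name LoopingProducerConsumer, the STOP constant and the capacity of 2 are assumptions made for this sketch, not part of the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical variation of the producer-consumer example.
class LoopingProducerConsumer {

    // Hypothetical sentinel value that tells the consumer to stop.
    static final int STOP = -1;

    // Produces 1..count on one thread, consumes them on another, returns what was consumed.
    static List<Integer> run(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks whenever the queue already holds 2 elements
                }
                queue.put(STOP); // signal the end of the stream
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the queue is empty.
                for (int value = queue.take(); value != STOP; value = queue.take()) {
                    consumed.add(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join(), the consumer's writes to the list are visible here
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(5));
    }
}
```

Because the queue holds at most two elements, the producer can never run far ahead of the consumer, and neither thread needs any explicit locking.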

Custom Implementation of Blocking Queue:

Implementing your own custom blocking queue is a very good exercise, and it could also come up as an interview question. It will help you understand the concept better. To find the source code of a custom blocking queue, you can check the post below :
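As a starting point for that exercise, here is a minimal sketch of one possible approach, built on synchronized methods with wait() and notifyAll(). It is not the implementation from the referenced post and not how the JDK classes are written; the class name SimpleBlockingQueue and its three methods are assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical minimal blocking queue guarded by the object's own monitor.
class SimpleBlockingQueue<T> {

    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    SimpleBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    synchronized void put(T item) throws InterruptedException {
        if (item == null) {
            throw new NullPointerException("null elements are not allowed");
        }
        // Re-check the condition in a loop: a thread may wake up while the queue is still full.
        while (items.size() == capacity) {
            wait();
        }
        items.add(item);
        notifyAll(); // wake up consumers waiting for an element
    }

    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll(); // wake up producers waiting for free space
        return item;
    }

    synchronized int size() {
        return items.size();
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleBlockingQueue<String> queue = new SimpleBlockingQueue<>(1);
        Thread producer = new Thread(() -> {
            try {
                queue.put("a");
                queue.put("b"); // waits until "a" has been taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        System.out.println(queue.take());
        System.out.println(queue.take());
        producer.join();
    }
}
```

The while loops around wait() matter: notifyAll() wakes every waiting thread, so each one has to re-check its condition before proceeding.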



This covers the concept of BlockingQueue. If you have any questions or want to add anything, please comment below.
