Executor and Executor Service Framework

In Java , a task is a unit of work. To accomplish the work done we need to create a new Thread every time. There is a performance overhead associated with starting a new thread, and each thread is also allocated some memory for its stack etc.

In low level Java threading , we have to follow task execution policy, in which the thread to which task has been submitted will only execute the task.

How does Executor Framework come over the task execution policy and improve performance ?

Executor framework provides a way to decouple the task execution policy stages from submition and execution. Instead of starting a new thread for every task to execute concurrently, the task can be passed to a thread pool. As soon as the pool has any idle threads the task is assigned to one of them and executed. Internally the tasks are inserted into a Blocking Queue which the threads in the pool are dequeuing from. When a new task is inserted into the queue one of the idle threads will dequeue it successfully and execute it. The rest of the idle threads in the pool will be blocked waiting to dequeue tasks.

Executor framework is based on the Executor interface, which describes an executor as any object capable of executing java.lang.Runnable tasks. This interface declares the following solitary method for executing a Runnable task:
void execute(Runnable command)

You submit a Runnable task by passing it to execute(Runnable). If the executor cannot execute the task for any reason (for instance, if the executor has been shut down), this method will throw a RejectedExecutionException.


Limitation in Executor interface and need of ExecutorService interface

Executor interface is very limited. For example, you can't shut down an executor or determine whether an asynchronous task has finished. You also can't cancel a running task. For these and other reasons, the Executor framework provides an ExecutorService interface, which extends Executor.

Five of ExecutorService's methods are especially noteworthy:

  • boolean awaitTermination(long timeout, TimeUnit unit) blocks the calling thread until all tasks have completed execution after a shutdown request, the timeout occurs, or the current thread is interrupted, whichever happens first. The maximum time to wait is specified by timeout, and this value is expressed in the unit units specified by the TimeUnit enum; for example, TimeUnit.SECONDS. This method throws java.lang.InterruptedException when the current thread is interrupted. It returns true when the executor is terminated and false when the timeout elapses before termination.
  • boolean isShutdown() returns true when the executor has been shut down.
  • void shutdown() initiates an orderly shutdown in which previously submitted tasks are executed but no new tasks are accepted.
  • <T> Future<T> submit(Callable<T> task) submits a value-returning task for execution and returns a Future representing the pending results of the task.Once task is completed Future's get method will return result.
  • Future<?> submit(Runnable task) submits a Runnable task for execution and returns a Future representing that task.Once task is completed Future's get method will return null.
There are few things which has been used above which are worth noteworthy to have more explanation :

Future<V> :  Future<V> interface represents the result of an asynchronous computation. The result is known as a future because it typically will not be available until some moment in the future. You can invoke methods to cancel a task, return a task's result (waiting indefinitely or for a timeout to elapse when the task hasn't finished), and determine if a task has been cancelled or has finished. 

Callable and Runnable interface difference


Callable<V> interface is similar to the Runnable interface in that it provides a single method describing a task to execute. Unlike Runnable's void run() method, Callable<V>'s V call() throws Exception method can return a value and throw an exception.


Executor factory methods


Executor framework supplies the Executors utility class for this purpose. Executors offers several factory methods for obtaining different kinds of executors that offer specific thread-execution policies. Here are three examples:

  • ExecutorService newCachedThreadPool() creates a thread pool that creates new threads as needed, but which reuses previously constructed threads when they're available. Threads that haven't been used for 60 seconds are terminated and removed from the cache. This thread pool typically improves the performance of programs that execute many short-lived asynchronous tasks.
  • ExecutorService newSingleThreadExecutor() creates an executor that uses a single worker thread operating off an unbounded queue -- tasks are added to the queue and execute sequentially (no more than one task is active at any one time). If this thread terminates through failure during execution before shutdown of the executor, a new thread will be created to take its place when subsequent tasks need to be executed.
  • ExecutorService newFixedThreadPool(int nThreads) creates a thread pool that re-uses a fixed number of threads operating off a shared unbounded queue. At most nThreads threads are actively processing tasks. If additional tasks are submitted when all threads are active, they wait in the queue until a thread is available. If any thread terminates through failure during execution before shutdown, a new thread will be created to take its place when subsequent tasks need to be executed. The pool's threads exist until the executor is shut down.
Lets see an example to make the concept more clear.

We are creating two class below Task.java and ExecutorExample. EexecutorExample is main class which will drive the execution. Inside the ExecutorExample we have created Thread pool with 2 worker thread and 10 task will be submitted to thread pool. Lest see the code now and will analyse the output after that.

Task.java
public class Task implements Runnable {

    int taskNumber;

    Task(int taskNumber){
        this.taskNumber=taskNumber;
    }


    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+ 
        " executing task no "+ taskNumber);

        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


ExecutorExample.java
public class ExecutorExample {
    private final static int MAX_THREAD = 2;
    private final static int NO_TASK = 10;

    public static void main(String[] args) {

       // Creating the thread pool with fixed number of thread,
       // Executors is factory class which provide us different
       // kind of thread Pool , here we have created fixed
       // Thread Pool.

        ExecutorService threadPool = Executors.newFixedThreadPool(MAX_THREAD);

        // Two threads will be executing 10 task, so onyl 2 task
        // will be executing at a time,* rest of the task will
        // reside in queue. Once previous task is completed.
        // Next task is dequed from the queue and executed by
        // free thread.

        for (int i = 1; i <= NO_TASK; i++) {
            threadPool.execute(new Task(i));
        }

        /*
            * Initiates shutdown of executor, previously submitted
            * tasks are executed, but no new tasks will be accepted.
        */
        threadPool.shutdown();

    }
}

Output:
pool-1-thread-1 executing task no 1
pool-1-thread-2 executing task no 2
pool-1-thread-1 executing task no 3
pool-1-thread-2 executing task no 4
pool-1-thread-2 executing task no 5
pool-1-thread-1 executing task no 6
pool-1-thread-2 executing task no 7
pool-1-thread-1 executing task no 8
pool-1-thread-1 executing task no 10
pool-1-thread-2 executing task no 9

From the above output we can see that 2 task are being executed at a time. Once 2 task are completed another 2 task are dequed from queue.

Now we will see an example to see Future and callable are used in multi threading to get the result back from worker thread.

FutureExample.Java

public class FutureExample {

    private static final ExecutorService threadpool = Executors.newFixedThreadPool(1);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        FactorialCalculator task = new FactorialCalculator(10);
        System.out.println("Submitting task....");

        //only one task has been submitted
        Future future = threadpool.submit(task);

        System.out.println("Task is submitted");

        while (!future.isDone()) {
            System.out.println("Task is not completed yet ....");
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


        System.out.println("Task is completed, lets check result ....");
        long factorial = (Long) future.get();
        System.out.println("Factorial of 10 is :" + factorial);

        /*
          thread pool should always be shut down, previously submitted
          task will be executed but no new task will be accepted
        */
        threadpool.shutdown();
    }

    private static class FactorialCalculator implements Callable {

        private final int number;

        FactorialCalculator(int number) {
            this.number = number;
        }

        @Override
        public Long call() throws Exception {
            long output = 0;
            output = factorialNumber(number);
            return output;
        }


        private long factorialNumber(int number) throws InterruptedException {
            if (number < 0) {
                throw new IllegalArgumentException("Number must be greater than zero");
            }

            long result = 1;
            while (number > 0) {
                Thread.sleep(1);
                result = result * number;
                number--;
            }
            return result;
        }

    }
}

Output:
Submitting task....
Task is submitted
Task is not completed yet ....
Task is not completed yet ....
Task is not completed yet ....
Task is not completed yet ....
Task is not completed yet ....
Task is not completed yet ....
Task is not completed yet ....
Task is not completed yet ....
Task is not completed yet ....
Task is not completed yet ....
Task is not completed yet ....
Task is completed, lets check result ....
Factorial of 10 is :3628800


Output Analysis:
From the output we can see that once we submit the task to thread pool, worker thread execute the task and gives the result back.  We have applied while() loop which will keep on checking whether task is finised or not using   future.isDone() method.Once the task is finised , we get the result from future object using future.get() method and print on console.

As we have seen there are two different methods to submit the task to thread pool i.e submit() and execute()


Lets see what is the difference between executor's execute() and submit() method ?


1) execute() method is defined in Executor interface in java. whereas submit() method is defined in ExecutorService interface in java.
2) execute() method can be used for executing runnable task. whereas submit() can be used for executing runnable  task or callable task, submitted callable returns future and Future's get method will return the task's result.


This covers the concept of Executor and Executors Service Framework , if you have any doubt or you want to add anything then please comment below.



Comments

Popular posts from this blog

Deploy standalone Spring MVC Application in Docker Container

Refactor Code : Separate Query from Modifier

HTTP : Must known Protocol (Part 1)