A deep dive into Asynchronous Programming in Java 8 - Ideas2IT

Java provides Executors, together with the Runnable and Callable task interfaces, to make asynchronous programming easier.

This blog post shows how to implement asynchronous task execution correctly in the following instances.

  1. Execute a list of tasks without waiting for completion
  2. Execute a list of tasks at a scheduled time without waiting for completion
  3. Finish an unpredictable number of tasks as soon as possible
  4. Finish a predictable number of tasks as soon as possible

Before looking at these scenarios in detail, let us take a quick look at a few components that we will use throughout.

Executors

The class Executors provides convenient factory methods for creating different kinds of executor services.
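As a quick orientation, the sketch below creates one executor with each of the four factory methods used later in this post and runs a trivial task on each. The class name ExecutorFactories and its helper methods are illustrative names chosen for this sketch, not part of the JDK.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorFactories {

    // Builds one executor per factory method discussed in this post.
    static Map<String, ExecutorService> createAll() {
        Map<String, ExecutorService> executors = new LinkedHashMap<>();
        executors.put("single", Executors.newSingleThreadExecutor());
        executors.put("scheduled", Executors.newSingleThreadScheduledExecutor());
        executors.put("cached", Executors.newCachedThreadPool());
        executors.put("fixed", Executors.newFixedThreadPool(4));
        return executors;
    }

    // Submits a trivial Callable to every executor and collects the results.
    static List<String> runOnEach() throws Exception {
        List<String> results = new ArrayList<>();
        for (Map.Entry<String, ExecutorService> entry : createAll().entrySet()) {
            ExecutorService executor = entry.getValue();
            String name = entry.getKey();
            try {
                results.add(executor.submit(() -> name + " ran").get());
            } finally {
                // Always shut down, otherwise worker threads can keep the JVM alive.
                executor.shutdown();
            }
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        runOnEach().forEach(System.out::println);
    }
}
```

All four return types implement ExecutorService, so the calling code stays the same whichever factory is chosen.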

Callables and Futures

Callable is a functional interface like Runnable, but its call() method returns a value and may throw a checked exception.

Since submit() doesn’t wait until the task completes, the executor service returns a special result of type Future which can be used to retrieve the actual result later.

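import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FeatureProgram {

   public static void main(String[] args) {
       // A task that takes one second and then returns a value.
       Callable<Integer> task = () -> {
           TimeUnit.SECONDS.sleep(1);
           return 1;
       };

       ExecutorService executor = Executors.newFixedThreadPool(1);
       Future<Integer> future = executor.submit(task);
       System.out.println("completed? " + future.isDone());

       Integer result = null;
       try {
           // Block for at most 2 seconds while waiting for the result.
           result = future.get(2, TimeUnit.SECONDS);
       } catch (InterruptedException | ExecutionException | TimeoutException e) {
           e.printStackTrace();
       } finally {
           // Without shutdown() the pool's worker thread keeps the JVM alive.
           executor.shutdown();
       }

       System.out.println("completed? " + future.isDone());
       System.out.print("result: " + result);
   }
}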

Output:

completed? false

completed? true

result: 1

Timeouts

Any call to future.get() blocks until the callable completes. In the worst case a callable runs forever, making the application unresponsive. Passing a timeout guards against this: in the example above, future.get(2, TimeUnit.SECONDS) throws a TimeoutException if the callable does not complete within 2 seconds.
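To see the timeout path itself, here is a minimal sketch in which the task can be made slower than the wait. The class name TimeoutSketch, the method runWithTimeout, and the millisecond values are assumptions made for illustration. Cancelling the future after the timeout is one possible reaction, not the only one.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {

    // Runs a task that sleeps for taskMillis and waits at most waitMillis for its result.
    static String runWithTimeout(long taskMillis, long waitMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Callable<String> slowTask = () -> {
            TimeUnit.MILLISECONDS.sleep(taskMillis);
            return "done";
        };
        Future<String> future = executor.submit(slowTask);
        try {
            return future.get(waitMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The task is still running; cancel(true) interrupts its thread.
            future.cancel(true);
            return "timed out";
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWithTimeout(50, 2000));  // task finishes well within the wait
        System.out.println(runWithTimeout(2000, 50));  // wait expires before the task finishes
    }
}
```

Note that a timeout only stops the caller from waiting. The task keeps running unless it is cancelled and responds to interruption, as the sleeping task here does.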

Now let us dive into the four scenarios.

Instance 1

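Execute a list of asynchronous tasks that must start immediately, without the caller waiting for them to complete.

Here we can use newSingleThreadExecutor, which creates an executor backed by a single worker thread. Tasks submitted to one such executor run sequentially, in submission order; to run n tasks in parallel with this approach, create n newSingleThreadExecutor instances, one per task. The example below submits five tasks to a single executor, so they run one after another on the same thread.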

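import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutor implements Runnable {

   private static int index = 0;

   // Each task gets a sequential id when it is constructed on the main thread.
   private final String id = "index" + ++index;

   @Override
   public void run() {
       System.out.println("Started: " + id);
   }

   public static void main(String[] args) {
       System.out.println("Starting Single Thread Executor");
       ExecutorService executor = Executors.newSingleThreadExecutor();
       for (int i = 0; i < 5; ++i) {
           // execute() queues the task and returns immediately.
           executor.execute(new SingleThreadExecutor());
       }
       // Stop accepting new tasks; tasks that are already queued still run.
       executor.shutdown();
   }
}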

Output:

Starting Single Thread Executor

Started: index1

Started: index2

Started: index3

Started: index4

Started: index5
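The description of this instance mentions creating one newSingleThreadExecutor per task when the tasks have to run in parallel. A minimal sketch of that variant follows. The class name ExecutorPerTask and the latch-based overlap check are assumptions added for illustration. Each task waits until every other task has started, which can only succeed if the tasks really run at the same time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorPerTask {

    // Starts taskCount tasks, each on its own single-thread executor, and
    // reports whether all of them were running at the same time.
    static boolean allRanConcurrently(int taskCount) throws InterruptedException {
        CountDownLatch allStarted = new CountDownLatch(taskCount);
        AtomicInteger overlapped = new AtomicInteger();
        List<ExecutorService> executors = new ArrayList<>();

        for (int i = 0; i < taskCount; i++) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executors.add(executor);
            executor.execute(() -> {
                allStarted.countDown();
                try {
                    // Succeeds only if every other task has started too.
                    if (allStarted.await(2, TimeUnit.SECONDS)) {
                        overlapped.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        for (ExecutorService executor : executors) {
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
        return overlapped.get() == taskCount;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("All tasks overlapped: " + allRanConcurrently(5));
    }
}
```

With a single shared newSingleThreadExecutor the same check would fail, because the first task would occupy the only worker thread while waiting for the others. For many tasks, a shared pool as in Instances 3 and 4 is usually easier to manage than one executor per task.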

Instance 2

Execute a list of asynchronous tasks whose execution needs to be scheduled for later.

We can use newSingleThreadScheduledExecutor(), which supports scheduling and runs the scheduled tasks on a single worker thread.

To execute a task in the scheduled executor, you have to use the schedule() method that receives the following three parameters:

  1. The task you want to execute
  2. The delay to wait before the task is executed
  3. The unit of the delay, specified as a constant of the TimeUnit class

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

executor.schedule(task1, 5, TimeUnit.SECONDS); // Start the task after 5 seconds

If the single thread terminates due to a failure during execution prior to shutdown, the executor creates a new thread when one is needed to execute subsequent tasks.
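schedule() also accepts a Callable and returns a ScheduledFuture, so a delayed task can hand back a result. The following is a minimal sketch of that usage. The class name ScheduleSketch, the method elapsedUntilResult, and the 200 ms delay are assumptions chosen to keep the example quick to run.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class ScheduleSketch {

    // Schedules a Callable after delayMillis and returns how long the caller
    // waited for its result, in milliseconds.
    static long elapsedUntilResult(long delayMillis) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            long start = System.nanoTime();
            // Three parameters: the task, the delay, and the unit of the delay.
            ScheduledFuture<String> future =
                    executor.schedule(() -> "ran", delayMillis, TimeUnit.MILLISECONDS);
            future.get(); // blocks until the delayed task has produced its result
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Waited about " + elapsedUntilResult(200) + " ms");
    }
}
```

The measured time should be at least the requested delay, since the executor does not start a task before its delay has elapsed. How much later it starts depends on the system.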

 

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample implements Runnable {

   private final String name;

   public ScheduledThreadPoolExample(String name) {
       this.name = name;
   }

   public String getName() {
       return name;
   }

   @Override
   public void run() {
       System.out.println("Execute : " + name + " at" + new Date());
   }

   public static void main(String[] args) {
       ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
       ScheduledThreadPoolExample task1 = new ScheduledThreadPoolExample("Scheduled Task 1");
       ScheduledThreadPoolExample task2 = new ScheduledThreadPoolExample("Scheduled Task 2");

       System.out.println("The time is : " + new Date());
       executor.schedule(task1, 5, TimeUnit.SECONDS);
       executor.schedule(task2, 10, TimeUnit.SECONDS);

       // By default, delayed tasks that are already scheduled still run after shutdown().
       executor.shutdown();
       System.out.println("Now thread is ready to do next task" + new Date());
   }
}

Output:

The time is : Wed Oct 13 10:18:37 IST 2021

Now thread is ready to do next taskWed Oct 13 10:18:37 IST 2021

Execute : Scheduled Task 1 atWed Oct 13 10:18:42 IST 2021

Execute : Scheduled Task 2 atWed Oct 13 10:18:47 IST 2021

Instance 3

Execute an unpredictable number of tasks, as soon as possible.

We can use newCachedThreadPool(), which creates a new thread for an incoming task when there are no idle threads in the pool. It can create a new thread for every incoming task on demand, up to Integer.MAX_VALUE threads.

To better manage system resources, cached thread pools will remove threads that remain idle for one minute.
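The two limits mentioned above can be read back from the pool itself. The sketch below does so under one assumption: that newCachedThreadPool() returns a ThreadPoolExecutor, which holds for the standard JDK implementation but is not promised by the method's return type. The class name CachedPoolSettings is illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolSettings {

    // Returns {core pool size, maximum pool size, keep-alive time in seconds}.
    // The cast relies on the factory returning a ThreadPoolExecutor (an assumption).
    static long[] settings() {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
            return new long[] {
                pool.getCorePoolSize(),
                pool.getMaximumPoolSize(),
                pool.getKeepAliveTime(TimeUnit.SECONDS)
            };
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] s = settings();
        System.out.println("core=" + s[0] + ", max=" + s[1] + ", keepAliveSeconds=" + s[2]);
    }
}
```

A core size of 0 means the pool keeps no threads once they have all been idle long enough, and the 60-second keep-alive is the "one minute" referred to above.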

import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class CachedThreadPoolExample {

   public static void main(final String[] arguments) {
       ExecutorService executor = Executors.newCachedThreadPool();
       // The cast exposes pool statistics such as getLargestPoolSize().
       ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;

       for (int i = 0; i < 4; ++i) {
           executor.submit(new Task(i));
       }

       // Note: the last column comes from getTaskCount(), which counts tasks rather than threads.
       System.out.println("Max Threads | Max Execution | Current Threads | Total Threads");
       System.out.println(pool.getMaximumPoolSize() + "   |     " + pool.getLargestPoolSize() + "       |       " + pool.getPoolSize()
               + "      |      " + pool.getTaskCount());
       executor.shutdown();
   }

   static class Task implements Runnable {

       int id;

       public Task(int id) {
           this.id = id;
       }

       public void run() {
           System.out.println("Thread:" + id + ", Executed at: " + new Date());
       }
   }
}

Output:

Max Threads | Max Execution | Current Threads | Total Threads

2147483647   |     4       |       4      |      4

Thread:2, Executed at: Wed Oct 13 10:43:03 IST 2021

Thread:1, Executed at: Wed Oct 13 10:43:03 IST 2021

Thread:0, Executed at: Wed Oct 13 10:43:03 IST 2021

Thread:3, Executed at: Wed Oct 13 10:43:03 IST 2021

Instance 4

Execute a predictable number of tasks, as soon as possible.

To resolve this problem, we can use newFixedThreadPool, which uses a fixed number of never-expiring threads. Instead of creating an ever-increasing number of threads, the fixed thread pool executes incoming tasks with that fixed number of threads. When all threads are busy, the executor queues new tasks. This way, we have more control over resource consumption.

Fixed thread pools are better suited for tasks with predictable execution times.
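The queueing behaviour can be observed directly. In this minimal sketch, more tasks are submitted than there are threads, and the pool never grows beyond its fixed size. The class name FixedPoolQueueing, the pool size of 3, and the 50 ms task duration are assumptions for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FixedPoolQueueing {

    // Submits taskCount short tasks to a pool of poolSize threads and returns
    // the largest number of threads the pool ever held.
    static int largestPoolSize(int poolSize, int taskCount) throws InterruptedException {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(50); // simulate a short unit of work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // Tasks beyond poolSize wait in the executor's queue until a thread is free.
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return executor.getLargestPoolSize();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Largest pool size: " + largestPoolSize(3, 10));
    }
}
```

Ten tasks on three threads run in roughly four rounds, which is why predictable task durations make the total running time of a fixed pool easier to estimate.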

 

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class FixedThreadPoolExample {

   public static void main(String[] args) {
       ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);

       List<Integer> numbers = new ArrayList<>();
       for (int i = 1; i < 78; i++) {
           numbers.add(i);
       }

       // Divide the list into small chunks
       AtomicInteger counter = new AtomicInteger();
       int chunkSize = 10; // Can be tuned based on the total number of tasks
       Collection<List<Integer>> chunks = numbers.stream()
               .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / chunkSize))
               .values();

       int i = 1;
       Set<Future<Map<Object, String>>> futures = new HashSet<>();
       System.out.println("Task started at " + new Date());
       for (List<Integer> number : chunks) {
           Callable<Map<Object, String>> callable = new Task(number, "Task" + i++);
           Future<Map<Object, String>> future = executor.submit(callable);
           futures.add(future);
       }

       // Wait for all tasks to complete and collect the results
       Map<Object, String> result = new HashMap<>();
       for (Future<Map<Object, String>> re : futures) {
           try {
               result.putAll(re.get());
           } catch (InterruptedException | ExecutionException e) {
               e.printStackTrace();
           }
       }

       System.out.println("Largest executions: " + executor.getLargestPoolSize());
       System.out.println("Maximum allowed threads: " + executor.getMaximumPoolSize());
       System.out.println("Current threads in pool: " + executor.getPoolSize());
       System.out.println("Currently executing threads: " + executor.getActiveCount());
       // Note: getTaskCount() reports the tasks ever scheduled, not threads.
       System.out.println("Total number of threads(ever scheduled): " + executor.getTaskCount());

       // Shut down the executor
       executor.shutdown();
       System.out.println("No of task completed:: " + result.size() + " at " + new Date());
   }

   static class Task implements Callable<Map<Object, String>> {

       private final List<Integer> numbers;
       private final String name;

       Task(List<Integer> numbers, String name) {
           this.numbers = numbers;
           this.name = name;
       }

       @Override
       public Map<Object, String> call() throws Exception {
           Map<Object, String> result = new HashMap<>();
           try {
               // Simulate five seconds of work per chunk
               TimeUnit.SECONDS.sleep(5);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           numbers.forEach(number -> result.put(number, "success"));
           return result;
       }
   }
}

 

Output:

Task started at Wed Oct 13 10:51:12 IST 2021

Largest executions: 8

Maximum allowed threads: 10

Current threads in pool: 8

Currently executing threads: 0

Total number of threads(ever scheduled): 8

No of task completed:: 77 at Wed Oct 13 10:51:17 IST 2021

In this example, we have an undefined number of tasks, but within a predictable range. We split the tasks into small chunks and assign each chunk to a single thread, so all chunks finish in more or less the same time.

If the number of tasks increases slightly, we can handle it by increasing the chunk size. If the number of tasks increases substantially, we can handle it by increasing the size of the thread pool.
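To make the relationship between task count, chunk size, and number of chunks concrete, here is a small sketch of the chunking step on its own. It uses a plain loop with subList rather than the counter-based groupingBy from the example. The class name ChunkingSketch and the method chunk are illustrative, not part of the original program.

```java
import java.util.ArrayList;
import java.util.List;

class ChunkingSketch {

    // Splits items into consecutive chunks of at most chunkSize elements.
    static <T> List<List<T>> chunk(List<T> items, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 1; i < 78; i++) {
            numbers.add(i);
        }
        // 77 items in chunks of 10 give 8 chunks (seven of 10, one of 7).
        System.out.println(chunk(numbers, 10).size());
        // Raising the chunk size to 11 brings this down to 7 chunks.
        System.out.println(chunk(numbers, 11).size());
    }
}
```

With the chunk count at or below the pool size, every chunk gets its own thread straight away. Once there are more chunks than threads, the extra chunks wait in the queue, which is the point at which a larger pool starts to help.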