A Deep Dive Into Asynchronous Programming in Java 8

Java provides the Executor framework, along with the Runnable and Callable interfaces, to make asynchronous programming easier. This blog post shows how to implement asynchronous task execution correctly in the following scenarios:

  1. Execute a list of tasks without waiting for their completion.
  2. Execute a list of tasks at a scheduled time, without waiting for their completion.
  3. Finish an unpredictable number of tasks as soon as possible.
  4. Finish a predictable number of tasks as soon as possible.

Before looking at these scenarios in detail, let us take a quick look at a few components that we will use throughout.

Executors

The class Executors provides convenient factory methods for creating different kinds of executor services.
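As a quick orientation, the factory methods used in this post can be tried side by side. The following is only a minimal sketch: the class name ExecutorFactoriesSketch and the helper runOn are hypothetical names chosen for illustration, and the pool size of 4 is arbitrary.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorFactoriesSketch {

    // Hypothetical helper: runs one trivial task on the given executor,
    // waits for its result, and always shuts the executor down afterwards.
    public static String runOn(ExecutorService executor) {
        try {
            Future<String> future = executor.submit(() -> "done");
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // One worker thread; tasks run one after another.
        System.out.println(runOn(Executors.newSingleThreadExecutor()));
        // One worker thread that can also run tasks after a delay.
        System.out.println(runOn(Executors.newSingleThreadScheduledExecutor()));
        // Threads are created on demand and reused while idle.
        System.out.println(runOn(Executors.newCachedThreadPool()));
        // A fixed number of worker threads (4 is an arbitrary choice here).
        System.out.println(runOn(Executors.newFixedThreadPool(4)));
    }
}
```

All four factories return an ExecutorService, so code that submits tasks does not need to know which kind of pool it is talking to.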

Callables and Futures

Callable is a functional interface like Runnable, except that its call() method returns a value and may throw a checked exception.

Since submit() doesn't wait until the task completes, the executor service returns a special result of type Future which can be used to retrieve the actual result later.

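import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FeatureProgram {

    public static void main(String[] args) {
        // The task sleeps for one second and then returns 1.
        Callable<Integer> task = () -> {
            TimeUnit.SECONDS.sleep(1);
            return 1;
        };

        ExecutorService executor = Executors.newFixedThreadPool(1);
        Future<Integer> future = executor.submit(task);
        System.out.println("completed? " + future.isDone());

        Integer result = null;
        try {
            // Block for at most two seconds while waiting for the result.
            result = future.get(2, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            // Without shutdown() the pool's non-daemon thread keeps the JVM alive.
            executor.shutdown();
        }

        System.out.println("completed? " + future.isDone());
        System.out.print("result: " + result);
    }
}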

Output:

completed? false
completed? true
result: 1

Timeouts

Any call to future.get() is synchronous and blocks until the callable has terminated. In the worst case a callable runs forever, making the application unresponsive. Passing a timeout prevents this. In the above example, a TimeoutException is thrown if the callable does not complete within 2 seconds.
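To see the timeout actually fire, the task has to run longer than the wait. Here is a minimal sketch of that case; the class name FutureTimeoutSketch, the method getWithTimeout, and the returned marker strings are hypothetical choices made for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimeoutSketch {

    // Runs a task that sleeps for taskMillis and waits at most timeoutMillis for it.
    public static String getWithTimeout(long taskMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> {
            TimeUnit.MILLISECONDS.sleep(taskMillis);
            return "finished";
        });
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Interrupt the worker so the abandoned task does not keep running.
            future.cancel(true);
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(getWithTimeout(100, 2000)); // prints "finished"
        System.out.println(getWithTimeout(2000, 100)); // prints "timed out"
    }
}
```

Note that a timed-out get() does not stop the task by itself; calling cancel(true) is one way to ask the worker thread to give up.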

Now let us dive into the four scenarios.

Instance 1

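Execute a list of asynchronous tasks in parallel, with execution starting immediately.

Here we can use newSingleThreadExecutor, which creates an executor backed by a single worker thread. Tasks submitted to one such executor run one after another in submission order, so to run n tasks in parallel we create n newSingleThreadExecutor instances, one per task. The example below submits five tasks to a single executor; execute() returns immediately, so the caller does not wait for the tasks to finish.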

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutor implements Runnable {

    private static int index = 0;
    private final String id = "index" + ++index;

    public SingleThreadExecutor() {}

    @Override
    public void run() {
        System.out.println("Started: " + id);
    }

    public static void main(String[] args) {
        System.out.println("Starting Single Thread Executor");
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // execute() queues each task and returns immediately.
        for (int i = 0; i < 5; ++i) {
            executor.execute(new SingleThreadExecutor());
        }
        // shutdown() stops accepting new tasks but lets the queued ones finish.
        executor.shutdown();
    }
}

Output:

Starting Single Thread Executor
Started: index1
Started: index2
Started: index3
Started: index4
Started: index5
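The idea of one newSingleThreadExecutor per task can be sketched as follows. This is an illustration under assumptions rather than a recommended pattern: the class name ParallelSingleThreadExecutors and the method runInParallel are hypothetical, and the five-second wait is an arbitrary safety limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelSingleThreadExecutors {

    // Starts taskCount tasks, each on its own single-thread executor,
    // and returns how many distinct worker threads ran them.
    public static int runInParallel(int taskCount) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch allRan = new CountDownLatch(taskCount);
        List<ExecutorService> executors = new ArrayList<>();

        for (int i = 0; i < taskCount; i++) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executors.add(executor);
            // execute() returns immediately; the task runs on the executor's own thread.
            executor.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                allRan.countDown();
            });
        }

        try {
            // Wait until every task has run (or give up after five seconds).
            allRan.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executors.forEach(ExecutorService::shutdown);
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println("Distinct worker threads: " + runInParallel(5));
    }
}
```

Because every executor owns exactly one thread, no task has to queue behind another. The cost is one thread per task, which is why the pooled executors in the later scenarios are usually preferable for larger task counts.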

Instance 2

Execute a list of asynchronous tasks, with execution scheduled for later.

We can use newSingleThreadScheduledExecutor(), which supports scheduling. To execute a task in the scheduled executor, use the schedule() method, which receives the following three parameters:

  1. The task you want to execute
  2. The period of time you want the task to wait before its execution
  3. The unit of the period of time, specified as a constant of the TimeUnit class

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(task1, 5, TimeUnit.SECONDS); // To start the task after 5 secs

If the single thread terminates due to a failure during execution prior to shutdown, the executor creates a new one, if needed, to execute subsequent tasks.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.Date;

public class ScheduledThreadPoolExample implements Runnable {

    private final String name;

    public ScheduledThreadPoolExample(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    @Override
    public void run() {
        System.out.println("Execute : " + name + " at" + new Date());
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        ScheduledThreadPoolExample task1 = new ScheduledThreadPoolExample("Scheduled Task 1");
        ScheduledThreadPoolExample task2 = new ScheduledThreadPoolExample("Scheduled Task 2");

        System.out.println("The time is : " + new Date());

        // Run task1 after 5 seconds and task2 after 10 seconds.
        executor.schedule(task1, 5, TimeUnit.SECONDS);
        executor.schedule(task2, 10, TimeUnit.SECONDS);

        // By default, tasks that are already scheduled still run after shutdown().
        executor.shutdown();

        System.out.println("Now thread is ready to do next task" + new Date());
    }
}

Output:

The time is : Wed Oct 13 10:18:37 IST 2021
Now thread is ready to do next taskWed Oct 13 10:18:37 IST 2021
Execute : Scheduled Task 1 atWed Oct 13 10:18:42 IST 2021
Execute : Scheduled Task 2 atWed Oct 13 10:18:47 IST 2021
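schedule() also accepts a Callable and then returns a ScheduledFuture, which is useful when the delayed task produces a value. The following is a small sketch that measures how long the task really waited; the class name ScheduledCallableSketch and the method measureDelayMillis are hypothetical, and the 200 ms delay is arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduledCallableSketch {

    // Schedules a task after delayMillis and returns how many milliseconds
    // passed between scheduling it and the moment it actually ran.
    public static long measureDelayMillis(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        long scheduledAt = System.nanoTime();
        Callable<Long> task = () -> System.nanoTime() - scheduledAt;

        ScheduledFuture<Long> future = executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
        // Already-scheduled delayed tasks still run after shutdown() by default.
        executor.shutdown();

        try {
            return TimeUnit.NANOSECONDS.toMillis(future.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println("Task ran after " + measureDelayMillis(200) + " ms");
    }
}
```

The measured value is never below the requested delay, but it can be higher when the single worker thread is busy with an earlier task.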

Instance 3

Execute an unpredictable number of tasks as soon as possible.

We can use newCachedThreadPool(), which creates a new thread for an incoming task when there are no idle threads in the pool. It can create a new thread for every incoming task on demand, up to Integer.MAX_VALUE threads. To better manage system resources, cached thread pools remove threads that remain idle for one minute.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.Date;

public class CachedThreadPoolExample {

    public static void main(final String[] arguments) {
        ExecutorService executor = Executors.newCachedThreadPool();
        // Cast to ThreadPoolExecutor to read the pool statistics.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;

        for (int i = 0; i < 4; ++i) {
            executor.submit(new Task(i));
        }

        System.out.println("Max Threads | Max Execution | Current Threads | Total Threads" + pool.getLargestPoolSize());
        System.out.println(pool.getMaximumPoolSize() + " | " + pool.getLargestPoolSize() + " | " + pool.getPoolSize() + " | " + pool.getTaskCount());

        executor.shutdown();
    }

    static class Task implements Runnable {

        final int id;

        public Task(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            System.out.println("Thread:" + id + ", Executed at: " + new Date());
        }
    }
}

Output:

Max Threads | Max Execution | Current Threads | Total Threads4
2147483647   |     4       |       4      |      4
Thread:2, Executed at: Wed Oct 13 10:43:03 IST 2021
Thread:1, Executed at: Wed Oct 13 10:43:03 IST 2021
Thread:0, Executed at: Wed Oct 13 10:43:03 IST 2021
Thread:3, Executed at: Wed Oct 13 10:43:03 IST 2021
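The on-demand growth of a cached thread pool can be made visible by submitting tasks that all have to be running at the same moment. A minimal sketch, assuming hypothetical names (CachedPoolGrowthSketch, largestPoolSizeFor) and an arbitrary five-second safety limit:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CachedPoolGrowthSketch {

    // Submits taskCount tasks that each wait until all of them are running,
    // then reports the largest number of threads the pool ever held.
    public static int largestPoolSizeFor(int taskCount) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        CountDownLatch allRunning = new CountDownLatch(taskCount);
        List<Future<?>> futures = new ArrayList<>();

        for (int i = 0; i < taskCount; i++) {
            futures.add(pool.submit(() -> {
                allRunning.countDown();
                try {
                    // No task can finish before every task has started,
                    // so the pool needs one thread per task.
                    allRunning.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        try {
            for (Future<?> future : futures) {
                future.get(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return pool.getLargestPoolSize();
    }

    public static void main(String[] args) {
        System.out.println("Largest pool size: " + largestPoolSizeFor(4)); // prints "Largest pool size: 4"
    }
}
```

With a fixed pool smaller than the task count, the same tasks would wait forever for each other, which is one reason a cached pool suits workloads whose size is not known in advance.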

Instance 4

Execute a predictable number of tasks as soon as possible.

For this scenario we can use newFixedThreadPool, which uses a fixed number of never-expiring threads. Instead of creating an ever-increasing number of threads, the fixed thread pool executes incoming tasks with a fixed number of threads. When all threads are busy, the executor queues new tasks. This way, we have more control over resource consumption.

Fixed thread pools are better suited for tasks with predictable execution times.

import java.util.concurrent.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class FixedThreadPoolExample {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);

        // The numbers 1..77 stand in for the work items.
        List<Integer> numbers = new ArrayList<>();
        for (int i = 1; i < 78; i++) {
            numbers.add(i);
        }

        // Split the work items into chunks of 10; the last chunk holds the remaining 7.
        AtomicInteger counter = new AtomicInteger();
        int chunkSize = 10;
        Collection<List<Integer>> chunks = numbers.stream()
                .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / chunkSize))
                .values();

        int i = 1;
        Set<Future<Map<Object, String>>> futures = new HashSet<>();

        System.out.println("Task started at " + new Date());

        // Submit one task per chunk.
        for (List<Integer> number : chunks) {
            Callable<Map<Object, String>> callable = new Task(number, "Task" + i++);
            Future<Map<Object, String>> future = executor.submit(callable);
            futures.add(future);
        }

        // Wait for every chunk and merge the partial results.
        Map<Object, String> result = new HashMap<>();
        for (Future<Map<Object, String>> re : futures) {
            try {
                result.putAll(re.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        System.out.println("Largest executions: " + executor.getLargestPoolSize());
        System.out.println("Maximum allowed threads: " + executor.getMaximumPoolSize());
        System.out.println("Current threads in pool: " + executor.getPoolSize());
        System.out.println("Currently executing threads: " + executor.getActiveCount());
        System.out.println("Total number of threads(ever scheduled): " + executor.getTaskCount());

        executor.shutdown();

        System.out.println("No of task completed:: " + result.size() + " at " + new Date());
    }

    static class Task implements Callable<Map<Object, String>> {

        private final List<Integer> numbers;
        private final String name;

        Task(List<Integer> numbers, String name) {
            this.numbers = numbers;
            this.name = name;
        }

        @Override
        public Map<Object, String> call() throws Exception {
            Map<Object, String> result = new HashMap<>();

            // Simulate five seconds of work per chunk.
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            numbers.forEach(number -> result.put(number, "success"));
            return result;
        }
    }
}

Output:

Task started at Wed Oct 13 10:51:12 IST 2021
Largest executions: 8
Maximum allowed threads: 10
Current threads in pool: 8
Currently executing threads: 0
Total number of threads(ever scheduled): 8
No of task completed:: 77 at Wed Oct 13 10:51:17 IST 2021
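In the run above the pool has more threads (10) than tasks (8), so nothing is ever queued. The queueing behaviour shows up once there are more tasks than threads. A rough sketch under assumptions: the class name FixedPoolQueueSketch and the method maxConcurrentTasks are hypothetical, and the 50 ms sleep merely simulates work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedPoolQueueSketch {

    // Runs taskCount tasks on a pool of poolSize threads and returns
    // the highest number of tasks that were running at the same time.
    public static int maxConcurrentTasks(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();

        for (int i = 0; i < taskCount; i++) {
            futures.add(pool.submit(() -> {
                int now = running.incrementAndGet();
                maxRunning.accumulateAndGet(now, Math::max);
                try {
                    TimeUnit.MILLISECONDS.sleep(50); // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
            }));
        }

        try {
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return maxRunning.get();
    }

    public static void main(String[] args) {
        // Six tasks on two threads: the value printed is never more than 2.
        System.out.println("Max tasks running at once: " + maxConcurrentTasks(2, 6));
    }
}
```

The remaining tasks wait in the executor's queue until a thread becomes free, which is what keeps resource consumption bounded.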

In this example, we do not know the exact number of tasks in advance, but we know its approximate range. We can split the tasks into small chunks and assign each chunk to a single thread, so that all the chunks finish in more or less the same time.
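The chunking step can also be written without a stream and a shared counter. The helper below is a hypothetical alternative to the groupingBy approach used in the example, not the code the example runs; the names ChunkingSketch and partition are chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkingSketch {

    // Splits items into consecutive chunks of at most chunkSize elements.
    public static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> numbers = new ArrayList<>();
        for (int i = 1; i < 78; i++) {
            numbers.add(i);
        }
        List<List<Integer>> chunks = partition(numbers, 10);
        System.out.println("Chunks: " + chunks.size());                           // prints "Chunks: 8"
        System.out.println("Last chunk size: " + chunks.get(chunks.size() - 1).size()); // prints "Last chunk size: 7"
    }
}
```

With 77 items and a chunk size of 10 this yields eight chunks, which matches the eight tasks reported in the output above.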

If the number of tasks grows slightly, we can handle it by increasing the chunk size. If it grows substantially, we can handle it by increasing the size of the thread pool.

Ideas2IT Team
