PolarSPARC

Java 8 CompletableFuture :: Part 1


Bhaskar S 10/26/2018


Straight-through business processes typically involve multiple steps, and today's enterprises have many of them. Some of those steps could be time-consuming and are hence performed in parallel (asynchronously), while others are performed sequentially (synchronously). In other words, a straight-through business process can be thought of as a pipeline of tasks, some of which could be executed in parallel.

In Java, one could perform those asynchronous steps using Threads. However, one needs to carefully orchestrate the various steps in the business process, avoiding blocking and locking for optimal performance, which can be complex, error-prone, and difficult to reason about.

What if Java provided an out-of-the-box capability to chain a series of tasks, with some tasks executing in parallel, without one having to write complex multi-threaded code?

Please welcome CompletableFuture from Java 8!

NOTE: We are referring to the tasks (or steps) within a single JVM and not to distributed computing.

Let's jump right into some examples to illustrate the power and intricacies of CompletableFuture.

The following is one of the simplest examples that uses CompletableFuture:

Listing.1
package com.polarsparc.cf.CompletableFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Sample01 {
    public static void main(String[] args) {
        {
            CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
                System.out.printf("[%s] I am Cool\n", Thread.currentThread().getName());
            });
            
            try {
                cf.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }

        {
            CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
                System.out.printf("[%s] Am Awesome\n", Thread.currentThread().getName());
                return null;
            });
            
            try {
                cf.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
    
        {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            
            CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> {
                System.out.printf("[%s] And am Smart\n", Thread.currentThread().getName());
                return null;
            }, executor);
            
            executor.shutdown();
            
            try {
                cf.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
    }
}

Executing the program from Listing.1 will generate the following output:

Output.1

[ForkJoinPool.commonPool-worker-3] I am Cool
[ForkJoinPool.commonPool-worker-3] Am Awesome
[pool-1-thread-1] And am Smart

The following are some of the concepts in the context of the code in Listing.1:

As seen from Listing.1 above, there are two ways to initiate an asynchronous operation - using runAsync() or using supplyAsync().
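
The difference between the two entry points can be sketched as follows. This is a minimal illustration and not part of the original listings; the class name RunVsSupply and the helper methods fireAndForget() and compute() are hypothetical names chosen for the example. It shows that runAsync() yields a CompletableFuture<Void> (nothing to consume downstream), while supplyAsync() yields a future that carries the value returned by the Supplier.

```java
import java.util.concurrent.CompletableFuture;

public class RunVsSupply {
    // runAsync() takes a Runnable, so the resulting future carries no value
    static CompletableFuture<Void> fireAndForget() {
        return CompletableFuture.runAsync(() ->
            System.out.printf("[%s] side effect only%n", Thread.currentThread().getName()));
    }

    // supplyAsync() takes a Supplier<T>, so the resulting future carries a T
    static CompletableFuture<Integer> compute() {
        return CompletableFuture.supplyAsync(() -> 6 * 7);
    }

    public static void main(String[] args) {
        // join() waits like get(), but throws only unchecked exceptions
        Void nothing = fireAndForget().join();
        Integer answer = compute().join();

        System.out.println("runAsync result:    " + nothing);
        System.out.println("supplyAsync result: " + answer);
    }
}
```

The supplyAsync() calls in Listing.1 return null only because there is nothing useful to hand over; when a later task needs an input, supplyAsync() is the natural starting point.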

In the next example, we will demonstrate the case where an asynchronous task produces some value, which can then be consumed by the next task in the chain.

Listing.2
package com.polarsparc.cf.CompletableFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Sample02 {
    public static void main(String[] args) {
        {
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I am Cool");
            CompletableFuture<Void> cf2 = cf1.thenAccept(msg -> 
                System.out.printf("[1] [%s] %s and am also Awesome\n", Thread.currentThread().getName(), msg));
            
            try {
                cf2.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I am New", executor);
            CompletableFuture<Void> cf2 = cf1.thenAccept(msg -> 
                System.out.printf("[2] [%s] %s and am also Smart\n", Thread.currentThread().getName(), msg));
            
            executor.shutdown();
            
            try {
                cf2.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        {
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I am Fast");
            CompletableFuture<Void> cf2 = cf1.thenAcceptAsync(msg -> 
                System.out.printf("[3] [%s] %s and am also Elegant\n", Thread.currentThread().getName(), msg));
            
            try {
                cf2.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        {
            ExecutorService executor = Executors.newFixedThreadPool(2);
            
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I am Slick", executor);
            CompletableFuture<Void> cf2 = cf1.thenAcceptAsync(msg -> 
                System.out.printf("[4] [%s] %s and am also Nimble\n", Thread.currentThread().getName(), msg),
                executor);
            
            executor.shutdown();
            
            try {
                cf2.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
    }
}

Executing the program from Listing.2 will generate the following output:

Output.2

[1] [main] I am Cool and am also Awesome
[2] [main] I am New and am also Smart
[3] [ForkJoinPool.commonPool-worker-3] I am Fast and am also Elegant
[4] [pool-2-thread-2] I am Slick and am also Nimble

Re-running the program from Listing.2 once more will generate the following output:

Output.3

[1] [ForkJoinPool.commonPool-worker-3] I am Cool and am also Awesome
[2] [main] I am New and am also Smart
[3] [ForkJoinPool.commonPool-worker-3] I am Fast and am also Elegant
[4] [pool-2-thread-2] I am Slick and am also Nimble

Notice the change in the thread name of the first line between Output.2 and Output.3. We will explain that shortly in the following section.

The following are some of the concepts in the context of the code in Listing.2:
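
One such concept, stated here as a general property of CompletableFuture rather than as a quote from the original text, is the choice of thread for a non-async callback such as thenAccept(). If the prior task has already completed when thenAccept() is called, the callback runs right away on the calling thread; if the prior task is still pending, the callback runs later on the thread that completes it. This race would account for the first line switching between main and a fork-join worker across Output.2 and Output.3. The sketch below removes the race so that each case is deterministic; the class name WhichThread, its two helper methods, and the thread name "completer" are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class WhichThread {
    // Case 1: the source future is already complete when thenAccept() is called,
    // so the callback runs immediately on the calling thread
    static String alreadyCompleted() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> done = CompletableFuture.completedFuture("I am Cool");
        done.thenAccept(msg -> seen.set(Thread.currentThread().getName())).join();
        return seen.get();
    }

    // Case 2: the source future is still pending when thenAccept() is called,
    // so the callback runs later on the thread that completes the source
    static String completedLater() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<Void> chained =
            pending.thenAccept(msg -> seen.set(Thread.currentThread().getName()));

        // Complete the source from a separate, explicitly named thread
        new Thread(() -> pending.complete("I am Cool"), "completer").start();

        chained.join(); // returns only after the callback has run
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:     " + Thread.currentThread().getName());
        System.out.println("already completed: " + alreadyCompleted());
        System.out.println("completed later:   " + completedLater());
    }
}
```

When the thread matters, the Async variants (optionally with an explicit Executor) take the guesswork out of it.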

CAUTION

The default fork-join thread pool (ForkJoinPool.commonPool()) is shared with other features, such as parallel Streams, and is hard to customize since it is created by the JVM. It is therefore better to create and use a custom Executor.
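
A minimal sketch of that advice follows, assuming a small fixed-size pool is adequate for the workload. The class name CustomExecutor, the helper methods, the pool name "cf-pool", and the pool size of 2 are all hypothetical choices made for illustration. Naming the threads makes it easy to confirm in the output (or in a thread dump) that the pipeline is not running on the shared common pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomExecutor {
    // Hypothetical helper: a fixed-size pool whose threads are named <prefix>-<n>
    static ExecutorService newNamedPool(String prefix, int size) {
        AtomicInteger counter = new AtomicInteger(1);
        return Executors.newFixedThreadPool(size, runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true); // do not keep the JVM alive if shutdown() is missed
            return thread;
        });
    }

    // Runs both stages on the supplied pool and reports the thread used by the second stage
    static String stageThreadName(ExecutorService pool) {
        return CompletableFuture
            .supplyAsync(() -> "I am Slick", pool)
            .thenApplyAsync(msg -> Thread.currentThread().getName(), pool)
            .join();
    }

    public static void main(String[] args) {
        ExecutorService pool = newNamedPool("cf-pool", 2);
        try {
            System.out.println("second stage ran on: " + stageThreadName(pool));
        } finally {
            pool.shutdown(); // release the threads once the pipeline is done
        }
    }
}
```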

Moving on to the next example, we will demonstrate the case where an asynchronous task produces some value, which can then be consumed by the next task in the chain as input and then generate a totally different value. This is similar to a map() operation on Streams.
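
The map() analogy can be sketched side by side. This is an illustrative example with hypothetical names (MapAnalogy, lengthsViaStream, lengthViaFuture): Stream.map() transforms each element of a stream, while thenApply() transforms the single value a future will eventually hold, and in both cases the output type (Integer) may differ from the input type (String).

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MapAnalogy {
    // Stream.map(): transform each element, String -> Integer
    static List<Integer> lengthsViaStream(String... words) {
        return Stream.of(words).map(String::length).collect(Collectors.toList());
    }

    // thenApply(): transform the eventual value, String -> Integer
    static CompletableFuture<Integer> lengthViaFuture(String word) {
        return CompletableFuture.supplyAsync(() -> word).thenApply(String::length);
    }

    public static void main(String[] args) {
        System.out.println(lengthsViaStream("Cool", "Awesome")); // [4, 7]
        System.out.println(lengthViaFuture("Cool").join());      // 4
    }
}
```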

Listing.3
package com.polarsparc.cf.CompletableFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Sample03 {
    public static void main(String[] args) {
        {
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I am Cool");
            CompletableFuture<String> cf2 = cf1.thenApply(msg -> {
                System.out.printf("[1] [%s] %s\n", Thread.currentThread().getName(), msg);
                return String.format("%s and AWESOME !!!", msg);
            });
        
            try {
                String msg = cf2.get();
                System.out.printf("[1] %s\n", msg);
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I am New");
            CompletableFuture<String> cf2 = cf1.thenApply(msg -> {
                System.out.printf("[2] [%s] %s\n", Thread.currentThread().getName(), msg);
                return String.format("%s and SMART !!!", msg);
            });
            
            executor.shutdown();
            
            try {
                String msg = cf2.get();
                
                System.out.printf("[2] %s\n", msg);
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        {
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I am Fast");
            CompletableFuture<String> cf2 = cf1.thenApplyAsync(msg -> {
                System.out.printf("[3] [%s] %s\n", Thread.currentThread().getName(), msg);
                return String.format("%s and ELEGANT !!!", msg);
            });
        
            try {
                String msg = cf2.get();
                
                System.out.printf("[3] %s\n", msg);
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        {
            ExecutorService executor = Executors.newFixedThreadPool(2);
            
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I am Slick");
            CompletableFuture<String> cf2 = cf1.thenApplyAsync(msg -> {
                System.out.printf("[4] [%s] %s\n", Thread.currentThread().getName(), msg);
                return String.format("%s and NIMBLE !!!", msg);
            }, executor);
            
            executor.shutdown();
        
            try {
                String msg = cf2.get();
                
                System.out.printf("[4] %s\n", msg);
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
    }
}

Executing the program from Listing.3 will generate the following output:

Output.4

[1] [ForkJoinPool.commonPool-worker-3] I am Cool
[1] I am Cool and AWESOME !!!
[2] [main] I am New
[2] I am New and SMART !!!
[3] [ForkJoinPool.commonPool-worker-3] I am Fast
[3] I am Fast and ELEGANT !!!
[4] [pool-2-thread-1] I am Slick
[4] I am Slick and NIMBLE !!!

The following are some of the concepts in the context of the code in Listing.3:
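
One concept worth isolating, offered here as a sketch rather than as the original text, is the contrast between thenApply() and thenApplyAsync(). The async variant always hands the function to an executor (the default one when none is supplied), so it does not run on the calling thread, whereas the non-async variant may. The class name ApplyVsApplyAsync and its helper methods are hypothetical, and the source future is pre-completed to keep the non-async case deterministic.

```java
import java.util.concurrent.CompletableFuture;

public class ApplyVsApplyAsync {
    // Non-async: the source is already complete, so the function runs on the caller
    static String syncStageThread() {
        return CompletableFuture.completedFuture("I am Cool")
            .thenApply(msg -> Thread.currentThread().getName())
            .join();
    }

    // Async: the function is submitted to the default executor instead
    static String asyncStageThread() {
        return CompletableFuture.completedFuture("I am Fast")
            .thenApplyAsync(msg -> Thread.currentThread().getName())
            .join();
    }

    public static void main(String[] args) {
        System.out.println("caller:         " + Thread.currentThread().getName());
        System.out.println("thenApply:      " + syncStageThread());
        System.out.println("thenApplyAsync: " + asyncStageThread());
    }
}
```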

In the next example, we will demonstrate the case where an asynchronous task produces some value, which is then consumed by a second task in the chain to produce some output, which is finally consumed by a third task in the chain.

Listing.4
package com.polarsparc.cf.CompletableFuture;

import java.util.concurrent.CompletableFuture;

public class Sample04 {
    public static void main(String[] args) {
        {
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I'm Cool");
            CompletableFuture<String> cf2 = cf1.thenApply(msg -> 
                String.format("%s and am Super AWESOME !!!", msg));
            CompletableFuture<Void> cf3 = cf2.thenAccept(msg -> 
                System.out.printf("[1] %s\n", msg));
            
            try {
                cf3.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        // Fluent style
        {
            CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> "I'm Awesome")
                .thenApply(msg -> String.format("%s and am Super COOL !!!", msg))
                .thenAccept(msg -> System.out.printf("[2] %s\n", msg));
            
            try {
                cf.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
    }
}

Executing the program from Listing.4 will generate the following output:

Output.5

[1] I'm Cool and am Super AWESOME !!!
[2] I'm Awesome and am Super COOL !!!

The first block of code in Listing.4 shows how one can chain the tasks explicitly, while the second block shows the same code in the fluent style.
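
That the two styles are interchangeable can be checked with a small sketch that returns the final value instead of printing it. The class name ChainStyles, the method names, and the trim/upper-case steps are hypothetical and chosen only to keep the pipeline short.

```java
import java.util.concurrent.CompletableFuture;

public class ChainStyles {
    // Explicit style: each stage is held in its own variable
    static String explicit(String input) {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> input);
        CompletableFuture<String> cf2 = cf1.thenApply(String::trim);
        CompletableFuture<String> cf3 = cf2.thenApply(String::toUpperCase);
        return cf3.join();
    }

    // Fluent style: the same stages, chained without intermediate variables
    static String fluent(String input) {
        return CompletableFuture.supplyAsync(() -> input)
            .thenApply(String::trim)
            .thenApply(String::toUpperCase)
            .join();
    }

    public static void main(String[] args) {
        System.out.println(explicit("  I'm Cool  ")); // I'M COOL
        System.out.println(fluent("  I'm Cool  "));   // I'M COOL
    }
}
```

The explicit style is handy when an intermediate future needs more than one dependent task; the fluent style reads better for a straight-line pipeline.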

The following table summarizes the methods we covered in this article from the CompletableFuture class:

Method | Usage Description
------ | -----------------
runAsync(Runnable) | Uses an instance of Runnable to start the pipeline. Does not generate any output value
supplyAsync(Supplier<T>) | Uses an instance of Supplier<T> to start the pipeline. Generates an output value of type T
thenAccept(Consumer<T>) | Uses an instance of Consumer<T> to accept a value of type T from the prior task in the pipeline. Does not generate any output value
thenAcceptAsync(Consumer<T>) | Uses an instance of Consumer<T> to accept a value of type T from the prior task in the pipeline in an asynchronous fashion. Does not generate any output value
thenApply(Function<T, U>) | Uses an instance of Function<T, U> to accept a value of type T from the prior task in the pipeline. Generates an output value of type U
thenApplyAsync(Function<T, U>) | Uses an instance of Function<T, U> to accept a value of type T from the prior task in the pipeline in an asynchronous fashion. Generates an output value of type U

More to come in Part 2 of this article ...


