PolarSPARC

Java 8 CompletableFuture :: Part 2


Bhaskar S 11/02/2018


In Part 1 of this series, we demonstrated some of the capabilities and nuances of CompletableFuture. We will continue our journey to illustrate the other features in CompletableFuture.

The following example demonstrates a pipeline with the ability to combine outputs from two asynchronous tasks that are independent of each other:

Listing.5
package com.polarsparc.cf.CompletableFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Sample05 {
    public static void main(String[] args) {
        // Explicit style
        {
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I'm Cool");
            CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "am Slick !!!");
            CompletableFuture<String> cf3 = cf1.thenCombine(cf2,
                (s1, s2) -> String.format("%s AND %s", s1, s2));
            CompletableFuture<Void> cf = cf3.thenAccept(msg ->
                System.out.printf("[1] [%s] %s\n", Thread.currentThread().getName(), msg));
            
            try {
                cf.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        // Fluent style
        {
            CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> "I'm Smart")
                .thenCombine(CompletableFuture.supplyAsync(() -> "am Nimble !!!"),
                    (s1, s2) -> String.format("%s AND %s", s1, s2))
                .thenAccept(msg -> 
                    System.out.printf("[2] [%s] %s\n", Thread.currentThread().getName(), msg));
            
            try {
                cf.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        // Fluent style using Async
        {
            CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> "I'm Awesome")
                .thenCombineAsync(CompletableFuture.supplyAsync(() -> "am Fast !!!"),
                    (s1, s2) -> String.format("%s AND %s", s1, s2))
                .thenAcceptAsync(msg -> 
                    System.out.printf("[3] [%s] %s\n", Thread.currentThread().getName(), msg));
            
            try {
                cf.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        // Fluent style using Async with an Executor [1]
        {
            ExecutorService executor = Executors.newFixedThreadPool(4);
            
            CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> "I'm Stunning", executor)
                .thenCombineAsync(CompletableFuture.supplyAsync(() -> "am New !!!"),
                    (s1, s2) -> String.format("%s AND %s", s1, s2), executor)
                .thenAcceptAsync(msg -> 
                    System.out.printf("[4] [%s] %s\n", Thread.currentThread().getName(), msg), executor);
            
            // Deliberately premature: shutting down before cf.get() can reject tasks not yet submitted
            executor.shutdown();
            
            try {
                cf.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        // Fluent style using Async with an Executor [2]
        {
            ExecutorService executor = Executors.newFixedThreadPool(4);
            
            CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> "I'm Agile", executor)
                .thenCombineAsync(CompletableFuture.supplyAsync(() -> "am Quick !!!"),
                    (s1, s2) -> String.format("%s AND %s", s1, s2), executor)
                .thenAcceptAsync(msg -> 
                    System.out.printf("[5] [%s] %s\n", Thread.currentThread().getName(), msg), executor);
            
            try {
                cf.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
            
            executor.shutdown();
        }
    }
}

Executing the program from Listing.5 will generate the following output:

Output.6

[1] [main] I'm Cool AND am Slick !!!
[2] [main] I'm Smart AND am Nimble !!!
[3] [ForkJoinPool.commonPool-worker-5] I'm Awesome AND am Fast !!!
[4] [pool-1-thread-3] I'm Stunning AND am New !!!
[5] [pool-2-thread-3] I'm Agile AND am Quick !!!

Re-running the program from Listing.5 a few more times may generate the following output:

Output.7

[1] [ForkJoinPool.commonPool-worker-3] I'm Cool AND am Slick !!!
[2] [main] I'm Smart AND am Nimble !!!
[3] [ForkJoinPool.commonPool-worker-3] I'm Awesome AND am Fast !!!
java.util.concurrent.ExecutionException: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniAccept@1767e42a rejected from java.util.concurrent.ThreadPoolExecutor@1358721e[Shutting down, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 1]
  at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
  at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
  at com.polarsparc.cf.CompletableFuture.Sample05.main(Sample05.java:71)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$UniAccept@1767e42a rejected from java.util.concurrent.ThreadPoolExecutor@1358721e[Shutting down, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 1]
  at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
  at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
  at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
  at java.base/java.util.concurrent.CompletableFuture$UniCompletion.claim(CompletableFuture.java:568)
  at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:710)
  at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
  at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
  at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:1186)
  at java.base/java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1208)
  at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:834)
[5] [pool-2-thread-3] I'm Agile AND am Quick !!!

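Notice the exception in Output.7. In the fourth block of Listing.5, executor.shutdown() is invoked before cf.get(), so the executor can already be shutting down when a later task in the pipeline is submitted to it. That late submission is rejected with a RejectedExecutionException.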

The following are some of the concepts in the context of the code in Listing.5:

CAUTION

When using a custom Executor, ensure the shutdown() method is invoked only after all the tasks have completed execution.
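
As a minimal sketch of the caution above (the class name SafeShutdownSketch and the method runPipeline are made up for illustration), one way to keep shutdown() from racing with the pipeline is to wait for the final stage first and release the executor in a finally block:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SafeShutdownSketch {
    // Runs a small pipeline on a custom executor and returns the combined text
    static String runPipeline() {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "I'm Stunning", executor)
                .thenCombineAsync(CompletableFuture.supplyAsync(() -> "am New !!!", executor),
                    (s1, s2) -> String.format("%s AND %s", s1, s2), executor);
            // join() blocks until the last stage has run, so nothing is left to submit
            return cf.join();
        }
        finally {
            // Reached only after the pipeline has finished (or failed)
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runPipeline()); // prints: I'm Stunning AND am New !!!
    }
}
```

Because join() returns only after the final stage has executed, every task in the pipeline has already been submitted by the time shutdown() is called.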

In the next example, we will demonstrate a pipeline that chains two asynchronous tasks, where the next task in the chain depends on the result of the current task. In other words, the next task consumes the output of the current task as its input and returns an instance of CompletionStage that will generate the result (in the future) when that task completes execution.

This may seem like a mouthful and confusing, but we will break it down in the following section.

Listing.6
package com.polarsparc.cf.CompletableFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Sample06 {
    public static void main(String[] args) {
        // Explicit style
        {
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I'm Cool");
            CompletableFuture<String> cf2 = cf1.thenCompose(s ->
                CompletableFuture.supplyAsync(() -> s + " & am SLICK !!!"));
            CompletableFuture<Void> cf = cf2.thenAccept(msg ->
                System.out.printf("[1] [%s] %s\n", Thread.currentThread().getName(), msg));
            
            try {
                cf.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        // Fluent style
        {
            CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> "I'm Smart")
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " & am NIMBLE !!!"))
                .thenAccept(msg -> 
                    System.out.printf("[2] [%s] %s\n", Thread.currentThread().getName(), msg));
            
            try {
                cf.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        // Fluent style using Async
        {
            CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> "I'm Awesome")
                    .thenComposeAsync(s -> CompletableFuture.supplyAsync(() -> s + " & am FAST !!!"))
                    .thenAcceptAsync(msg -> 
                        System.out.printf("[3] [%s] %s\n", Thread.currentThread().getName(), msg));
            
            try {
                cf.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        // Fluent style using Async with an Executor
        {
            ExecutorService executor = Executors.newFixedThreadPool(4);
            
            CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> "I'm Awesome", executor)
                    .thenComposeAsync(s -> CompletableFuture.supplyAsync(() -> s + " & am FAST !!!"),
                        executor)
                    .thenAcceptAsync(msg -> 
                        System.out.printf("[4] [%s] %s\n", Thread.currentThread().getName(), msg),
                        executor);
            
            try {
                cf.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
            
            executor.shutdown();
        }
    }
}

Executing the program from Listing.6 will generate the following output:

Output.8

[1] [ForkJoinPool.commonPool-worker-5] I'm Cool & am SLICK !!!
[2] [main] I'm Smart & am NIMBLE !!!
[3] [ForkJoinPool.commonPool-worker-5] I'm Awesome & am FAST !!!
[4] [pool-1-thread-3] I'm Awesome & am FAST !!!

The following are some of the concepts in the context of the code in Listing.6:

On one hand, the method thenCompose may seem similar to the method thenApply(Function<T, U>), which takes an input value (of type T) from the current task in the pipeline and generates a result (of type U) immediately, when the specified Function<T, U> completes execution. This is similar to a map() operation on Streams.

The following picture illustrates how thenApply(Function) works:

[Figure: thenApply]
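
To make the map() analogy concrete, here is a small sketch (the class ThenApplySketch and its method names are invented for illustration) that applies the same function, String::length, once through thenApply and once through Stream.map():

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ThenApplySketch {
    // thenApply: the function turns the String into an Integer once the value is ready
    static int lengthViaThenApply(String text) {
        CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> text)
            .thenApply(String::length);
        return cf.join();
    }

    // Stream.map: the same function applied to each element of a stream
    static List<Integer> lengthsViaMap(String... texts) {
        return Stream.of(texts).map(String::length).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lengthViaThenApply("I'm Cool"));        // prints: 8
        System.out.println(lengthsViaMap("I'm Cool", "am Slick")); // prints: [8, 8]
    }
}
```

In both cases the function returns a plain value, and the container (CompletableFuture or Stream) takes care of wrapping it.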

On the other hand, the method thenCompose(Function<T, CompletionStage<U>>) takes an input value (of type T) from the current task in the pipeline and returns an instance of type CompletionStage<U>. When this returned CompletionStage<U> completes execution (in the future), that task will generate the desired result (of type U). This is similar to a flatMap() operation on Streams.

The following picture illustrates how thenCompose(Function) works:

[Figure: thenCompose]

Note that if we had used the method thenApply(Function<T, CompletionStage<U>>) instead of the method thenCompose(Function<T, CompletionStage<U>>), the return value would be of the nested type CompletionStage<CompletionStage<U>>. The method thenCompose flattens this into a single CompletionStage<U>.
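
The difference in return types can be seen in a short sketch. The class FlattenSketch and the helper decorate are hypothetical names used only for this illustration; decorate stands in for any step that itself returns a future:

```java
import java.util.concurrent.CompletableFuture;

class FlattenSketch {
    // Hypothetical asynchronous step that returns a future rather than a plain value
    static CompletableFuture<String> decorate(String s) {
        return CompletableFuture.supplyAsync(() -> s + " & am SLICK !!!");
    }

    // thenApply wraps the returned future, so the result is nested
    static String viaThenApply() {
        CompletableFuture<CompletableFuture<String>> nested =
            CompletableFuture.supplyAsync(() -> "I'm Cool").thenApply(FlattenSketch::decorate);
        return nested.join().join(); // two joins are needed to reach the String
    }

    // thenCompose flattens the returned future into the pipeline
    static String viaThenCompose() {
        CompletableFuture<String> flat =
            CompletableFuture.supplyAsync(() -> "I'm Cool").thenCompose(FlattenSketch::decorate);
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(viaThenApply());   // prints: I'm Cool & am SLICK !!!
        System.out.println(viaThenCompose()); // prints: I'm Cool & am SLICK !!!
    }
}
```

Both methods produce the same text; only the declared types differ, which is why thenCompose is usually the more convenient choice when the function returns a CompletionStage.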

Moving on to the next example, we will demonstrate the case where two separate asynchronous tasks each produce a value, and both values are then consumed together as inputs by the next task in the chain. Unlike thenCombine, the consuming task does not generate a result.

Listing.7
package com.polarsparc.cf.CompletableFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Sample07 {
    public static void main(String[] args) {
        {
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I am Cool");
            CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "am Slick !!!");
            CompletableFuture<Void> cf3 = cf1.thenAcceptBoth(cf2, (s1, s2) ->
                System.out.printf("[1] [%s] %s and %s\n", Thread.currentThread().getName(), s1, s2));
            
            try {
                cf3.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        {
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I am Fast");
            CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "am Nimble !!!");
            CompletableFuture<Void> cf3 = cf1.thenAcceptBothAsync(cf2, (s1, s2) ->
                System.out.printf("[2] [%s] %s and %s\n", Thread.currentThread().getName(), s1, s2));
            
            try {
                cf3.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        {
            ExecutorService executor = Executors.newFixedThreadPool(3);
            
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I am Stunning", executor);
            CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "am Quick !!!", executor);
            CompletableFuture<Void> cf3 = cf1.thenAcceptBothAsync(cf2, (s1, s2) ->
                System.out.printf("[3] [%s] %s and %s\n", Thread.currentThread().getName(), s1, s2), executor);
            
            try {
                cf3.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
            
            executor.shutdown();
        }
    }
}

Executing the program from Listing.7 will generate the following output:

Output.9

[1] [main] I am Cool and am Slick !!!
[2] [ForkJoinPool.commonPool-worker-3] I am Fast and am Nimble !!!
[3] [pool-1-thread-3] I am Stunning and am Quick !!!
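
The contrast between thenAcceptBoth and thenCombine can be sketched as follows. The class AcceptBothSketch, its method names, and the SEEN holder are invented for this illustration; SEEN exists only so that the side effect of the BiConsumer can be inspected afterwards:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class AcceptBothSketch {
    // Holds the text seen by the BiConsumer (illustration only)
    static final AtomicReference<String> SEEN = new AtomicReference<>();

    // thenAcceptBoth consumes both values; the resulting future carries no value
    static Void consumeBoth() {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I am Cool");
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "am Slick !!!");
        CompletableFuture<Void> cf3 = cf1.thenAcceptBoth(cf2, (s1, s2) -> SEEN.set(s1 + " and " + s2));
        return cf3.join(); // a CompletableFuture<Void> always yields null
    }

    // thenCombine takes the same two inputs but passes a value down the pipeline
    static String combineBoth() {
        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "I am Cool");
        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "am Slick !!!");
        return cf1.thenCombine(cf2, (s1, s2) -> s1 + " and " + s2).join();
    }

    public static void main(String[] args) {
        System.out.println("join() returned: " + consumeBoth()); // prints: join() returned: null
        System.out.println(SEEN.get());                           // prints: I am Cool and am Slick !!!
        System.out.println(combineBoth());                        // prints: I am Cool and am Slick !!!
    }
}
```

A rough rule of thumb: reach for thenAcceptBoth when the two values only need to be acted upon (printed, logged, stored), and for thenCombine when a later stage needs the merged value.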

The following are some of the concepts in the context of the code in Listing.7:

The following table summarizes the methods we covered in this article from the CompletableFuture class:

| Method | Usage Description |
|---|---|
| thenCombine(CompletionStage<U>, BiFunction<T, U, R>) | Takes an instance of type CompletionStage<U> (first argument) and an instance of type BiFunction<T, U, R> (second argument). The function in the second argument is executed once the prior task (or stage) in the pipeline as well as the task wrapped in the first argument have completed execution. The function generates an output value of type R |
| thenCombineAsync(CompletionStage<U>, BiFunction<T, U, R>) | Takes an instance of type CompletionStage<U> (first argument) and an instance of type BiFunction<T, U, R> (second argument). The function in the second argument is executed asynchronously once the prior task (or stage) in the pipeline as well as the task wrapped in the first argument have completed execution. The function generates an output value of type R |
| thenCompose(Function<T, CompletionStage<U>>) | Takes an instance of type Function<T, CompletionStage<U>>, which consumes the value (of type T) produced by the previous stage (or task) in the pipeline as an input and returns an instance of type CompletionStage<U>. We get an output value (of type U) when this returned CompletionStage<U> completes execution |
| thenComposeAsync(Function<T, CompletionStage<U>>) | Takes an instance of type Function<T, CompletionStage<U>>, which is executed asynchronously. The function consumes the value (of type T) produced by the previous stage (or task) in the pipeline as an input and returns an instance of type CompletionStage<U>. We get an output value (of type U) when this returned CompletionStage<U> completes execution |
| thenAcceptBoth(CompletionStage<U>, BiConsumer<T, U>) | Takes an instance of type CompletionStage<U> (first argument) and an instance of type BiConsumer<T, U> (second argument). The function in the second argument is executed once the prior task (or stage) in the pipeline as well as the task wrapped in the first argument have completed execution. No result is generated |
| thenAcceptBothAsync(CompletionStage<U>, BiConsumer<T, U>) | Takes an instance of type CompletionStage<U> (first argument) and an instance of type BiConsumer<T, U> (second argument). The function in the second argument is executed asynchronously once the prior task (or stage) in the pipeline as well as the task wrapped in the first argument have completed execution. No result is generated |
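
Each of the Async methods above also has an overload that accepts an Executor as the last argument, as used in the listings. The following sketch (the class AsyncExecutorSketch, its method names, and the thread name "sketch-worker" are assumptions made for this illustration) returns the name of the thread that runs the combining function, to show that it executes on the supplied executor:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncExecutorSketch {
    // Returns the name of the thread on which the BiFunction was executed
    static String combineOn(Executor executor) {
        return CompletableFuture.supplyAsync(() -> "I'm Agile", executor)
            .thenCombineAsync(CompletableFuture.supplyAsync(() -> "am Quick !!!", executor),
                (s1, s2) -> Thread.currentThread().getName(), executor)
            .join();
    }

    // Uses a pool whose threads carry a recognizable (made-up) name
    static String runOnNamedPool() {
        ExecutorService executor =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "sketch-worker"));
        try {
            return combineOn(executor);
        }
        finally {
            // Safe here: join() inside combineOn has already returned
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnNamedPool()); // prints: sketch-worker
    }
}
```

Without the Executor argument, the Async methods fall back to the default asynchronous execution facility, which is typically the ForkJoinPool common pool, as the thread names in the earlier outputs suggest.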

More to come in Part 3 of this article ...



© PolarSPARC