PolarSPARC

Java 8 CompletableFuture :: Part 3


Bhaskar S 11/04/2018


In Part 1 and Part 2 of this series, we demonstrated some of the capabilities and nuances of CompletableFuture. In this part, we will illustrate a few more features, including exception handling in CompletableFuture.

Often, there are scenarios in which the outcome of either one of two tasks is enough to proceed to the next stage in a pipeline.

The following example demonstrates a pipeline with the ability to choose an output from either of the two asynchronous tasks based on which of the two completes first and consume that output:

Listing.8
package com.polarsparc.cf.CompletableFuture;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Sample08 {
    private static final Random random = new Random();
    
    public static final void randomDelay() {
        try {
            Thread.sleep(random.nextInt(500));
        }
        catch (Exception ex) {
            // Ignore
        }
    }
    
    // ----- Main -----
    
    public static void main(String[] args) {
        {
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
                randomDelay();
                return "I am Awesome";
            });
            CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
                randomDelay();
                return "I am Cool";
            });
            CompletableFuture<Void> cf3 = cf1.acceptEither(cf2, msg ->
                System.out.printf("[1] [%s] %s and am NIMBLE !!!\n", Thread.currentThread().getName(), msg));
            
            try {
                cf3.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        {
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
                randomDelay();
                return "I am Stunning";
            });
            CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
                randomDelay();
                return "I am Fast";
            });
            CompletableFuture<Void> cf3 = cf1.acceptEitherAsync(cf2, msg ->
                System.out.printf("[2] [%s] %s and am SLICK !!!\n", Thread.currentThread().getName(), msg));
            
            try {
                cf3.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        {
            ExecutorService executor = Executors.newFixedThreadPool(3);
            
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
                randomDelay();
                return "I am Quick";
            });
            CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
                randomDelay();
                return "I am Elegant";
            });
            CompletableFuture<Void> cf3 = cf1.acceptEitherAsync(cf2, msg ->
                System.out.printf("[3] [%s] %s and am NEW !!!\n", Thread.currentThread().getName(), msg),
                executor);
            
            try {
                cf3.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
            
            executor.shutdown();
        }
    }
}

Executing the program from Listing.8 will generate the following output:

Output.10

[1] [ForkJoinPool.commonPool-worker-3] I am Awesome and am NIMBLE !!!
[2] [ForkJoinPool.commonPool-worker-9] I am Fast and am SLICK !!!
[3] [pool-1-thread-1] I am Elegant and am NEW !!!

Re-running the program from Listing.8 will generate the following output:

Output.11

[1] [ForkJoinPool.commonPool-worker-5] I am Cool and am NIMBLE !!!
[2] [ForkJoinPool.commonPool-worker-9] I am Fast and am SLICK !!!
[3] [pool-1-thread-1] I am Quick and am NEW !!!

From Listing.8, notice that we introduce random delays (up to 500 ms) in the tasks that make up the pipeline. This allows either of the two tasks to finish first, which is why the output can differ from one run to the next.
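To see the "first one wins" behavior without relying on chance, here is a minimal sketch that replaces the random delays with fixed ones. The class name AcceptEitherSketch and the helpers delayed() and winner() are our own illustrative names and not part of Listing.8; we also use join() instead of get() only to avoid checked exceptions in the helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AcceptEitherSketch {
    // Hypothetical helper: supplies 'value' after sleeping for 'millis' milliseconds
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(millis);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    // Hypothetical helper: returns the value that was handed to the acceptEither consumer
    static String winner(long delay1, long delay2) {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> cf1 = delayed("first", delay1);
        CompletableFuture<String> cf2 = delayed("second", delay2);
        // The consumer runs with the result of whichever task completes first
        CompletableFuture<Void> cf3 = cf1.acceptEither(cf2, seen::set);
        cf3.join();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(winner(20, 400));
        System.out.println(winner(400, 20));
    }
}
```

With delays this far apart, the two calls should report "first" and then "second", since the task with the shorter delay completes well ahead of the other.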

The following are some of the concepts in the context of the code in Listing.8:

In the next example, we will demonstrate a pipeline with the ability to choose an output from either of the two asynchronous tasks based on which of the two completes first and apply a function on that output to produce a new result.

Listing.9
package com.polarsparc.cf.CompletableFuture;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Sample09 {
    private static final Random random = new Random();
    
    public static final void randomDelay() {
        try {
            Thread.sleep(random.nextInt(500));
        }
        catch (Exception ex) {
            // Ignore
        }
    }
    
    // ----- Main -----
    
    public static void main(String[] args) {
        {
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
                randomDelay();
                return "I am Awesome";
            });
            CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
                randomDelay();
                return "I am Bold";
            });
            CompletableFuture<Void> cf3 = cf1.applyToEither(cf2, msg -> String.format("%s and am Cool !!!", msg))
                .thenAccept(msg -> System.out.printf("[1] [%s] %s\n", Thread.currentThread().getName(), msg));
            
            try {
                cf3.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        {
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
                randomDelay();
                return "I am Elegant";
            });
            CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
                randomDelay();
                return "I am Fast";
            });
            CompletableFuture<Void> cf3 = cf1.applyToEitherAsync(cf2, msg -> String.format("%s and am New !!!",
                    msg))
                .thenAcceptAsync(msg -> System.out.printf("[2] [%s] %s\n", Thread.currentThread().getName(), msg));
            
            try {
                cf3.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
        }
        
        {
            ExecutorService executor = Executors.newFixedThreadPool(3);
            
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
                randomDelay();
                return "I am Practical";
            });
            CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
                randomDelay();
                return "I am Quick";
            });
            CompletableFuture<Void> cf3 = cf1.applyToEitherAsync(cf2, msg -> String.format("%s and am Radical !!!",
                    msg), executor)
                .thenAcceptAsync(msg -> System.out.printf("[3] [%s] %s\n", Thread.currentThread().getName(), msg),
                    executor);
            
            try {
                cf3.get();
            }
            catch (Exception ex) {
                ex.printStackTrace(System.err);
            }
            
            executor.shutdown();
        }
    }
}

Executing the program from Listing.9 will generate the following output:

Output.12

[1] [ForkJoinPool.commonPool-worker-5] I am Bold and am Cool !!!
[2] [ForkJoinPool.commonPool-worker-9] I am Fast and am New !!!
[3] [pool-1-thread-2] I am Practical and am Radical !!!

Re-running the program from Listing.9 will generate the following output:

Output.13

[1] [ForkJoinPool.commonPool-worker-3] I am Awesome and am Cool !!!
[2] [ForkJoinPool.commonPool-worker-5] I am Elegant and am New !!!
[3] [pool-1-thread-2] I am Practical and am Radical !!!
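The key difference from acceptEither is that applyToEither produces a new value that the pipeline can carry forward. The following is a minimal sketch of that idea with fixed delays, so the winner is predictable. The class name ApplyToEitherSketch and the helpers delayed() and transformFirst() are our own illustrative names and do not appear in Listing.9.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class ApplyToEitherSketch {
    // Hypothetical helper: supplies 'value' after sleeping for 'millis' milliseconds
    static CompletableFuture<String> delayed(String value, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(millis);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            return value;
        });
    }

    // Hypothetical helper: applies a function to whichever result arrives first
    static String transformFirst(long delay1, long delay2) {
        CompletableFuture<String> cf1 = delayed("I am Awesome", delay1);
        CompletableFuture<String> cf2 = delayed("I am Bold", delay2);
        // Unlike acceptEither (which yields CompletableFuture<Void>),
        // applyToEither yields a future holding the function's return value
        CompletableFuture<String> cf3 = cf1.applyToEither(cf2, msg -> msg + " and am Cool !!!");
        return cf3.join();
    }

    public static void main(String[] args) {
        System.out.println(transformFirst(20, 400));
        System.out.println(transformFirst(400, 20));
    }
}
```

Because the result is itself a CompletableFuture<String>, further stages such as thenAccept can be chained onto it, which is what Listing.9 does.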

The following are some of the concepts in the context of the code in Listing.9:

Business processes often encounter exceptions in one or more stages of a pipeline. In the next example, we demonstrate how to handle such cases. The pipeline consists of two asynchronous tasks that each generate a random number; the next task in the pipeline consumes both numbers and computes the modulus (the remainder of dividing the first number by the second). If the modulus is zero, we throw an exception.

Listing.10
package com.polarsparc.cf.CompletableFuture;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Sample10 {
    private static final Random random = new Random();
    
    // ----- Main -----
    
    public static void main(String[] args) {
        // No explicit exception handling
        {
            ExecutorService executor = Executors.newFixedThreadPool(4);
            
            CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> random.nextInt(1000)+1, executor);
            CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> random.nextInt(100)+1, executor);
            CompletableFuture<Void> cf3 = cf1.thenCombineAsync(cf2, (n1, n2) -> {
                int ret = n1.intValue() % n2.intValue();
                if (ret <= 0) {
                    throw new RuntimeException(String.format("n1 = %d, n2 = %d => Invalid combination", 
                        n1.intValue(), n2.intValue()));
                }
                return ret;
            }, executor)
            .thenAcceptAsync(n -> System.out.printf("[1] [%s] Magic number is %d\n", Thread.currentThread().getName(), 
                n), executor);
            
            try {
                cf3.get();
            } catch (Exception ex) {
                System.out.printf("[1] EXCEPTION:: %s\n", ex.getMessage());
            }
            
            executor.shutdown();
        }
        
        // Using exceptionally
        {
            ExecutorService executor = Executors.newFixedThreadPool(4);
            
            CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> random.nextInt(1000)+1, executor);
            CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> random.nextInt(100)+1, executor);
            CompletableFuture<Void> cf3 = cf1.thenCombineAsync(cf2, (n1, n2) -> {
                int ret = n1.intValue() % n2.intValue();
                if (ret <= 0) {
                    throw new RuntimeException(String.format("n1 = %d, n2 = %d => Invalid combination", 
                        n1.intValue(), n2.intValue()));
                }
                return ret;
            }, executor)
            .exceptionally(ex -> {
                System.out.printf("[2] ERROR:: %s\n", ex.getMessage());
                return -1;
            })
            .thenAcceptAsync(n -> System.out.printf("[2] [%s] Magic number is %d\n", Thread.currentThread().getName(), 
                n), executor);
            
            try {
                cf3.get();
            } catch (Exception ex) {
                System.out.printf("[2] EXCEPTION:: %s\n", ex.getMessage());
            }
            
            executor.shutdown();
        }
        
        // Using handle (or handleAsync)
        {
            ExecutorService executor = Executors.newFixedThreadPool(4);
            
            CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> random.nextInt(1000)+1, executor);
            CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> random.nextInt(100)+1, executor);
            CompletableFuture<Void> cf3 = cf1.thenCombineAsync(cf2, (n1, n2) -> {
                int ret = n1.intValue() % n2.intValue();
                if (ret <= 0) {
                    throw new RuntimeException(String.format("n1 = %d, n2 = %d => Invalid combination", 
                        n1.intValue(), n2.intValue()));
                }
                return ret;
            }, executor)
            .handle((n, ex) -> {
                if (n != null) {
                    return n;
                } else {
                    System.out.printf("[3] ERROR:: %s\n", ex.getMessage());
                    return -1;
                }
            })
            .thenAcceptAsync(n -> System.out.printf("[3] [%s] Magic number is %d\n", Thread.currentThread().getName(), 
                n), executor);
            
            try {
                cf3.get();
            } catch (Exception ex) {
                System.out.printf("[3] EXCEPTION:: %s\n", ex.getMessage());
            }
            
            executor.shutdown();
        }
        
        // Using whenComplete
        {
            ExecutorService executor = Executors.newFixedThreadPool(4);
            
            CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> random.nextInt(1000)+1, executor);
            CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> random.nextInt(100)+1, executor);
            CompletableFuture<Integer> cf3 = cf1.thenCombineAsync(cf2, (n1, n2) -> {
                int ret = n1.intValue() % n2.intValue();
                if (ret <= 0) {
                    throw new RuntimeException(String.format("n1 = %d, n2 = %d => Invalid combination", 
                        n1.intValue(), n2.intValue()));
                }
                return ret;
            }, executor)
            .whenComplete((n, ex) -> {
                if (n != null) {
                    System.out.printf("[4] [%s] Magic number is %d\n", Thread.currentThread().getName(), n);
                } else {
                    System.out.printf("[4] ERROR:: %s\n", ex.getMessage());
                }
            });
            
            try {
                cf3.get();
            } catch (Exception ex) {
                System.out.printf("[4] EXCEPTION:: %s\n", ex.getMessage());
            }
            
            executor.shutdown();
        }
    }
}

Executing the program from Listing.10 will generate the following output:

Output.14

[1] [pool-1-thread-4] Magic number is 6
[2] [pool-2-thread-4] Magic number is 10
[3] [pool-3-thread-4] Magic number is 7
[4] [main] Magic number is 11

Re-running the program from Listing.10 a few times will generate the following output:

Output.15

[1] EXCEPTION:: java.lang.RuntimeException: n1 = 99, n2 = 1 => Invalid combination
[2] [pool-2-thread-4] Magic number is 2
[3] [pool-3-thread-4] Magic number is 1
[4] [main] Magic number is 8

When a task (or stage) in a pipeline throws an exception, the subsequent task(s) downstream from that task are skipped, and the exception surfaces (wrapped in an ExecutionException) when the get() method is invoked on the CompletableFuture. This is evident from the line tagged [1] in Output.15 above.
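The skipping of downstream stages can be checked directly. The sketch below is a simplified, deterministic variant of the first block of Listing.10: it uses already-completed futures instead of random numbers and records whether the downstream consumer ever ran. The class name UnhandledExceptionSketch, the helper run(), and the choice of IllegalStateException are our own assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

class UnhandledExceptionSketch {
    // Hypothetical helper: reports what get() observed and whether the downstream stage ran
    static String run(int n1, int n2) {
        AtomicBoolean downstreamRan = new AtomicBoolean(false);
        CompletableFuture<Integer> cf1 = CompletableFuture.completedFuture(n1);
        CompletableFuture<Integer> cf2 = CompletableFuture.completedFuture(n2);
        CompletableFuture<Void> cf3 = cf1.thenCombine(cf2, (a, b) -> {
            int ret = a % b;
            if (ret == 0) {
                throw new IllegalStateException("Invalid combination");
            }
            return ret;
        }).thenAccept(n -> downstreamRan.set(true)); // skipped if the stage above fails
        try {
            cf3.get();
            return "ok, downstream ran = " + downstreamRan.get();
        } catch (ExecutionException ex) {
            // get() wraps the original exception; getCause() retrieves it
            return ex.getCause().getClass().getSimpleName()
                + ", downstream ran = " + downstreamRan.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(run(7, 5));   // remainder is 2, so no exception
        System.out.println(run(99, 1));  // remainder is 0, so the combiner throws
    }
}
```

In the failing case, the consumer passed to thenAccept never executes, and the original exception is available through getCause() on the ExecutionException.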

Again, re-running the program from Listing.10 a few times will generate the following output:

Output.16

[1] [pool-1-thread-4] Magic number is 3
[2] ERROR:: java.lang.RuntimeException: n1 = 962, n2 = 37 => Invalid combination
[2] [pool-2-thread-4] Magic number is -1
[3] [pool-3-thread-4] Magic number is 39
[4] [main] Magic number is 31

Once again, re-running the program from Listing.10 a few times will generate the following output:

Output.17

[1] [pool-1-thread-4] Magic number is 12
[2] [pool-2-thread-4] Magic number is 19
[3] [pool-3-thread-4] Magic number is 43
[4] ERROR:: java.lang.RuntimeException: n1 = 27, n2 = 9 => Invalid combination
[4] EXCEPTION:: java.lang.RuntimeException: n1 = 27, n2 = 9 => Invalid combination

The following are some of the concepts in the context of the code in Listing.10:

Just like the other methods we have seen so far on the interface CompletionStage, both the methods handle(BiFunction<T, Throwable, U>) and whenComplete(BiConsumer<T, Throwable>) have their async counterparts, with and without the custom executor.
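As a rough illustration of the async counterparts, the sketch below chains whenCompleteAsync and handleAsync, both with a custom executor. The class name AsyncHandlersSketch, the helper recover(), and the fallback value of -1 are our own assumptions (the fallback mirrors the one used in Listing.10).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncHandlersSketch {
    // Hypothetical helper: returns n1 % n2, or -1 when the remainder is zero
    static int recover(int n1, int n2) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
                    int ret = n1 % n2;
                    if (ret == 0) {
                        throw new IllegalStateException("Invalid combination");
                    }
                    return ret;
                }, executor)
                // Observes the outcome on an executor thread; does not alter it
                .whenCompleteAsync((n, ex) -> System.out.printf("[%s] n = %s, ex = %s%n",
                    Thread.currentThread().getName(), n, ex), executor)
                // Replaces a failure with the fallback value on an executor thread
                .handleAsync((n, ex) -> ex == null ? n : -1, executor);
            return cf.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(recover(7, 5));
        System.out.println(recover(962, 37));
    }
}
```

Since whenCompleteAsync passes the failure along unchanged, handleAsync further down the chain still gets the chance to convert it into the fallback value.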

The following table summarizes the methods we covered in this article from the CompletableFuture class:

Method | Usage Description
exceptionally(Function<Throwable, T>) | Takes an instance of Function<Throwable, T>. The specified function is executed only when the prior task (or stage) encounters an exception. For normal operation, this method transparently passes along the value from the previous task in the pipeline. This method allows one to swallow exceptions.
handle(BiFunction<T, Throwable, U>) | Takes an instance of BiFunction<T, Throwable, U>. The specified function is called irrespective of whether or not an exception occurs. If the previous task (or stage) encounters an exception, the result argument is null; otherwise, the Throwable argument is null. The specified function must return a value. This method allows one to swallow exceptions.
whenComplete(BiConsumer<T, Throwable>) | Takes an instance of BiConsumer<T, Throwable>. The specified function is called irrespective of whether or not an exception occurs. If the previous task (or stage) encounters an exception, the result argument is null; otherwise, the Throwable argument is null. This method will propagate any thrown exceptions.
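The differences among the three methods can be exercised side by side. The following is a minimal sketch that applies each method to a future that has already failed or already succeeded, so no threads or random values are involved. The class name HandlerComparisonSketch and all the helper names are our own, chosen purely for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class HandlerComparisonSketch {
    // Hypothetical helper: a stage that has already completed with an exception
    static CompletableFuture<Integer> failed() {
        CompletableFuture<Integer> cf = new CompletableFuture<>();
        cf.completeExceptionally(new IllegalStateException("boom"));
        return cf;
    }

    // exceptionally: the function runs only on failure; a normal value passes through
    static int viaExceptionally(CompletableFuture<Integer> cf) {
        return cf.exceptionally(ex -> -1).join();
    }

    // handle: the function always runs and must return a value (possibly of a new type)
    static String viaHandle(CompletableFuture<Integer> cf) {
        return cf.handle((n, ex) -> ex == null ? "value " + n : "recovered").join();
    }

    // whenComplete: the callback always runs, but a failure still propagates
    static boolean whenCompleteStillFails(CompletableFuture<Integer> cf) {
        try {
            cf.whenComplete((n, ex) -> { /* observe only */ }).join();
            return false;
        } catch (CompletionException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(viaExceptionally(failed()));
        System.out.println(viaHandle(failed()));
        System.out.println(whenCompleteStillFails(failed()));
        System.out.println(viaHandle(CompletableFuture.completedFuture(5)));
    }
}
```

Note that the sketch calls join(), which reports a failure as an unchecked CompletionException, whereas get() as used in Listing.10 reports it as a checked ExecutionException.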

More to come in Part 4 of this article ...



© PolarSPARC