
Functional Reactive with Core Java - Part 10

By Sven Ruppert · Jun 7, 2018 6:00:00 AM

What do we want to achieve?

In the last part of this series, we saw how work packets can be distributed over different resource pools by means of CompletableFuture<T>, and how this task can be formulated as a chain of asynchronous calls.

In this part, we look at how to formulate the chain more compactly and which options are available for controlling it.

The task

We begin with the trivial task of converting a string into an integer value. There are some typical pitfalls here, although the process itself is very simple and has already been solved within the JDK.

The starting situation is defined as follows. Keep in mind that both steps stand in for somewhat more elaborate work packets.

    final String input = .... // some String
    final Integer value = Integer.parseInt(input);

However, a lot can go wrong even in this simple example. The string may be null. Its content may not represent a number at all, or it may represent a number that does not fit into the value range of an int.

In short: errors can occur, and they lead to exceptions.
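The failure cases named above can be reproduced directly. The following is a minimal sketch; the class name ParsePitfalls and the helper describe are only illustrative and are not part of the article's sources.

```java
import java.util.Arrays;
import java.util.List;

class ParsePitfalls {

  // Returns the parsed value as text, or the simple name of the exception that was thrown.
  static String describe(String input) {
    try {
      return String.valueOf(Integer.parseInt(input));
    } catch (NumberFormatException e) {
      return e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    // a valid number, null, non-numeric content, and a value one above Integer.MAX_VALUE
    final List<String> inputs = Arrays.asList("42", null, "oh no", "2147483648");
    for (String input : inputs) {
      System.out.println(input + " -> " + describe(input));
    }
  }
}
```

In all three failure cases, Integer.parseInt reports the problem with a NumberFormatException, including the null case.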

In addition to the source code examples given in this article, I will also use the sources from the open source project Functional-Reactive, http://www.functional-reactive.org/. The sources are available at https://github.com/functional-reactive/functional-reactive-lib

Divide and distribute

First of all, we split the steps again and create an individual instance of the class CompletableFuture for each of them.

    final Supplier<String> nextStringValue = () -> valueOf(new Random().nextInt(10));
    final CompletableFuture<String> step01 = CompletableFuture.supplyAsync(nextStringValue);
    
    final CompletableFuture<Integer> step02 = supplyAsync(nextStringValue)
        .thenComposeAsync(new Function<String, CompletionStage<Integer>>() {
          @Override
          public CompletionStage<Integer> apply(String s) {
            return CompletableFuture.completedFuture(Integer.parseInt(s));
          }
        });

A new method is used here: CompletableFuture.completedFuture(Integer.parseInt(s)). It creates an instance of the type CompletableFuture<T> whose result is already fixed.

Accordingly, nothing more needs to be done in this example; the individual values are merely wrapped. Naturally, this can also be written more compactly.

    final CompletableFuture<Integer> step02 = supplyAsync(nextStringValue)
        .thenComposeAsync(s -> completedFuture(Integer.parseInt(s)));
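To make the behavior of completedFuture tangible, here is a small self-contained sketch. It uses a fixed input instead of the random supplier so that the outcome is predictable; the class name CompletedFutureDemo and the method parseAsync are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;

class CompletedFutureDemo {

  // Supplies the text in one stage and parses it in a second, asynchronous stage.
  static CompletableFuture<Integer> parseAsync(String text) {
    return CompletableFuture
        .supplyAsync(() -> text)
        .thenComposeAsync(s -> CompletableFuture.completedFuture(Integer.parseInt(s)));
  }

  // A future created with completedFuture already holds its result.
  static boolean isImmediatelyDone() {
    return CompletableFuture.completedFuture(42).isDone();
  }

  public static void main(String[] args) {
    System.out.println("isDone = " + isImmediatelyDone());
    // join() blocks until the two-stage chain has finished.
    System.out.println("parsed = " + parseAsync("7").join());
  }
}
```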

How does one respond to an exception?

handleAsync

Handling an exception is defined here with the method handleAsync. Two cases must be covered: the positive case, in which a result is present, and the case in which an exception was thrown and must be processed.

    final CompletableFuture<String> step02 = supplyAsync(nextStringValue)
        .thenComposeAsync(s -> completedFuture(Integer.parseInt(s)))
        .handleAsync(new BiFunction<Integer, Throwable, String>() {
          @Override
          public String apply(Integer value , Throwable throwable) {
            return (throwable == null)
                   ? "all is ok, value is " + value
                   : throwable.getMessage();
          }
        });

It can also be written more compactly with a lambda.

    final CompletableFuture<String> step02 = supplyAsync(nextStringValue)
        .thenComposeAsync(s -> completedFuture(Integer.parseInt(s)))
        .handleAsync((value , throwable) -> (throwable == null)
                                            ? "all is ok, value is " + value
                                            : throwable.getMessage());
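The following sketch runs both branches with fixed inputs. One detail worth knowing: an exception thrown in an earlier stage usually reaches handleAsync wrapped in a CompletionException, so the sketch unwraps the cause before describing it. The class and method names are assumptions for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class HandleAsyncDemo {

  // Parses the text asynchronously and maps both outcomes to a message.
  static String parseAndDescribe(String text) {
    return CompletableFuture
        .supplyAsync(() -> text)
        .thenComposeAsync(s -> CompletableFuture.completedFuture(Integer.parseInt(s)))
        .handleAsync((value, throwable) -> {
          if (throwable == null) {
            return "all is ok, value is " + value;
          }
          // Unwrap the CompletionException to get at the original exception.
          final Throwable cause =
              (throwable instanceof CompletionException && throwable.getCause() != null)
                  ? throwable.getCause()
                  : throwable;
          return "failed with " + cause.getClass().getSimpleName();
        })
        .join();
  }

  public static void main(String[] args) {
    System.out.println(parseAndDescribe("42"));
    System.out.println(parseAndDescribe("oh no"));
  }
}
```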

Here, the case distinction is implemented by means of a BiFunction, which means that there must be a return value. We have again reached a point where the use of the class Optional<T>, or of its extension Result<T>, makes sense.

    final CompletableFuture<Result<Integer>> step02 = supplyAsync(nextStringValue)
        .thenComposeAsync(s -> completedFuture(Integer.parseInt(s)))
        .handleAsync((value , throwable) -> (throwable == null)
                                            ? Result.success(value)
                                            : Result.failure(throwable.getMessage()));
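If the library is not on the classpath, the same idea can be sketched with Optional<T> from the JDK. Note that this variant is an assumption made for illustration and is weaker than Result<T>: an empty Optional cannot carry the error message.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class OptionalHandleDemo {

  // Completes with the parsed value, or with an empty Optional if parsing failed.
  static CompletableFuture<Optional<Integer>> parse(String text) {
    return CompletableFuture
        .supplyAsync(() -> text)
        .thenComposeAsync(s -> CompletableFuture.completedFuture(Integer.parseInt(s)))
        .handleAsync((value, throwable) -> (throwable == null)
            ? Optional.of(value)
            : Optional.<Integer>empty());
  }

  public static void main(String[] args) {
    System.out.println(parse("42").join());
    System.out.println(parse("oh no").join());
  }
}
```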

CheckedFunction again

This case distinction can also be expressed with the CheckedFunction presented earlier. The case distinction is then already built in, and the code looks as follows.

    final CompletableFuture<Result<Integer>> step02 = supplyAsync(nextStringValue)
        .thenComposeAsync(s -> completedFuture(((CheckedFunction<String, Integer>) Integer::parseInt).apply(s)));
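For readers who do not have the earlier parts at hand, the principle can be sketched with JDK types only. The interface SafeFunction below is a hypothetical stand-in, not the library's CheckedFunction: it catches any exception and returns an Optional instead of a Result.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class CheckedFunctionSketch {

  // Hypothetical stand-in: a function that may throw, exposed as Function<T, Optional<R>>.
  @FunctionalInterface
  interface SafeFunction<T, R> extends Function<T, Optional<R>> {

    R applyWithException(T t) throws Exception;

    @Override
    default Optional<R> apply(T t) {
      try {
        return Optional.ofNullable(applyWithException(t));
      } catch (Exception e) {
        // The case distinction lives here, once, instead of in every chain.
        return Optional.empty();
      }
    }
  }

  static CompletableFuture<Optional<Integer>> parse(String text) {
    final SafeFunction<String, Integer> parseInt = Integer::parseInt;
    return CompletableFuture
        .supplyAsync(() -> text)
        .thenComposeAsync(s -> CompletableFuture.completedFuture(parseInt.apply(s)));
  }

  public static void main(String[] args) {
    System.out.println(parse("42").join());
    System.out.println(parse("oh no").join());
  }
}
```

Because the exception is caught inside the function, the future itself completes normally in both cases, and no handleAsync stage is needed.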

Having reached this point, the next challenge is how to combine more than one processing stage. The output is now wrapped in a Result. It is also possible to skip unnecessary steps here, since most errors in one stage make the subsequent stages pointless.

Chain of thenComposeAsync

Next, the processing sequence is extended by one more stage. A message is to be generated in case of success. In case of an error, a more or less meaningful message is created and returned.

    final CompletableFuture<Result<String>> result = supplyAsync(nextStringValue)
        .thenComposeAsync(input -> completedFuture(((CheckedFunction<String, Integer>) Integer::parseInt).apply(input)))
        .thenComposeAsync(input -> completedFuture(input.isPresent()
                                                   ? input.map(integer -> "Result is now " + integer)
                                                   : Result.<String>failure("Value was not available")));

Here, one level of the type Result is removed implicitly. On success, a new instance of the type Result is generated with the method map; the same applies to the error message. This can be written more explicitly to illustrate what exactly happens.

    final BiFunction<Result<Integer>, Function<Integer, Result<String>>, Result<String>> flatMap
        = (input , funct) -> (input.isPresent())
                             ? funct.apply(input.get())
                             : input.asFailure();

For our case, a method flatMap exists in the class Optional as well as in the class Result. This solves the problem at this point.

Naturally, an exception can also be handled within the method flatMap.
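The effect of flatMap is easiest to see with Optional from the JDK. The sketch below is an illustration under that assumption; the class and method names are made up for this example.

```java
import java.util.Optional;

class FlatMapDemo {

  // First stage: returns an empty Optional instead of throwing when the text is not an int.
  static Optional<Integer> parse(String text) {
    try {
      return Optional.of(Integer.parseInt(text));
    } catch (NumberFormatException e) {
      return Optional.empty();
    }
  }

  // Second stage: itself returns an Optional.
  static Optional<String> message(Integer value) {
    return Optional.of("Result is now " + value);
  }

  static Optional<String> process(String text) {
    // map would yield Optional<Optional<String>>; flatMap removes the extra level
    // and skips the second stage entirely if the first one produced nothing.
    return parse(text).flatMap(FlatMapDemo::message);
  }

  public static void main(String[] args) {
    System.out.println(process("42"));
    System.out.println(process("oh no"));
  }
}
```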

         supplyAsync(nextStringValue)
            .thenComposeAsync(input -> completedFuture(((CheckedFunction<String, Integer>) Integer::parseInt).apply(input)))
            .thenComposeAsync(input -> completedFuture(input.flatMap((CheckedFunction<Integer, String>) integer -> "Result is now " + integer)))

The sources for this part also contain a small convenience function that prints the result of the CompletableFuture instances generated here on the command line.

    Consumer<CompletableFuture<Result<?>>> print = (cf) -> cf
        .whenCompleteAsync((result , throwable) -> {
          if (throwable == null)
            result.ifPresentOrElse(
                System.out::println ,
                (Consumer<String>) System.out::println
            );
          else System.out.println("throwable = " + throwable);
        })
        .join();

The call now looks as follows.

    print.accept(
        supplyAsync(() -> "oh no")
            .thenComposeAsync(input -> completedFuture(((CheckedFunction<String, Integer>) Integer::parseInt).apply(input)))
            .thenComposeAsync(input -> completedFuture(input.flatMap((CheckedFunction<Integer, String>) integer -> "Result is now " + integer)))
    );

Chain of Functions

Looking at the examples shown so far, one quickly notices that the writing effort grows the more functions are chained one after the other. It can also matter that functions which are not known in advance are to be chained in a processing pipeline.

Let us simply define three functions, all of them at present with the same input and output type.

    final Function<String, String> step1 = (input) -> input.toUpperCase();
    final Function<String, String> step2 = (input) -> input + " next A";
    final Function<String, String> step3 = (input) -> input + " next B";

If one now chains them one after the other in the classical way, one gets the following.

    final String hello = step1
        .andThen(step2)
        .andThen(step3)
        .apply("hello"); // blocking call

This is now converted into a chain of non-blocking calls. What suffices at this point is a tri-function that takes the three functions and returns a single function, which in turn accepts an input value and returns a CompletableFuture. With this, one has achieved something quite interesting: a chain can be built from several instances of the type CompletableFuture without being set in motion.

    TriFunction<
        Function<String, String>,
        Function<String, String>,
        Function<String, String>,
        Function<String, CompletableFuture<String>>> inputTriA = (f1 , f2 , f3) -> {
      return (value) -> {
        final CompletableFuture<String> result1 = supplyAsync(() -> f1.apply(value));
        final CompletableFuture<String> result2 = result1.thenComposeAsync(v -> supplyAsync(() -> f2.apply(v)));
        final CompletableFuture<String> result3 = result2.thenComposeAsync(v -> supplyAsync(() -> f3.apply(v)));
        return result3;
      };
    };

Naturally, the example can also be written more compactly.

    TriFunction<
        Function<String, String>,
        Function<String, String>,
        Function<String, String>,
        Function<String, CompletableFuture<String>>> inputTriB
        = (f1 , f2 , f3)
        -> (value)
        ->
        CompletableFuture
            .completedFuture(value)
            .thenComposeAsync(v -> supplyAsync(() -> f1.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> f2.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> f3.apply(v)));

… and subsequently define it generically.

  public <A, B, C, D> TriFunction<
      Function<A, B>, 
      Function<B, C>,
      Function<C, D>,
      Function<A, CompletableFuture<D>>> genericTriFunction() {

    return (f1 , f2 , f3)
        -> (value)
        ->
        CompletableFuture
            .completedFuture(value)
            .thenComposeAsync(v -> supplyAsync(() -> f1.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> f2.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> f3.apply(v)));
  }
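A usage example may help here. The JDK does not ship a TriFunction, so the sketch below declares a minimal one; that interface and the class name TriFunctionDemo are assumptions for this illustration. The three steps deliberately change the type from String to Integer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class TriFunctionDemo {

  // Minimal three-argument function, declared here because the JDK has none.
  @FunctionalInterface
  interface TriFunction<A, B, C, R> {
    R apply(A a, B b, C c);
  }

  static <A, B, C, D> TriFunction<
      Function<A, B>,
      Function<B, C>,
      Function<C, D>,
      Function<A, CompletableFuture<D>>> genericTriFunction() {
    return (f1, f2, f3) -> value -> CompletableFuture
        .completedFuture(value)
        .thenComposeAsync(v -> CompletableFuture.supplyAsync(() -> f1.apply(v)))
        .thenComposeAsync(v -> CompletableFuture.supplyAsync(() -> f2.apply(v)))
        .thenComposeAsync(v -> CompletableFuture.supplyAsync(() -> f3.apply(v)));
  }

  // Building the chain does not execute any of the three steps.
  static Function<String, CompletableFuture<Integer>> chain() {
    return TriFunctionDemo.<String, String, String, Integer>genericTriFunction()
        .apply(String::toUpperCase, s -> s + " next A", String::length);
  }

  public static void main(String[] args) {
    // "hello" -> "HELLO" -> "HELLO next A" -> 12
    System.out.println(chain().apply("hello").join());
  }
}
```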

If one now transforms this function into a curried function, the individual functions can be specified one after the other. In this way, we come closer to specifying an arbitrary number of functions for the desired behavior.

As a reminder, the result looks as follows for a simple example.

    TriFunction<String, String, String,Integer> triDemo 
        = (s1,s2,s3)->{return -1;};
    
    final Function<String, Function<String, Function<String, Integer>>> 
    triDemoCurried 
        = Transformations.<String, String, String, Integer>curryTriFunction().apply(triDemo);
    
    final Integer i = triDemoCurried.apply("A").apply("B").apply("C");
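What the transformation does can be written out by hand in a few lines. The sketch below does not use the library's Transformations class; the method curry and the TriFunction interface are assumptions declared locally for illustration.

```java
import java.util.function.Function;

class CurryDemo {

  // Minimal three-argument function, declared here because the JDK has none.
  @FunctionalInterface
  interface TriFunction<A, B, C, R> {
    R apply(A a, B b, C c);
  }

  // Hand-written currying: one three-argument call becomes three one-argument calls.
  static <A, B, C, R> Function<A, Function<B, Function<C, R>>> curry(TriFunction<A, B, C, R> f) {
    return a -> b -> c -> f.apply(a, b, c);
  }

  static String demo() {
    final TriFunction<String, String, String, String> concat = (s1, s2, s3) -> s1 + s2 + s3;
    final Function<String, Function<String, Function<String, String>>> curried = curry(concat);
    // The arguments can now be supplied one at a time, possibly at different places in the code.
    final Function<String, Function<String, String>> withA = curried.apply("A");
    return withA.apply("B").apply("C");
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```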

However, if one looks at the transformed function based on the instances of the type CompletableFuture, one quickly comes to the conclusion that the source code is not particularly easy to maintain.

     final Function<
            Function<String, String>, 
            Function<Function<String, String>, 
                     Function<Function<String, String>, 
                              Function<String, CompletableFuture<String>>>>> apply
        = Transformations.<Function<String, String>, 
                           Function<String, String>, 
                           Function<String, String>, 
                           Function<String, CompletableFuture<String>>>
                           curryTriFunction()
                           .apply(inputTriA);

    final Function<String, 
                   CompletableFuture<String>> resultCF 
                   = apply
                          .apply(step1)
                          .apply(step2)
                          .apply(step3);
    
    final CompletableFuture<String> cf = resultCF.apply("hello World");

Therefore, there must be another way of achieving this aim. Let us go back one step and take the manual version as the starting point.

    //manual
    Function<String, CompletableFuture<String>> f = (value) ->
        CompletableFuture
            .completedFuture(value)
            .thenComposeAsync(v -> supplyAsync(() -> step1.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> step2.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> step3.apply(v)));

The aim is to define the portion with thenComposeAsync without starting the execution. The start value is therefore not yet present.

The result should be a function that takes the value to be processed and returns an instance of CompletableFuture.

The number of processing steps should not be fixed beforehand; the functions are supplied one by one while the chain is being built.

Let us again define three functions, this time so that the result type changes with each processing stage.

    final Function<String, Integer> step1A = Integer::parseInt;
    final Function<Integer, String> step2A = (input) -> input + " next A";
    final Function<String, Pair<String, Integer>> step3A = (input) -> new Pair<>(input , input.length());

Manually, the result is as follows.

    Function<String, CompletableFuture<Pair<String, Integer>>> fB = (value) ->
        CompletableFuture
            .completedFuture(value)
            .thenComposeAsync(v -> supplyAsync(() -> step1A.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> step2A.apply(v)))
            .thenComposeAsync(v -> supplyAsync(() -> step3A.apply(v)));

If one now extracts the portion with thenComposeAsync and formulates it generically, the following method is one possible result. In abstract terms: take the function (Function<T, CompletableFuture<R>>) from step n-1 and attach (supplyAsync(() -> nextTransformation.apply(v))) to it. The instance of the class CFQ is the frame around everything; we will come back to this shortly.

    private Function<T, CompletableFuture<R>> resultFunction;

    public <N> CFQ<T, N> thenCombineAsync(Function<R, N> nextTransformation) {
      final Function<T, CompletableFuture<N>> f = this.resultFunction
          .andThen(before -> before.thenComposeAsync(v -> supplyAsync(() -> nextTransformation.apply(v))));
      return new CFQ<>(f);
    }

The initial step is still missing. We define a function that turns a normal Function<T, R> into a function of the type Function<T, CompletableFuture<R>> for us.

    public static <T, R> CFQ<T, R> define(Function<T, R> transformation) {
      return new CFQ<>(t -> CompletableFuture.completedFuture(transformation.apply(t)));
    }

We now have all the steps together for transforming a chain of arbitrary functions into a chain of work steps. I have left out at this point that each function can also be processed in a defined resource. This would be only one more parameter of the type Executor.
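How such an Executor parameter could look is sketched below. This is a hypothetical variant written for this illustration, not the class from the library: the names Chain and then are made up, and unlike the define method shown above, the first step here is also submitted to the given executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class ExecutorChainSketch {

  // Hypothetical chain builder with an explicit Executor per step.
  static final class Chain<T, R> {
    private final Function<T, CompletableFuture<R>> resultFunction;

    private Chain(Function<T, CompletableFuture<R>> resultFunction) {
      this.resultFunction = resultFunction;
    }

    static <T, R> Chain<T, R> define(Function<T, R> transformation, Executor executor) {
      return new Chain<>(t -> CompletableFuture.supplyAsync(() -> transformation.apply(t), executor));
    }

    <N> Chain<T, N> then(Function<R, N> next, Executor executor) {
      final Function<T, CompletableFuture<N>> f = this.resultFunction
          .andThen(before -> before.thenComposeAsync(
              v -> CompletableFuture.supplyAsync(() -> next.apply(v), executor), executor));
      return new Chain<>(f);
    }

    Function<T, CompletableFuture<R>> resultFunction() {
      return resultFunction;
    }
  }

  static String run() {
    // A single named thread makes it visible where the steps are executed.
    final ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "step-pool"));
    try {
      return Chain
          .<String, Integer>define(Integer::parseInt, pool)
          .then(i -> i + " handled by " + Thread.currentThread().getName(), pool)
          .resultFunction()
          .apply("42")
          .join();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Passing a different Executor to each call of then would place each step in its own resource pool.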

Naturally, the functions that are specified must also fulfill the criterion that the output type of function n-1 is the same as the input type of function n.

  public static class CFQ<T, R> {

    private Function<T, CompletableFuture<R>> resultFunction;

    private CFQ(Function<T, CompletableFuture<R>> resultFunction) {
      this.resultFunction = resultFunction;
    }

    public static <T, R> CFQ<T, R> define(Function<T, R> transformation) {
      return new CFQ<>(t -> CompletableFuture.completedFuture(transformation.apply(t)));
    }

    public <N> CFQ<T, N> thenCombineAsync(Function<R, N> nextTransformation) {
      final Function<T, CompletableFuture<N>> f = this.resultFunction
          .andThen(before -> before.thenComposeAsync(v -> supplyAsync(() -> nextTransformation.apply(v))));
      return new CFQ<>(f);
    }

    public Function<T, CompletableFuture<R>> resultFunction() {
      return this.resultFunction;
    }
  }

In use, it looks as follows.

    // functions of the chain
    final Function<String, Integer> step1A = Integer::parseInt;
    final Function<Integer, String> step2A = (input) -> input + " next A";
    final Function<String, Pair<String, Integer>> step3A = (input) -> new Pair<>(input , input.length());

    //transformation
    final Function<String, CompletableFuture<Pair<String, Integer>>> f = CFQ
        .define(step1A)
        .thenCombineAsync(step2A)
        .thenCombineAsync(step3A)
        .resultFunction();
    
    //usage - activate
    //the input must be numeric, since step1A parses it with Integer.parseInt
    final CompletableFuture<Pair<String, Integer>> cf = f.apply("42");

    //usage - get result
    final String result = cf
        .join()
        .getT1();

Summary

The class CFQ makes it possible to connect an arbitrary number of functions into a chain. Each step can be moved to its own resource pool and does not block the execution at that point.

Such chains can also be defined without the processing having to start immediately.
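That a chain can be defined without being started can be checked with a counter. The following sketch builds a two-step chain by hand with plain JDK types; the class name LazyChainDemo and the counter parameter are assumptions for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class LazyChainDemo {

  // Builds the chain; the counter records how often a step was actually executed.
  static Function<String, CompletableFuture<String>> buildChain(AtomicInteger calls) {
    final Function<String, Integer> parse = s -> {
      calls.incrementAndGet();
      return Integer.parseInt(s);
    };
    final Function<Integer, String> describe = i -> {
      calls.incrementAndGet();
      return i + " next A";
    };
    final Function<String, CompletableFuture<Integer>> first =
        s -> CompletableFuture.supplyAsync(() -> parse.apply(s));
    return first.andThen(
        cf -> cf.thenComposeAsync(v -> CompletableFuture.supplyAsync(() -> describe.apply(v))));
  }

  public static void main(String[] args) {
    final AtomicInteger calls = new AtomicInteger();
    final Function<String, CompletableFuture<String>> chain = buildChain(calls);
    System.out.println("steps run after definition: " + calls.get());

    // Only applying the function to a value starts the processing.
    System.out.println(chain.apply("42").join());
    System.out.println("steps run after execution: " + calls.get());
  }
}
```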

Compared to the manual version, the effort for formulating this is considerably reduced, and the result is also type-safe.

The class CFQ is present in the project https://github.com/functional-reactive/functional-reactive-lib under the name CompletableFutureQueue.

You can find the source code at:

https://github.com/Java-Publications/functional-reactive-with-core-java-010.git

If you have questions or comments, simply contact me at sven@vaadin.com or via Twitter @SvenRuppert.

Happy coding!
Sven Ruppert
Sven Ruppert has been coding Java since 1996 and is working as Developer Advocate at Vaadin. He is regularly speaking at Conferences like JavaOne/Jfokus/Devoxx/JavaZone/JavaLand and many more and contributes to IT periodicals, as well as tech portals.