Blog

Functional Reactive with Core Java - Part 04

By  
Sven Ruppert
·
On Apr 26, 2018 6:00:00 AM
·

What do we want to achieve?

In the last section, we have seen how we can extend the class Optional<T>. The result was the interface
Result<T>, which enabled us to do some additional things.

In this section, we will deal once again with the functions themselves.

In addition to the source text examples given in this article, I will also use the sources from the Open Source project

Functional-Reactive http://www.functional-reactive.org/. The sources are available at https://github.com/functional-reactive/functional-reactive-lib

And everything began with the list

The aim is to define some functions on a set of cars. We name this class as Car and derive it from the class Quint<T1,T2,T3,T4,T5>. The class Quint is taken from the project www.functional-reactive.org. We have used the class Pair in one of the earlier articles. The class Quint is so to say the elder brother. In this case too, we add the functional get-methods.

  public class Car 
        extends Quint<String, 
                      Colour, 
                      Integer, 
                      Integer, 
                      Float> {
    
    public Car(String brand , 
               Colour colour , 
               Integer speed , 
               Integer year , 
               Float price) {
      super(brand , colour , speed , year , price);
    }

    public String brand() {return getT1();}

    public Colour colour() {return getT2();}

    public Integer speed() {return getT3();}

    public Integer year() {return getT4();}

    public Float price() {return getT5();}
  }
  
  public enum Colour {
    UNDEFINED,
    BLUE,
    RED,
    WHITE,
    GREEN
  }

We now generate a set of instances of the class Car and make these available as Stream<Car>.

    final Stream<Car> carsStream = Stream
        .of(
            new Car("BMW" , Colour.BLUE , 200 , 2017 , 10000f) ,
            new Car("BMW" , Colour.GREEN , 215 , 2017 , 10432f) ,
            new Car("BMW" , Colour.UNDEFINED , 200 , 2017 , 10000f) ,
            new Car("VW" , Colour.BLUE , 180 , 2017 , 10000f) ,
            new Car("VW" , Colour.WHITE , 220 , 2017 , 10000f) ,
            new Car("VW" , Colour.RED , 200 , 2017 , 10000f));

The task

We now want to filter this set two times. Firstly, we want the set of BMWs and secondly the set of VWs (there was still the Volkswagen Group when this article was written)

An initial implementation now looks like as follows.

    Stream
        .of(
            new Car("BMW" , Colour.BLUE , 200 , 2017 , 10000f) ,
            new Car("BMW" , Colour.GREEN , 215 , 2017 , 10432f) ,
            new Car("BMW" , Colour.UNDEFINED , 200 , 2017 , 10000f) ,
            new Car("VW" , Colour.BLUE , 180 , 2017 , 10000f) ,
            new Car("VW" , Colour.WHITE , 220 , 2017 , 10000f) ,
            new Car("VW" , Colour.RED , 200 , 2017 , 10000f))
        .filter(c -> c.brand().equals("BMW"))
        .forEach(System.out::println);

    Stream
        .of(
            new Car("BMW" , Colour.BLUE , 200 , 2017 , 10000f) ,
            new Car("BMW" , Colour.GREEN , 215 , 2017 , 10432f) ,
            new Car("BMW" , Colour.UNDEFINED , 200 , 2017 , 10000f) ,
            new Car("VW" , Colour.BLUE , 180 , 2017 , 10000f) ,
            new Car("VW" , Colour.WHITE , 220 , 2017 , 10000f) ,
            new Car("VW" , Colour.RED , 200 , 2017 , 10000f))
        .filter(c -> c.brand().equals("VW"))
        .forEach(System.out::println);

Instead of keeping the respective set as final in a List<Car>, we simply write the elements in the command line.

If you have a look at this source code, you can quickly notice that this solution is everything else but optimal. We now start reformulating every thing a little.

Separate data and stream

Firstly, we separate the data from the stream. A stream can be used only once. Therefore, if we wish to process a workflow twice, a stream must then be generated twice from the
set of data, on which the stream is based.

  //persistence storage
  public static List<Car> cars = Arrays.asList(
      new Car("BMW" , Colour.BLUE , 200 , 2017 , 10000f) ,
      new Car("BMW" , Colour.GREEN , 215 , 2017 , 10432f) ,
      new Car("BMW" , Colour.UNDEFINED , 200 , 2017 , 10000f) ,
      new Car("VW" , Colour.BLUE , 180 , 2017 , 10000f) ,
      new Car("VW" , Colour.WHITE , 220 , 2017 , 10000f) ,
      new Car("VW" , Colour.RED , 200 , 2017 , 10000f)
  );

  //query -> result from persistence storage
  public static Stream<Car> nextCarStream() {
    return cars.stream();
  }

The data is now held only once, or else is fetched from a persistent source. The method nextCarStream() gives us the desired view as stream on the data.

The remaining task can then be shown as follows. The instances of the predicates, which help in filtering the stream, have already been extracted.

    Predicate<Car> carBrandFilterBMW = (car) -> 
             car.brand().equals("BMW");
    Predicate<Car> carBrandFilterVW = (car) ->
             car.brand().equals("VW");

    nextCarStream()
        .filter(carBrandFilterBMW::test)
        .forEach(System.out::println);

    nextCarStream()
        .filter(carBrandFilterVW::test)
        .forEach(System.out::println);

But in this case too, we can attach the remaining components only after generating the stream. And the definition of the filter is still too redundant.

Define the predicates in general

The rows given below for defining the predicates are still too redundant.

    Predicate<Car> carBrandFilterBMW = (car) -> 
             car.brand().equals("BMW");
    Predicate<Car> carBrandFilterVW = (car) ->
             car.brand().equals("VW");

We change this to a function, which generates a corresponding predicate from a string.

    Function<String,Predicate<Car>> brandFilter 
            = (brand) -> (car) -> car.brand().equals(brand);

This makes the usage look quite orderly.

    nextCarStream()
        .filter(brandFilter.apply("BMW"))
        .forEach(System.out::println);

    nextCarStream()
        .filter(brandFilter.apply("VW"))
        .forEach(System.out::println);

define streams without generation

We now have a stream here, which we want to build up gradually. In this case, only with one modification, the filter element. If one defines it directly, one can reach, for instance, the following solution.

  public static BiFunction<Stream<Car>, 
                           String, 
                           Stream<Car>> filteredStream =
      (stream , brand) -> stream.filter(brandFilter.apply(brand));

We are referring here to the function brandfilter defined earlier. However, one can notice here that the definition of the stream need not be done only after the generation. We thus get an option of formulating this element in a generic way and to save it in a separate library. According to this, there is no need to know, how and with which data is the stream really generated.

  public static Function<String, Predicate<Car>> brandFilter =
          (brand) -> (car) -> car.brand().equals(brand);

  public static BiFunction<Stream<Car>, 
                           String, 
                           Stream<Car>> filteredStream =
      (stream , brand) -> stream.filter(brandFilter.apply(brand));

  public static Consumer<Stream<Car>> printStream 
          = (stream) -> stream.forEach(System.out::println);

  public static void main(String[] args) {

    printStream
        .accept(filteredStream.apply(nextCarStream() , "BMW"));
    printStream
        .accept(filteredStream.apply(nextCarStream() , "VW"));
  }

Although we are now one step ahead, but we have still not finished. It is still not very nice at this point, for instance, how the elements are nested and that we are working with a BiFunction. A BiFunction cannot be combined so well with other functions by means of andThen() and combine(). We will discuss both of these.

Manual transformation

What we actually need is a function that generates a filtered stream from the input of a brand (BMW or VW).

Function<String, Function<Stream<Car>, Stream<Car>>

The stream, on which this is to be applied, naturally does not exist as yet; accordingly, the stream itself is an input parameter.

Now this here is the explicit case, which we generate from a string the Predicate<Car>. If one now formulates this in a general way, one gets a signature as given below.

Function<Predicate<T>, Function<Stream<T>, Stream<T>>

We can now make this part available in a generic way.

  public static <T> Function<Predicate<T>, 
                             Function<Stream<T>, 
                                      Stream<T>>> streamFilter(){
    return (filter) 
               -> (Function<Stream<T>, Stream<T>>) inputStream 
                   -> inputStream.filter(filter);
  }

In the usage, it looks like as follows.

    Main.<Car>streamFilter()
        .apply(filter().apply("BMW"))
        .apply(nextCarStream())
        .forEach(printCar());

//or with andThen

    filter()
        .andThen(streamFilter())
        .apply("BMW")
        .apply(nextCarStream())
        .forEach(printCar());

We are now quite ahead and have managed to extract the stateless parts in a generic way. Accordingly, the composition is no longer after the generation of the stream, which is mostly shown and taught. This gives us essentially a higher abstraction and reusability of the existing code sections. But there was still something…

Type definitions

We have now come so far that we can play with some possibilities, which have become available to us since Java 8. This should not mean that one can work functionally with Java only starting with Java 8. It has only become clearer.

But let us have a look at the following. We will now combine exclusively functions of this type Function<Integer, Integer> with one another.

    final Function<Integer, Integer> f1 = (x)-> x * 2;
    final Function<Integer, Integer> f2 = (x)-> x + 2;
    final Function<Integer, Integer> f3 = (x)-> x + 10;

    System.out.println("f1 f2 f3 (andThen) => " 
                       + f1.andThen(f2).andThen(f3).apply(1));

    System.out.println("f1 f2 f3 (compose) => " 
                       + f1.compose(f2).compose(f3).apply(1));

This one here works without any problem. A combination works in both directions, which means andThen() and compose().

We will now try and write this in each other.

    final Function<Integer, Integer> f1 = (x)-> x * 2;
    
        final Function<Integer, Integer>  fAndThen 
            = f1
                .andThen((x)-> x + 2)
                .andThen((x)-> x + 10);

This also works with the connection done using andThen(). How does it now look with compose()?

final Function<Integer, Integer>  fCompose 
    = f1
        .compose((x)-> x + 2).  //Operator + cannot be applied to j.l.Object
        .compose((x)-> x + 10);

OK, so this does not work like this. In other words, the type cannot be determined correctly, for which reason the compiler assumes Object. And just this goes wrong. What can now be changed?

The only remedy here is that the type is specified explicitly. This then leads to the following source code.

    final Function<Integer, Integer>  fCompose = f1
        .compose((Function<Integer, Integer>) (x)-> x + 2)
        .compose((x)-> x + 10);

Please note that the type needs to be mentioned only once in our example. The reason for this is that at the end of the type definition the variables are specified through declaration. However, we have to specify the type twice, if we use the combination as argument i.e., without the final variable declaration, as in the example below.

    System.out.println("f1 f2 f3 (compose) => " + f1
        .compose((Function<Integer, Integer>)(x)-> x + 2)
        .compose((Function<Integer, Integer>)(x)-> x + 10)
        .apply(1));

For the sake of clarity; How does it look like as parameter of a method, which has a matching type?

static void use(Function<Integer, Integer> f){
    System.out.println("f.apply(1) = " + f.apply(1));
}

    use(f1.compose((Function<Integer, Integer>)(x)-> x + 2)
          .compose((x)-> x + 10));

It suffices here again as in the case of declared variables. Therefore, at the other end, sufficient type definition is available.

Why actually the difference between the usage by means of andThen() and compose? The reason is quite simple, if one considers that andThen() sets the following and the existing function. The type is present here. In the case of compose, the function is extended on the front, which means that the type is not fixed in all cases.

and one more way.

Another way can also be taken for making available the necessary type information.

To do this, we define a method, which maps a compose for us.

static <T, U, V> 
       Function<
           Function<U, V>, 
           Function<Function<T, U>, Function<T, V>>> compose() {
    return (Function<U, V> f) 
        -> (Function<T, U> g) 
            -> (T x) -> f.apply(g.apply(x));
  }

Not only the process of compose is mapped here step by step, but also all the necessary type information has been defined.

In the usage, it looks like as follows.

    Function<Integer, Integer> fx = Main.<Integer, Integer, Integer>compose()
        .apply((x)-> x + 2)
        .apply((x)-> x * 2)

A function, which returns a function

At the start, we have always acted in a way that we have transformed the input parameters in a different value. But there is nothing against the fact that the return value is also a function.

Let us have a look at the following example. Firstly, from the OO-world, then we will change it a little.

We now want to implement different operators. The input will be a character, which will be converted in an operator. For instance, a + will be converted to the operator Add.

To do this, we define at first an interface and thereafter the different implementations.

  public static interface Operator {
    public Float work(Integer a , Integer b);
  }

  public static final class Add implements Operator {
    @Override
    public Float work(Integer a , Integer b) {
      return (float) (a + b);
    }
  }

  public static final class Sub implements Operator {
    @Override
    public Float work(Integer a , Integer b) {
      return (float) (a - b);
    }
  }

  public static final class Mult implements Operator {
    @Override
    public Float work(Integer a , Integer b) {
      return (float) (a * b);
    }
  }

  public static final class Divide implements Operator {
    @Override
    public Float work(Integer a , Integer b) {
      return (float) (a / b);
    }
  }

We will now have a look at the following implementation for making a decision.

  public static Operator operator(String op) {
    Operator result;
    switch (op) {
      case "+": result = new Add(); break;
      case "-": result = new Sub(); break;
      case "/": result = new Divide();break;
      case "*": result = new Mult(); break;
      default: throw new RuntimeException("bad op");
    }
    return result;
  }

I have intentionally omitted here some portions of the code, in order to make the implementation more robust with the aim of keeping the example short.

The usage is quite straightforward; we will select and use only one operator. final Float result = operator("+").work(1 , 2);

There is nothing against a multiple use of the operator, if one can assume that the operator does not have any internal state, which affects the following uses.

Let us now start with the transformation.

Refactoring Step 1

Let us define first what we want to achieve. The operator is a function, which gets two Integer values and returns a Float value. We can write this quite easily as function with two input parameters. BiFunction<Integer, Integer,Float>. With this, we first of all replace the implementations.

  public static BiFunction<Integer, Integer, Float> add = (a , b) -> (float) (a + b);
  public static BiFunction<Integer, Integer, Float> sub = (a , b) -> (float) (a - b);
  public static BiFunction<Integer, Integer, Float> div = (a , b) -> (float) (a / b);
  public static BiFunction<Integer, Integer, Float> mul = (a , b) -> (float) (a * b);

  static BiFunction<Integer, Integer, Float> operator(String operator) {
    BiFunction<Integer, Integer, Float> fkt = null;
    switch (operator) {
      case "+":
        fkt = add;
        break;
      case "-":
        fkt = sub;
        break;
      case "/":
        fkt = div;
        break;
      case "*":
        fkt = mul;
        break;
      default:
        throw new RuntimeException("bad op !!");
    }
    return fkt;
  }

Refactoring Step 2

In the last step, we have replaced the implementations; however, one can also include the same in the method operator(String operator).

  static Function<String, BiFunction<Integer, Integer, Float>> operator() {
    return (operator) -> {
      BiFunction<Integer, Integer, Float> fkt = null;
      switch (operator) {
        case "+": fkt = (a , b) -> (float) (a + b); break;
        case "-": fkt = (a , b) -> (float) (a - b); break;
        case "/": fkt = (a , b) -> (float) (a / b); break;
        case "*": fkt = (a , b) -> (float) (a * b); break;
        default:
          throw new RuntimeException("bad op !!");
      }
      return fkt;
    };
  }

The implementation, which we achieved in the last step, still contains the case that an exception can be thrown. We could remove this, for instance, by means of an Optional<T> or Result<T>.

  static Function<String, 
                  Result<BiFunction<Integer, Integer, Float>>> operator() {
    return (operator) ->
        (operator.equals("+")) ? Result.success((a , b) -> (float) (a + b)) 
        : (operator.equals("-")) ? Result.success((a , b) -> (float) (a - b))
        : (operator.equals("/")) ? Result.success((a , b) -> (float) (a / b))
        : (operator.equals("*")) ? Result.success((a , b) -> (float) (a * b))
        : Result.failure("bad op !!");
  }


  public static void main(String[] args) {
    operator()
        .apply("+")
        .ifPresentOrElse(
            success -> System.out.println("result = " + success) ,
            failure -> System.out.println(failure)
        );
  }

Summary

In this part, we have learned how to breakdown tasks in individual functions. In doing so, we have seen how the type information behaves in the combination of functions and and also the different ways of handling this.

One of the main points was also the difference present while defining the streams and the method of separating the definition from the generation of streams.

You can find the source code at:

https://github.com/Java-Publications/functional-reactive-with-core-java-004.git

If you have questions or comments, simply contact me at sven@vaadin.com or via Twitter @SvenRuppert.

Happy coding!
Sven Ruppert
Sven Ruppert has been coding Java since 1996 and is working as Developer Advocate at Vaadin. He is regularly speaking at Conferences like JavaOne/Jfokus/Devoxx/JavaZone/JavaLand and many more and contributes to IT periodicals, as well as tech portals.
Other posts by Sven Ruppert