Blog

Functional Reactive with Core Java - Part 08

By  
Sven Ruppert
·
On May 24, 2018 6:00:00 AM
·

What do we want to achieve?

In the previous sections, we focused on the functional approaches. We will now start discussing the reactive approaches, in order then to combine these with the functional approaches.

In addition to the source text examples given in this article, I will also use the sources from the Open Source project

Functional-Reactive http://www.functional-reactive.org/. The sources are available at https://github.com/functional-reactive/functional-reactive-lib

Reactive with Core Java

We will now discuss the reactive part. In simple words, one can say that these are all possible forms of the observer pattern. (Exactly at this point a lot of readers will exclaim, who have worked already with these). But let us have a look at these in a very simple way.

The basic principle here is that the caller does not wait till the result has been calculated. He will formulate the task and transfer to the next level. When this is done, the next level will respond.

In the first step, therefore, it means that I will not start the call myself - it will get started. And here we are already at the observer pattern. The processing unit reports to the producers of information units. If an information unit is ready for processing, the listeners (the set can also contain zero elements) or one listener is called explicitly with this value. This, in turn, processes its result in the same way and also outputs it. And we already get a randomly long chain of operators, which calculate a result building upon one another.

Simple observer pattern

First of all, let us implement a very simple version of the observer pattern. This is nothing else but a wrapper around a Map<KEY, Consumer<VALUE>>. The map itself is implemented by means of ConcurrentHashMap so that we do not get any concurrency problems at least at this point. But we cannot foresee, how many different threads will generate events at what points of time. Does this suffice? We will discuss this later. A consumer can register under a key and can also delete it again, as soon as it no longer wishes to receive events.

If an event is generated and transferred to the observer, this event will be sent to all the registered consumers. It is up to the consumer what to do with it. But now comes the first problem here. An event i.e., the data, or better, the data structure is sent to all consumers. This implies that the same instance is sent to all consumers. It is quietly assumed here that the respective consumer does not modify the contents of the event. A very bold assumption.

We now come to the implementation itself. This is still very simple at present and does not need any explanation.

  public class Observable<KEY, VALUE> {

    private final Map<KEY, Consumer<VALUE>> listeners = new ConcurrentHashMap<>();

    public void register(KEY key , Consumer<VALUE> listener) {
      listeners.put(key , listener);
    }

    public void unregister(KEY key) { listeners.remove(key);}

    public void sendEvent(VALUE event) {
      listeners.values().forEach(c -> c.accept(event));
    }
  }

***Listing: V001 ***

Given below is a sample use of our consumer.

  public static void main(String[] args) {

    final Observable<String, String> observable = new Observable<>();

    observable.register("key1", System.out::println);
    observable.register("key2", System.out::println);

    observable.sendEvent("Hello World");

    observable.unregister("key1");
    observable.sendEvent("Hello World again");
  }

***Listing: V001 ***

The related output is:

Hello World
Hello World
Hello World again

Memory Leaks with Listeners

Even this simple implementation causes again and again the
memory leaks to be produced. OK, memory leaks and Java? Of course, it works!

We have here a very typical use case. Such observers are often take to build up a type of event service. To do this, the observer is made available as static. The listeners can now be registered in the program from all possible points.

  public class Registry {
    private static final Observable<String, String> observable = new Observable<>();

    public static void register(String key, Consumer<String> consumer){
      observable.register(key, consumer);
    }

    public static void unregister(String key){
      observable.unregister(key);
    }

    public static void sendEvent(String input){
      observable.sendEvent(input);
    }
  }

Nothing much has changed in the usage.

  public static void main(String[] args) {
    Registry.register("key1" , System.out::println);
    Registry.register("key2" , System.out::println);

    Registry.sendEvent("Hello World");

    Registry.unregister("key1");
    Registry.sendEvent("Hello World again");
  }

What now unfortunately happens again and again that consumers are registered in a program, which are never deleted again. This can firstly cause that the observer itself simply overflows, and secondly it can also prevent that certain parts of the application are registered by the GC.

Let us take, for instance, a web application, which uses in the consumer a graphical unit, such as an instance of a label. If the session is now closed, then this unit unfortunately still remains in the consumer and hence also in the observer. As long as this is not cleared or can or may become a victim of the GC, it can happen that all data contained indirectly are managed via the label. What is now missing is a comfortable way for the developer to let all this be done automatically.

Solution with call back

A possible solution can be that the elements, which interact with the observer, have a life cycle. By this I do not mean the method finalize. This should not be used as long as possible.

The web application will again be take here as an example. If the components have something like an attach() and detach(), then one can use these to become free of the observer again.

To do this, we define an interface Registration, which uses the registered method as the return value. The release of the connection is already stored here.

  public static interface Registration {
    public void remove();
  }

  public class Observable<KEY, VALUE> {

    private final Map<KEY, Consumer<VALUE>> listeners = new ConcurrentHashMap<>();

    public Registration register(KEY key , Consumer<VALUE> listener) {
      listeners.put(key , listener);

      return () -> listeners.remove(key);
    }

    public void sendEvent(VALUE event) {
      listeners.values().forEach(c -> c.accept(event));
    }
  }

As in the case of the observer, there is now no longer any method for deleting an entry explicitly.

  public class Registry {

    private static final Observable<String, String> observable = new Observable<>();

    public static Registration register(String key , Consumer<String> consumer) {
      return observable.register(key , consumer);
    }

    public static void sendEvent(String input) {
      observable.sendEvent(input);
    }
  }

The usage now changes to the extent that the process of removing the connection is started on the registration itself. This method should then be called directly or indirectly by the part of the application,
which takes care of the life cycle of this element.

  public static void main(String[] args) {
    final Registration registerA = Registry.register("key1" , System.out::println);
    final Registration registerB = Registry.register("key2" , System.out::println);
    Registry.sendEvent("Hello World");

    //done by life cycle
    registerA.remove();

    Registry.sendEvent("Hello World again");
  }

More than an observer

We now come back to the observer and try to combine several of these. Everything is quite simple, if only one observer is used. But how does it look like, if several steps are built upon one another? In this case, only the inputs and the outputs of the observer are connected with one another.

As an example, the following class with two methods is used. The

  public static class Worker {

    public String doWork(String input) {
      return input.toUpperCase();
    }

    public String[] split(String input) {
      return input.split(" ");
    }
  }

If these steps are now to be executed one after the other, one can write these as follows.

    final Worker worker = new Worker();
    final String[] split = worker.split(worker.doWork("Hello World"));
    System.out.println("result = " + Arrays.asList(split));

If one now wants to join these by means of observers, then in the current solution, an observer must be defined for each intermediate stage. There are three pieces here. The workflow is quite simple in this example.

  • call the method worker.doWork(s)
  • with the result of the first call, call the method worker.split(..).
  • the final result should now be available.

In order now to call the first method, one can imagine the following call. observableA.sendEvent("Hello World "); The data are processed with this, but beforehand the work step must have been registered. And now we come to the first point, which needs a little attention. We do not get any result, when an observer processes a result. To be able to map this now, we work indirectly on the result. observableB.sendEvent(worker.doWork(s)).

Both together now result in the first observer.

    final Observable<String, String> observableA = new Observable<>();
    final Observable<String, String> observableB = new Observable<>();
    observableA.register("A" , s -> observableB.sendEvent(worker.doWork(s)));

Still nothing useful is now happening inside the second observer. In this case too, a processing step must be registered. observableB.register("B" , s -> observableC.sendEvent(worker.split(s))); Here too, work is done indirectly on the result. And so we come to our third and, for the time being, the last observer. observableC.register("C" , strings -> results.addAll(Arrays.asList(strings)));

We now come to a point, where we must take out the result from this chain in any form. In this case, we work on a data structure, which has been defined externally. List<String>

Everything together now looks like as follows.

    final List<String> results = new ArrayList<>();
    
    final Observable<String, String> observableA = new Observable<>();
    final Observable<String, String> observableB = new Observable<>();
    final Observable<String, String[]> observableC = new Observable<>();

    observableA.register("A" , s -> observableB.sendEvent(worker.doWork(s)));
    observableB.register("B" , s -> observableC.sendEvent(worker.split(s)));
    observableC.register("C" , strings -> results.addAll(Arrays.asList(strings)));

    observableA.sendEvent("Hello World ");

    System.out.println("results = " + results);

So far, we still have the same result as before. We have still not gotten any real advantage. It has become rather more difficult, because we have defined the processing chain from back to front.

Independent steps

Till now, the chain is processed by an instance of the class Worker. If now more than one thread send events, it can easily cause concurrency problems here. The first step in this direction, which remains at our disposal, is the use of more than one instance of the class Worker.

    final List<String> results = new ArrayList<>();

    final Observable<String, String> observableA = new Observable<>();
    final Observable<String, String> observableB = new Observable<>();
    final Observable<String, String[]> observableC = new Observable<>();

    observableA.register("A" , s -> observableB.sendEvent(new Worker().doWork(s)));
    observableB.register("B" , s -> observableC.sendEvent(new Worker().split(s)));
    observableC.register("C" , strings -> results.addAll(Arrays.asList(strings)));

    observableA.sendEvent("Hello World ");

delete the states

The next step completely deletes, first of all, the instance of the class Worker including the implementation and replaces each step by the method implementation itself. With this, we have further reduced the possibly available states.

    final List<String> results = new ArrayList<>();

    final Observable<String, String> observableA = new Observable<>();
    final Observable<String, String> observableB = new Observable<>();
    final Observable<String, String[]> observableC = new Observable<>();

    observableA.register("A" , s -> observableB.sendEvent(s.toUpperCase()));
    observableB.register("B" , s -> observableC.sendEvent(s.split(" ")));
    observableC.register("C" , strings -> results.addAll(Arrays.asList(strings)));

    observableA.sendEvent("Hello World ");

    System.out.println("results = " + results);

One can already observe here that the use of functions is being forced.

from list to tree

Advantages start appearing only when one wants to take several paths instead of a linear list of processing sequences. We will now build a tree. Let us assume that we do not want to have everything only in upper case letters, but instead also a version consisting only of lower case letters. To achieve this, one can now attach one more observer.

    final List<String> results = new ArrayList<>();

    final Observable<String, String> observableA = new Observable<>();
    final Observable<String, String> observableB = new Observable<>();
    final Observable<String, String[]> observableC = new Observable<>();

    observableA.register("A1" , s -> observableB.sendEvent(s.toUpperCase()));
    observableA.register("A2" , s -> observableB.sendEvent(s.toLowerCase()));
    observableB.register("B" , s -> observableC.sendEvent(s.split(" ")));
    observableC.register("C" , strings -> results.addAll(Arrays.asList(strings)));

    observableA.sendEvent("Hello World ");

    System.out.println("results = " + results);

One can now extend this at all the possible places. For instance, one can also map the terminal operation, which outputs the result on the command line, by means of an observer.

    final List<String> results = new ArrayList<>();

    final Observable<String, String> observableA = new Observable<>();
    final Observable<String, String> observableB = new Observable<>();
    final Observable<String, String[]> observableC = new Observable<>();

    observableA.register("A1" , s -> observableB.sendEvent(s.toUpperCase()));
    observableA.register("A2" , s -> observableB.sendEvent(s.toLowerCase()));
    observableB.register("B" , s -> observableC.sendEvent(s.split(" ")));
    observableC.register("C1" , strings -> results.addAll(Arrays.asList(strings)));
    observableC.register("C2" , strings -> System.out.println("From C2 " + Arrays.asList(strings)));

    observableA.sendEvent("Hello World ");

    System.out.println("results = " + results);

ConcurrentHashSet

In this step, we will replace the map in the observer by a set. To be noted here is the
method of generating a ConcurrentHashSet. In my opinion, this version is to be preferred over the synchronized version.

  public static class Observable<VALUE> {

    private final Set<Consumer<VALUE>> listeners = ConcurrentHashMap.newKeySet();

    public Registration register(Consumer<VALUE> listener) {
      listeners.add(listener);

      return () -> listeners.remove(listener);
    }

    public void sendEvent(VALUE event) {
      listeners.forEach(c -> c.accept(event));
    }

This also simplifies the implementations in the usage.

    final List<String> results = new ArrayList<>();

    final Observable<String> observableA = new Observable<>();
    final Observable<String> observableB = new Observable<>();
    final Observable<String[]> observableC = new Observable<>();

    observableA.register(s -> observableB.sendEvent(s.toUpperCase()));
    observableA.register(s -> observableB.sendEvent(s.toLowerCase()));
    observableB.register(s -> observableC.sendEvent(s.split(" ")));
    observableC.register(strings -> results.addAll(Arrays.asList(strings)));
    observableC.register(strings -> System.out.println("From C2 " + Arrays.asList(strings)));

    observableA.sendEvent("Hello World ");

    System.out.println("results = " + results);

Summary

A few questions arise at this point.

  • How can one deregister complete sub-trees without realizing everything manually?
  • What is the way to reach the result of a processing stage?
  • How can scaling be done here?
  • Is an asynchronous processing possible?
  • and many more.

One quickly notices that it can mean a lot of effort, if one needs more than just a simple observer. It also becomes clear that even in case of small tasks the task chains are formulated, but which can become very unclear. For instance, it is inconvenient here that the process must rather be defined from back to front.

But still one can create very efficient and scalable applications with the reactive approaches. It can also be seen this example here that the functional aspects can be used very well. It also applies here to enjoy the states with caution and should be avoided.

In the next sections, we will discuss this subject in extensive detail and establish, in addition, the bridge to the functional aspects.

You can find the source code at:

https://github.com/Java-Publications/functional-reactive-with-core-java-008.git

If you have questions or comments, simply contact me at sven@vaadin.com or via Twitter @SvenRuppert.

Happy coding!
Sven Ruppert
Sven Ruppert has been coding Java since 1996 and is working as Developer Advocate at Vaadin. He is regularly speaking at Conferences like JavaOne/Jfokus/Devoxx/JavaZone/JavaLand and many more and contributes to IT periodicals, as well as tech portals.
Other posts by Sven Ruppert