Docs

Documentation versions (currently viewingVaadin 24)

Producing Reactive Streams

How to use reactive streams to interact with the user interface.

When using Flow or Hilla to build your user interface, you can use Flux or Mono from Reactor to allow your background jobs to interact with them. Reactor has an extensive API, which means you can do many things with it. This also means that it can be more difficult to learn than using callbacks or CompletableFuture.

If you’re new to reactive programming, you should read Reactor’s Introduction to Reactive Programming before continuing.

Returning a Result

When you’re using Reactor, you can’t use the @Async annotation. Instead, you have to instruct your Mono or Flux to execute using the Spring TaskExecutor. Otherwise, your job executes in the thread that subscribes to the Mono or Flux.

For example, a background job that returns a string or an exception could be implemented like this:

public Mono<String> startBackgroundJob() {
    return Mono.fromSupplier(this::doSomethingThatTakesALongTime)
               .subscribeOn(Schedulers.fromExecutor(taskExecutor));
}

If the doSomethingThatTakesALongTime() method throws an exception, the Mono terminates with an error.

To update the user interface, you have to subscribe to the Mono or Flux. For more information about how to do this, see the Consuming Reactive Streams documentation page.

Important
Hilla only supports Flux, so if your job is returning a Mono, you have to convert it to a Flux inside your @BrowserCallable endpoint. You can do this by calling the Mono.asFlux() method.

Reporting Progress

If your background job only needs to report its progress without actually returning a result, you can return a Flux<Double>. Your job should then emit progress updates, and complete the stream when done. However, you may often want also to return a result. Since Hilla only supports returning a single Flux, you have to use the same stream for emitting both progress updates and the end result. The code may be a bit messy, but it works.

You first need to create a data type that can contain both progress updates and the result. For a job that returns a string, it could look like this:

import com.vaadin.hilla.Nullable;

public record BackgroundJobOutput(
        @Nullable Double progressUpdate,
        @Nullable String result
) {
    public static BackgroundJobOutput progressUpdate(double progressUpdate) {
        return new BackgroundJobOutput(progressUpdate, null);
    }

    public static BackgroundJobOutput finished(String result) {
        return new BackgroundJobOutput(null, result);
    }
}

The two built-in methods, progressUpdate() and finished() make the code look better when it’s time to create instances of BackgroundJobOutput.

Note
If you’ve worked with sealed classes, you may be tempted to create a sealed interface called BackgroundJobOutput, and then create two records that implement that interface: one for progress updates; and another for the result. However, Hilla doesn’t support this at the moment.

Next, you have to implement the background job like this:

private String doSomethingThatTakesALongTime(Consumer<Double> onProgress) {
    ...
}

public Flux<BackgroundJobOutput> startBackgroundJob() {
    Sinks.Many<Double> progressUpdates = Sinks // (1)
        .many()
        .unicast()
        .onBackpressureError();

    var result = Mono // (2)
        .fromSupplier(() -> doSomethingThatTakesALongTime(
            progressUpdates::tryEmitNext
        ))
        .subscribeOn(Schedulers.fromExecutor(taskExecutor));

    return Flux.merge( // (3)
        progress.asFlux().map(BackgroundJobOutput::progressUpdate),
        result.map(BackgroundJobOutput::finished)
    );
}
  1. Create a sink to which you can emit progress updates.

  2. Create a Mono that emits the result of the background job.

  3. Map both streams to BackgroundJobOutput and merge them.

When your user interface subscribes to this Flux, it needs to check the state of the returned BackgroundJobOutput objects. If progressUpdate contains a value, it should update the progress indicator. If result contains a value, though, the operation is finished.

Cancelling

You can cancel a subscription to a Flux or Mono at any time. However, as with CompletableFuture, cancelling the subscription doesn’t stop the background job itself. To fix this, you need to tell the background job when it has been cancelled, so that it can stop. Continuing on the earlier example, adding support for cancelling could look like this:

private String doSomethingThatTakesALongTime(
    Consumer<Double> onProgress,
    Supplier<Boolean> isCancelled) {
    ...
}

public Flux<BackgroundJobOutput> startBackgroundJob() {
    var cancelled = new AtomicBoolean(false);
    Sinks.Many<Double> progressUpdates = Sinks
        .many()
        .unicast()
        .onBackpressureError();

    var result = Mono
        .fromSupplier(() -> doSomethingThatTakesALongTime(
            progressUpdates::tryEmitNext, cancelled::get
        ))
        .doOnCancel(() -> cancelled.set(true))
        .subscribeOn(Schedulers.fromExecutor(taskExecutor));

    return Flux.merge(
        progress.asFlux().map(BackgroundJobOutput::progressUpdate),
        result.map(BackgroundJobOutput::finished)
    );
}

If the user interface cancels the subscription, the cancelled flag becomes true, and the job stops executing at its next iteration.