Producing Reactive Streams
When using Flow or Hilla to build your user interface, you can use Flux
or Mono
from Reactor to allow your background jobs to interact with them. Reactor has an extensive API, which means you can do many things with it. This also means that it can be more difficult to learn than using callbacks or CompletableFuture
.
If you’re new to reactive programming, you should read Reactor’s Introduction to Reactive Programming before continuing.
Returning a Result
When you’re using Reactor, you can’t use the @Async
annotation. Instead, you have to instruct your Mono
or Flux
to execute using the Spring TaskExecutor
. Otherwise, your job executes in the thread that subscribes to the Mono
or Flux
.
For example, a background job that returns a string or an exception could be implemented like this:
public Mono<String> startBackgroundJob() {
return Mono.fromSupplier(this::doSomethingThatTakesALongTime)
.subscribeOn(Schedulers.fromExecutor(taskExecutor));
}
If the doSomethingThatTakesALongTime()
method throws an exception, the Mono
terminates with an error.
To update the user interface, you have to subscribe to the Mono
or Flux
. For more information about how to do this, see the Consuming Reactive Streams documentation page.
Important
|
Hilla only supports Flux , so if your job is returning a Mono , you have to convert it to a Flux inside your @BrowserCallable endpoint. You can do this by calling the Mono.asFlux() method.
|
Reporting Progress
If your background job only needs to report its progress without actually returning a result, you can return a Flux<Double>
. Your job should then emit progress updates, and complete the stream when done. However, you may often want also to return a result. Since Hilla only supports returning a single Flux
, you have to use the same stream for emitting both progress updates and the end result. The code may be a bit messy, but it works.
You first need to create a data type that can contain both progress updates and the result. For a job that returns a string, it could look like this:
import com.vaadin.hilla.Nullable;
public record BackgroundJobOutput(
@Nullable Double progressUpdate,
@Nullable String result
) {
public static BackgroundJobOutput progressUpdate(double progressUpdate) {
return new BackgroundJobOutput(progressUpdate, null);
}
public static BackgroundJobOutput finished(String result) {
return new BackgroundJobOutput(null, result);
}
}
The two built-in methods, progressUpdate()
and finished()
make the code look better when it’s time to create instances of BackgroundJobOutput
.
Note
|
If you’ve worked with sealed classes, you may be tempted to create a sealed interface called BackgroundJobOutput , and then create two records that implement that interface: one for progress updates; and another for the result. However, Hilla doesn’t support this at the moment.
|
Next, you have to implement the background job like this:
private String doSomethingThatTakesALongTime(Consumer<Double> onProgress) {
...
}
public Flux<BackgroundJobOutput> startBackgroundJob() {
Sinks.Many<Double> progressUpdates = Sinks 1
.many()
.unicast()
.onBackpressureError();
var result = Mono 2
.fromSupplier(() -> doSomethingThatTakesALongTime(
progressUpdates::tryEmitNext
))
.subscribeOn(Schedulers.fromExecutor(taskExecutor));
return Flux.merge( 3
progress.asFlux().map(BackgroundJobOutput::progressUpdate),
result.map(BackgroundJobOutput::finished)
);
}
-
Create a sink to which you can emit progress updates.
-
Create a
Mono
that emits the result of the background job. -
Map both streams to
BackgroundJobOutput
and merge them.
When your user interface subscribes to this Flux
, it needs to check the state of the returned BackgroundJobOutput
objects. If progressUpdate
contains a value, it should update the progress indicator. If result
contains a value, though, the operation is finished.
Cancelling
You can cancel a subscription to a Flux
or Mono
at any time. However, as with CompletableFuture
, cancelling the subscription doesn’t stop the background job itself. To fix this, you need to tell the background job when it has been cancelled, so that it can stop. Continuing on the earlier example, adding support for cancelling could look like this:
private String doSomethingThatTakesALongTime(
Consumer<Double> onProgress,
Supplier<Boolean> isCancelled) {
...
}
public Flux<BackgroundJobOutput> startBackgroundJob() {
var cancelled = new AtomicBoolean(false);
Sinks.Many<Double> progressUpdates = Sinks
.many()
.unicast()
.onBackpressureError();
var result = Mono
.fromSupplier(() -> doSomethingThatTakesALongTime(
progressUpdates::tryEmitNext, cancelled::get
))
.doOnCancel(() -> cancelled.set(true))
.subscribeOn(Schedulers.fromExecutor(taskExecutor));
return Flux.merge(
progress.asFlux().map(BackgroundJobOutput::progressUpdate),
result.map(BackgroundJobOutput::finished)
);
}
If the user interface cancels the subscription, the cancelled
flag becomes true
, and the job stops executing at its next iteration.