Documentation versions (currently viewing Vaadin 24)

Consuming Reactive Streams

How to use server push with reactive streams.

When building the user interface with either Vaadin Flow or Hilla, you can use reactive streams to allow a background job to update the user interface. This page explains how to consume such streams; producing them is covered in the Producing Reactive Streams documentation page.

Types of Subscriptions

Background threads typically use cold streams for output. A cold stream starts emitting values when the client subscribes to it, and then completes.

Broadcasts typically use hot streams for output. A hot stream emits values regardless of whether a client is subscribed or not. A subscriber only receives the values that were emitted while it was subscribed.

In your user interfaces, you typically don’t need to worry about unsubscribing from cold streams, as they’re often short-lived. However, if you subscribe to a hot stream, it’s important that you remember to unsubscribe when it’s no longer needed.
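The difference between the two kinds of stream can be sketched in plain Java, without Reactor. The names below (HotColdSketch, subscribeCold, HotSource) are hypothetical stand-ins invented for this illustration and aren't part of any library. The sketch only shows which values a subscriber sees, and why a hot source needs an explicit unsubscribe:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration only; not Reactor or Vaadin types.
class HotColdSketch {

    /** Cold source: each subscription replays the whole sequence, then ends. */
    static void subscribeCold(List<String> values, Consumer<String> subscriber) {
        values.forEach(subscriber);
    }

    /** Hot source: a value goes only to those subscribed when it is emitted. */
    static class HotSource {
        private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

        /** Adds a subscriber and returns a handle that removes it again. */
        Runnable subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
            return () -> subscribers.remove(subscriber);
        }

        void emit(String value) {
            subscribers.forEach(subscriber -> subscriber.accept(value));
        }

        int subscriberCount() {
            return subscribers.size();
        }
    }

    public static void main(String[] args) {
        List<String> coldSeen = new ArrayList<>();
        subscribeCold(List.of("a", "b"), coldSeen::add);
        System.out.println("cold subscriber saw " + coldSeen);

        HotSource hot = new HotSource();
        hot.emit("before");      // nobody is subscribed yet, so this value is lost
        List<String> hotSeen = new ArrayList<>();
        Runnable unsubscribe = hot.subscribe(hotSeen::add);
        hot.emit("during");
        unsubscribe.run();       // without this, the source keeps holding the callback
        hot.emit("after");
        System.out.println("hot subscriber saw " + hotSeen);
    }
}
```

In this sketch, the cold subscriber sees both values, whereas the hot subscriber sees only the value emitted while it was subscribed. The hot source keeps a reference to every callback until the unsubscribe handle is run, which is why a forgotten unsubscribe can keep a detached component alive.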

Subscribing with Flow

In Flow, you can use both Flux and Mono. You subscribe to both by calling the subscribe() method. The method takes a callback which is called for each emitted value. You should implement the callback as a private method, and then wrap it inside UI.accessLater() when you subscribe.

For example, a method for handling successful completion could look like this:

private void onJobCompleted(String result) {
    Notification.show("Job completed: " + result);
}

The UI.accessLater() method is explained in the Pushing UI Updates documentation page.

In the following example, a background job returns a Mono<String>. The stream is cold, so you don’t need to unsubscribe explicitly from it, as this happens once the Mono has completed. The job is started by a button click listener.

button.addClickListener(clickEvent -> {
    var ui = UI.getCurrent();
    service.startBackgroundJob()
        .subscribe(ui.accessLater(this::onJobCompleted, null));
});

In the following example, a Flux<ChatMessage> is used to receive chat messages. The stream is hot, so you have to subscribe to it when the component is attached, and unsubscribe when it’s detached:

private void onMessageReceived(ChatMessage message) {
    // Add the message to a message list
}

@Override
protected void onAttach(AttachEvent attachEvent) {
    var ui = attachEvent.getUI();
    var subscription = chatService.messages()
        .subscribe(ui.accessLater(this::onMessageReceived, null));
    addDetachListener(detachEvent -> {
        detachEvent.unregisterListener();
        subscription.dispose();
    });
}

Subscribing with Hilla

In Hilla, you can only use a Flux, even if you’re only emitting a single value. However, you can easily convert a Mono to a Flux by calling the flux() method of the Mono.

This is an example of a reactive service that delegates to a worker to start a background job. The worker returns a Mono<String>, which the service converts to a Flux<String>:

@BrowserCallable
public class MyBackgroundJobService {

    private final MyWorker worker;

    MyBackgroundJobService(MyWorker worker) {
        this.worker = worker;
    }

    @AnonymousAllowed
    public Flux<String> startBackgroundJob() {
        // Mono.flux() converts the single-value Mono into a Flux
        return worker.startBackgroundJob().flux();
    }
}

You subscribe to a Flux by calling the generated TypeScript service method. You then use the returned Subscription<T> object to register a function that gets called whenever the Flux emits a value.

The following client-side example uses the Flux<String> from the earlier example to receive a single output from a server-side background job. The stream is cold, so you don’t need to unsubscribe from it:

const onJobCompleted = (result: string) => {
    // Update the UI state
}

const startJob = () => {
    MyBackgroundJobService.startBackgroundJob().onNext(onJobCompleted)
}

The following client-side example uses a Flux<ChatMessage> to receive chat messages. The stream is hot, so you have to subscribe to it inside a React effect. The effect’s cleanup function calls the cancel() method of the subscription object. This ensures the subscription is cancelled whenever your component is removed from the DOM:

const onMessageReceived = (message: ChatMessage) => {
    // Update the UI state
}

useEffect(() => {
    const subscription = ChatService.messages().onNext(onMessageReceived)
    return () => subscription.cancel()
}, [])

Handling Errors

In a reactive stream, an error is a terminal event. This means that the subscription is cancelled and no more values are emitted. If you’re dealing with a hot stream, you should therefore consider resubscribing to it as a part of error recovery.

Errors with Flow

In Flow, you can use the doOnError() method to attach a callback that’s called if an error occurs. As with successful completion, you should declare a private method and wrap it inside UI.accessLater().

For example, a method for handling errors could look like this:

private void onJobFailed(Throwable error) {
    Notification.show("Job failed: " + error.getMessage());
}

In the following example, a button click listener starts a new background job, and uses the callback method to handle any errors that may occur:

button.addClickListener(clickEvent -> {
    var ui = UI.getCurrent();
    service.startBackgroundJob()
        .doOnError(ui.accessLater(this::onJobFailed, null))
        .subscribe(ui.accessLater(this::onJobCompleted, null));
});

Errors with Hilla

With Hilla, you can use the onError() method of the Subscription<T> object to register a function that’s called if an error occurs.

If you add error handling to the earlier background job example, you get something like this:

const onJobCompleted = (result: string) => {
    // Update the UI state
}

const onJobFailed = () => {
    // Handle the error
}

const startJob = () => {
    MyBackgroundJobService.startBackgroundJob().onNext(onJobCompleted).onError(onJobFailed)
}

Note that the error callback function doesn’t receive any information about the error itself.

Buffering

You shouldn’t push updates to the browser more than 2 to 4 times per second. If your Flux is emitting events faster than that, you should buffer them and update the user interface in batches. Buffering a Flux is easy, as it has built-in support for it through the buffer() method.

In the following example, the buffered stream collects events for 250 milliseconds and then emits them as a single batch. Because of this, the user interface receives a List<Event> instead of individual Event objects:

private Flux<Event> eventStream() {
    ...
}

public Flux<List<Event>> bufferedEventStream() {
    return eventStream().buffer(Duration.ofMillis(250));
}
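To make the effect of batching concrete, here is a minimal plain-Java sketch of the idea, with the timer replaced by an explicit flush() call so that the behavior is easy to follow. BatchingSketch and its methods are hypothetical names invented for this illustration. In real code, the buffer() method does this work for you. Skipping empty windows is a choice made in this sketch, not a statement about how Reactor behaves:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper for illustration only; in real code, Flux.buffer() does this work.
class BatchingSketch<T> {

    private final List<T> pending = new ArrayList<>();
    private final Consumer<List<T>> downstream;

    BatchingSketch(Consumer<List<T>> downstream) {
        this.downstream = downstream;
    }

    /** Called for every event, however fast they arrive. */
    synchronized void onEvent(T event) {
        pending.add(event);
    }

    /** Called once per window, for example every 250 ms by a timer. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return; // choice made in this sketch: no update for an empty window
        }
        List<T> batch = List.copyOf(pending);
        pending.clear();
        downstream.accept(batch);
    }

    public static void main(String[] args) {
        List<List<Integer>> uiUpdates = new ArrayList<>();
        BatchingSketch<Integer> buffer = new BatchingSketch<>(uiUpdates::add);

        for (int i = 1; i <= 5; i++) {
            buffer.onEvent(i);   // five events arrive inside the first window
        }
        buffer.flush();          // first tick: one update carrying five events
        buffer.flush();          // second tick: nothing pending, no update
        buffer.onEvent(6);
        buffer.flush();          // third tick: one update carrying one event

        System.out.println(uiUpdates);
    }
}
```

Six events result in only two user interface updates here. With a 250 millisecond window, the user interface is updated at most four times per second, regardless of how fast the events arrive.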

If you’re using Flow, you can do the buffering in your user interface, before you subscribe to the stream.

In the following example, the user interface component subscribes to the buffered stream when it’s attached, and unsubscribes when it’s detached:

@Override
protected void onAttach(AttachEvent attachEvent) {
    var subscription = myService.eventStream()
        .buffer(Duration.ofMillis(250))
        .subscribe(attachEvent.getUI().accessLater((eventList) -> {
            // Update your UI here
        }, null));
    addDetachListener(detachEvent -> {
        detachEvent.unregisterListener();
        subscription.dispose();
    });
}

If you’re using Hilla, you have to do the buffering inside the reactive service.

The following example shows a browser callable service that buffers the stream before it’s returned. Because of this, the generated TypeScript service method emits arrays of Event objects:

@BrowserCallable
public class EventService {

    private Flux<Event> eventStream() {
        ...
    }

    @AnonymousAllowed
    public Flux<@Nonnull List<@Nonnull Event>> bufferedEventStream() {
        return eventStream().buffer(Duration.ofMillis(250));
    }
}

Lost Subscriptions with Hilla

In Hilla, you have to be prepared to handle situations in which a subscription is lost without being cancelled. For instance, the user may close their laptop lid, or be temporarily disconnected from the network. Hilla automatically re-establishes the connection, but the subscription may no longer be valid. When this happens, Hilla calls the onSubscriptionLost callback function if one has been registered with the Subscription<T> object.

This function returns one of two ActionOnLostSubscription values:

REMOVE

Remove the subscription. No more values are received until the client has explicitly resubscribed. This is the default action if no callback has been specified.

RESUBSCRIBE

Re-subscribe by calling the same server method.

In the following example, a React component subscribes to a reactive service inside an effect. It resubscribes to the same service if it loses the subscription:

const onMessageReceived = (message: ChatMessage) => {
    // Update the UI state
}

useEffect(() => {
    const subscription = ChatService.messages()
        .onNext(onMessageReceived)
        .onSubscriptionLost(() => ActionOnLostSubscription.RESUBSCRIBE)
    return () => subscription.cancel()
}, [])