I am sure there are details I am missing. But the docs don’t get there on this issue.
I followed the same example as the one for background jobs, and I can only push events to the first client that connects. I know that may make sense for initiating a background job, but I am not running a job, just a sink and a Flux that push data to the client.
So, both Sinks.many().unicast() and .multicast() push the data only to the first client that connects. The second client doesn’t get anything.
However, if I use Sinks.many().replay().limit(1), that handles multiple clients. I kind of wish the docs had better coverage and mentioned this “Session” behavior that seems to apply to a single client.
Maybe you are remembering the first UI instance you have in a static variable, and you’re calling ui.access() on that instance only? That would explain why only the first client gets the updates.
There are three different “variants” of Sinks.many():
Replay is like multicast but also sends a selection of previous messages to new subscribers so that they can get in sync immediately. This is useful for simple state synchronization between multiple users.
Multicast supports multiple subscribers. Subscribers will only receive messages that have been emitted after subscribing. This is mostly useful for cases where messages only describe what has changed since the previous update and there’s a separate mechanism for new subscribers to load the initial state.
Unicast only supports a single subscriber. If another client tries to subscribe, then there’s a server-side exception with the message “Sinks.many().unicast() sinks only allow a single Subscriber” and the subscription fails. This is useful e.g. if a user starts a background job and wants to get updated as it progresses.
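To make the difference concrete, here’s a minimal sketch in plain Reactor, outside Hilla, that you can run with only reactor-core on the classpath. The class name SinkVariantsDemo and the run helper are made up for illustration. It subscribes one “early” subscriber, emits a message, subscribes a “late” subscriber, and then emits a second message.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Sinks;

// Hypothetical demo class, not part of Hilla or of the service above.
public class SinkVariantsDemo {

    // Subscribes "early", emits "a", subscribes "late", emits "b",
    // and returns what each subscriber saw. A failed subscription is
    // recorded as the string "error".
    static List<List<String>> run(Sinks.Many<String> sink) {
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();
        sink.asFlux().subscribe(early::add, e -> early.add("error"));
        sink.tryEmitNext("a");
        sink.asFlux().subscribe(late::add, e -> late.add("error"));
        sink.tryEmitNext("b");
        return List.of(early, late);
    }

    public static void main(String[] args) {
        System.out.println("replay:    "
                + run(Sinks.many().replay().<String>limit(1)));
        System.out.println("multicast: "
                + run(Sinks.many().multicast().<String>onBackpressureBuffer()));
        System.out.println("unicast:   "
                + run(Sinks.many().unicast().<String>onBackpressureBuffer()));
    }
}
```

With replay, the late subscriber should also see the message that was emitted before it subscribed. With multicast it should only see the second message, and with unicast its subscription should fail while the early subscriber keeps receiving messages.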
I agree that we could describe the differences more clearly in the documentation. I couldn’t reproduce your report that multicast didn’t deliver updates to other clients. Maybe you only noticed the lack of an initial update immediately when subscribing, but didn’t look at subsequent updates after subscribing? I also noticed that changes to the sink field in my example weren’t picked up by the hotswap mechanism, so I had to restart the server whenever I made changes.
Here’s the code that I used for testing.
@BrowserCallable
@AnonymousAllowed
public class HelloService {
    // Many<@NonNull String> sink = Sinks.many().replay().limit(1);
    Many<@NonNull String> sink = Sinks.many().multicast().onBackpressureBuffer();
    // Many<@NonNull String> sink = Sinks.many().unicast().onBackpressureBuffer();

    public void sayHello() {
        sink.tryEmitNext("Hello at " + System.currentTimeMillis());
    }

    public Flux<@NonNull String> subscribe() {
        return sink.asFlux();
    }
}
Hmm, not doing Flow. This is straight from the Vaadin Hilla Starter. Also, straight out of the docs for Hilla. Please explain why you are saying it is Flow? Your example test seems to contradict Flow as well, since it is React. This is greatly appreciated and I am thankful for your replies here.
I notice here you are doing Service.subscribe().onNext..., which is not really documented in the Vaadin Hilla space. It usually says Service.endpoint().onNext...
subscribe and endpoint are just arbitrary names that you give to the application logic. You can use whatever you want there as long as it’s the same on the server and on the client.
The mention of Flow was in reaction to the other comment that suggested using some concepts from Flow.
Oh, correct, I did not catch that you had named the endpoint subscribe. I am on 24.7, and I also put @AnonymousAllowed on the method and not on the entire class. Is it possible that has an effect on the sink? Your sink is also not created in the default constructor as mine is. I’m not sure whether this way of initializing it is a cause. I can only get 1 client to receive the push at the endpoint using multicast().onBackpressureBuffer().
I run 2 browser clients and only 1, the first, gets the push when using multicast or unicast. Using replay().limit(1) pushes to both clients simultaneously.
public class ApplicationStatusService {
    private final Sinks.Many<ARecord> aSink;

    public ApplicationStatusService() {
        // NOTE: The many sink cannot be multicast or unicast, does not handle multiclient.
        // Need to investigate, but for now this works well. Those forms I used onBackpressureBuffer?
        aSink = Sinks.many().replay().limit(1);
    }

    // ....

    public void consume(ARecord aMessage) {
        aSink.emitNext(aMessage, Sinks.EmitFailureHandler.FAIL_FAST);
    }

    @AnonymousAllowed
    public Flux<ARecord> getAPublisher() {
        return aSink.asFlux().onBackpressureBuffer();
    }
}
The different initialization approaches shouldn’t make any difference since both ways are still run before any requests are handled.
There’s also a small difference in how changes are emitted with tryEmitNext vs emitNext, but I tested it your way as well and it didn’t make any difference: multicast() still works for multiple clients for me.
What remains is the expected difference that multicast() only sends new messages emitted after subscribing whereas replay().limit(1) sends the previous message immediately when a new client subscribes. Have you verified that consume is run at least once after returning from getAPublisher for the second client?
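One way to check that is to look at what the emit call reports. tryEmitNext returns a Sinks.EmitResult that is easy to ignore by accident, as my sayHello example above does. Here’s a minimal sketch, again in plain Reactor with made-up names (EmitResultDemo, emit), that returns the result so it can be logged. Completing the sink is only there to force a failure result for demonstration.

```java
import reactor.core.publisher.Sinks;

// Hypothetical demo class for inspecting the outcome of an emit attempt.
public class EmitResultDemo {

    // Emits a value and returns the name of the EmitResult instead of discarding it.
    static String emit(Sinks.Many<String> sink, String value) {
        Sinks.EmitResult result = sink.tryEmitNext(value);
        return result.name();
    }

    public static void main(String[] args) {
        Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

        // No subscriber yet: the multicast buffer accepts the value.
        System.out.println(emit(sink, "first"));

        // Terminate the sink on purpose, then try to emit again.
        sink.tryEmitComplete();
        System.out.println(emit(sink, "second"));
    }
}
```

Logging the same result from consume, together with a log line in getAPublisher, should show whether a message was actually emitted after the second client subscribed.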