Once I tried this with Spring Flux based on this article, but it wasn’t really handy. (Using Hilla)
Or with the bean ApplicationEventPublisher
and the subscriber method, simply annotated with @EventListener
Everything will be synchronous.
The reactive chat Reactive Chat App with Spring Boot, Project Reactor, and Vaadin | Vaadin
But a quite old.
more actual with project reactor too, but i’m not sure is works with the session scope
@Configuration
public class MessageListItemReactivePublisher {
@Bean(value = "publisher)
public Sinks.Many<YourEntity> publisher() {
return Sinks
.many()
.multicast()
.onBackpressureBuffer();
}
@Bean(value = "subscribers")
public Flux<YourEntity> subscribers(Sinks.Many<YourEntity> publisher) {
return publisher
.asFlux();
}
}
constructor inyection
The publisher
private final Sinks.Many<YourEntity> publisher;
this.publisher.tryEmitNext(entity);
---
The subscribers
private final Flux<YourEntity> subscribers;
this.subscribers
.subscribe(yourEntity -> {
this.subscribe(ui, yourEntity);
});
One more thought here
I have CDI demo, where I have ApplicationScoped event bus (In Spring it would be ApplicationScope). It would be one line change to turn this to VaadinSessionScope by just changing the annotation.
I think in this thread we are mixing multiple use cases:
- just broadcast messages to all tabs in the same browser: a session scoped broadcast would be enough
- broadcast to all users: the example from the docs shows how to do it
- broadcast to one user on multiple devices: the caller must provide a user identifier or predicate and the broadcaster must be able to find active sessions for the user
- broadcast to multiple users: the caller must provide users identifier (or a predicate to identify their sessions) and the broadcaster must filter out active sessions for the users
In addition, if the caller is on a UI request thread, the current VaadinSession can be used to apply some filter, but if it is a background thread there will be no way to automatically filter out a single session.
Based on the use case, the implementation and the scope may be different.
They all look pretty similar to the approach that I’ve been using. Maybe slightly different order:
- UI registers itself (in the constructor or post construct) to the broadcaster to receive messages (stored using weak references)
- In incoming messages the broadcaster invokes processMessage in all registered/available UIs
- processMessage checks if the UI is still attached and updates wrapped in access()
- If not attached anymore, it unregisters itself from the broadcaster.
The scope of the broadcaster is singleton, but it should not really matter. And of course, the whole thing is meaningful only when using @Push .
Maybe you want to look into mbassador.
I’ll take a look!
Other question, does the bean have to be session scoped?
Not currently. I tried a couple of different ways to apply that, but mostly it just makes the app crash because there no session to apply on startup when the beans are created.
Thanks Sami. I feel like I tried that and it didn’t work for me. May I see your implementation.
I’ve tried so many things now that I’ve lost track. Should have been keeping notes, although I guess I can look through the commits.
Thanks Marco for helping clarify the distinction. You’re right - I need to broadcast from singleton service classes (like my KeywordResearchService) to specific users’ UIs. Let me clarify my actual use case:
- User A logs in and starts a keyword research task
- User B logs in and starts their own research task
- When the KeywordResearchService processes User A’s task, I want notifications to only show up for User A
- When processing User B’s task, notifications should only show for User B
I will update the post title for clarity.
The service is singleton-scoped since it handles tasks for all users. I’ve tried the workaround of passing the username with each message, which probably the simplest solution for now, but I feel there must be a cleaner architectural solution that would allow me a little more freedom to just broadcast from anywhere without needing to also always pass in the userId or similar.
Thanks everyone for your suggestions. I’ll need to go through each one slowly and try to understand how I might try implementing it.
I spent some time this evening trying out everyone’s suggestions. None of them worked completely. In the end, I’m going back to specifying the AccountId in every broadcast message. Honestly, I’m not sure why I thought I could escape this.
I wish I could just close this post, but otherwise I’m not sure I should mark any one reply as the solution since probably what I was requesting is impossible in the first place.
If you only need to schedule a UI update after the process finish, and the process is started by a UI thread, wouldn’t it be simpler to make the service return a CompletableFuture and chain the UI update?
class MyService {
CompletableFuture<?> doSoemthing() {
CompletableFuture<?> future = new CompletableFuture<>();
// start background thread that will call future.complete() when work is done
return future;
}
}
class MyView {
MyView() {
new Button("Start process", event -> {
UI ui = UI.getCurrent();
SerializableRunnable updateUI = ui.accessLater(() -> {
// do ui update
}, null);
service.doSomething().thenRun(updateUI);
});
}
}
Thanks Marco!
I wish this were the case. The reason started investigating this is because there is one long running async process that wants to send updates to the user about its progress. It may go on for several minutes because it contacts two different apis that take time to respond and it does this in a loop based on user input.
Just returning to show the exact code I ended up using in case it will help anyone:
public interface Broadcaster {
void broadcast(String message, AccountId accountId);
Registration registerListener(SerializableBiConsumer<String, UI> listener, UI ui, AccountId accountId);
}
public class MessageBroadcaster implements Broadcaster {
private static final Logger log = LoggerFactory.getLogger(MessageBroadcaster.class);
private static final Executor executor = Executors.newSingleThreadExecutor();
private static final LinkedList<ListenerWrapper> listeners = new LinkedList<>();
@Override
public synchronized Registration registerListener(SerializableBiConsumer<String, UI> listener, UI ui, AccountId accountId) {
ListenerWrapper wrapper = new ListenerWrapper(listener, ui, accountId);
listeners.add(wrapper);
return () -> {
synchronized (MessageBroadcaster.class) {
listeners.remove(wrapper);
}
};
}
@Override
public synchronized void broadcast(String message, AccountId targetAccountId) {
log.info("Broadcasting message to account {}: {}", targetAccountId, message);
var listenersCopy = new LinkedList<>(listeners);
for (var wrapper : listenersCopy) {
if (wrapper.accountId().equals(targetAccountId)) {
executor.execute(() -> wrapper.ui().access(() -> {
try {
wrapper.listener().accept(message, wrapper.ui());
} catch (Exception e) {
log.error("Error executing listener: {}", e.getMessage(), e);
// Remove failed listeners to prevent memory leaks
listeners.remove(wrapper);
}
}));
}
}
}
private record ListenerWrapper(SerializableBiConsumer<String, UI> listener, UI ui, AccountId accountId) {
}
}
Make it a bean in a config file:
@Profile("!test")
@Bean
public Broadcaster broadcaster() {
return new MessageBroadcaster();
}
Then create notifications in the MainLayout:
@Override
protected void onAttach(AttachEvent attachEvent) {
super.onAttach(attachEvent);
authenticatedUser.get().ifPresent(value ->
setupBroadcasterListener(attachEvent.getUI(), value.accountId()));
addGoogleTagManagerScript(attachEvent);
addGoogleTagManagerNoscript(attachEvent);
detectAdBlockerUsingClientDetails(attachEvent.getUI());
}
private void setupBroadcasterListener(UI ui, AccountId accountId) {
broadcasterRegistration = broadcaster.registerListener((message, _) -> ui.access(() -> {
if (!REFRESH_GRIDS_MESSAGE.equals(message)) {
Notification notification = Notification.show(message);
notification.setPosition(Notification.Position.TOP_CENTER);
}
}), ui, accountId);
}
Just noting, that if you use LinkedList to store the listeners, you need to always unregister listeners upon detach of the component. Otherwise your solution will leak memory.
Just to be on the safe side, I am thus always using WeakHashMap instead
I am not particularly fond of passing UI to the broadcaster, it is against my principle of separation of concern.
I prefer to wire EventBus in Presenters, which catches the event broadcasted
And I am creating methods that use ui.access in the View
(I have utility method that handles the error cases)
The project is Vaadin 8, but these things are rather independent on Vaadin version.
And isn’t it better to use a CopyOnWriteArrayList to avoid synchronizing ? in the method ?