How to auto scope Broadcaster messages to the originating user's session?

I need to send notifications from service classes that automatically go to the correct user’s UI without explicitly passing user IDs. The specific use case is sending progress notifications during long-running tasks:

Current issue: When a service method broadcasts a notification, it goes to all users. I want it to automatically go to just the user who initiated the service call, maintaining the user context from controller to service layer.

Example flow:

  1. User A logs in
  2. User B logs in
  3. User A starts a research task
  4. Service broadcasts progress notifications that should automatically route to User A’s UI
  5. User B should not see these notifications, without me having to explicitly pass User A’s ID

Yes, I could pass user IDs with every broadcast call, but I’m looking for a cleaner architectural solution that maintains the user context automatically. Here’s my current code…

public interface Broadcaster {
    void sendNotification(String message);
    Registration registerListener(SerializableBiConsumer<String, UI> listener);
}
public class SessionBroadcaster implements Broadcaster {
    private final Executor executor = Executors.newSingleThreadExecutor();
    private final List<SerializableBiConsumer<String, UI>> uiListeners = new CopyOnWriteArrayList<>();

    public SessionBroadcaster() {
    }

    @Override
    public void sendNotification(String message) {
        broadcastMessage(message);
    }

    @Override
    public Registration registerListener(SerializableBiConsumer<String, UI> listener) {
        uiListeners.add(listener);
        return () -> uiListeners.remove(listener);
    }

    private void broadcastMessage(String message) {
        UI currentUI = UI.getCurrent();
        for (SerializableBiConsumer<String, UI> listener : uiListeners) {
            executor.execute(() -> listener.accept(message, currentUI));
        }
    }
}

Then I use it in the onAttach method of the MainLayout:

@Override
    protected void onAttach(AttachEvent attachEvent) {
        UI ui = attachEvent.getUI();
        broadcasterRegistration = broadcaster.registerListener((message, _) -> ui.access(() -> {  // Use the UI from attachEvent
            if (!"REFRESH_GRIDS".equals(message)) {
                Notification notification = Notification.show(message);
                notification.setPosition(Notification.Position.TOP_CENTER);
            }
        }));
    }

and here’s the entire MainLayout:

@JsModule("./prefers-color-scheme.js")
@Layout
@AnonymousAllowed
public class MainLayout extends AppLayout {

    private H1 viewTitle;
    private final AuthenticatedUser authenticatedUser;
    private Registration broadcasterRegistration;
    private final Broadcaster broadcaster;

    public MainLayout(AuthenticatedUser authenticatedUser, Broadcaster broadcaster) {
        this.authenticatedUser = authenticatedUser;
        this.broadcaster = broadcaster;
        setPrimarySection(Section.DRAWER);
        addDrawerContent();
        addHeaderContent();
    }

    private void addHeaderContent() {
        DrawerToggle toggle = new DrawerToggle();
        toggle.setAriaLabel("Menu toggle");

        viewTitle = new H1();
        viewTitle.addClassNames(LumoUtility.FontSize.LARGE, LumoUtility.Margin.NONE);

        Optional<Account> currentAccount = authenticatedUser.get();
        String credit = "?";
        if (currentAccount.isPresent()) {
            credit = String.valueOf(currentAccount.get().appCredit().balance());
        }
        Paragraph creditBalanceParagraph = new Paragraph("Credit balance: " + credit);
        creditBalanceParagraph.setId("credit-balance");
        creditBalanceParagraph.addClassNames(LumoUtility.FontSize.SMALL, LumoUtility.Margin.NONE);

        HorizontalLayout headerLayout = new HorizontalLayout(viewTitle, creditBalanceParagraph);
        headerLayout.setWidthFull();
        headerLayout.setJustifyContentMode(FlexComponent.JustifyContentMode.BETWEEN);
        headerLayout.addClassNames(LumoUtility.Padding.Right.LARGE);

        addToNavbar(true, toggle, headerLayout);
    }

    private void addDrawerContent() {
        Span appName = new Span("B2B Demand Generation Strategy");
        appName.addClassNames(LumoUtility.FontWeight.SEMIBOLD, LumoUtility.FontSize.LARGE);
        Header header = new Header(appName);
        header.addClickListener(_ -> header.getUI().ifPresent(ui -> ui.navigate(KeywordCollectionsView.class)));
        Scroller scroller = new Scroller(createNavigation());
        addToDrawer(header, scroller, createFooter());
    }

    private SideNav createNavigation() {
        SideNav nav = new SideNav();

        List<MenuEntry> menuEntries = MenuConfiguration.getMenuEntries();
        menuEntries.forEach(entry -> {
            if (entry.icon() != null) {
                nav.addItem(new SideNavItem(entry.title(), entry.path(), new SvgIcon(entry.icon())));
            } else {
                nav.addItem(new SideNavItem(entry.title(), entry.path()));
            }
        });

        return nav;
    }

    private Footer createFooter() {
        Footer layout = new Footer();

        Optional<Account> maybeUser = authenticatedUser.get();
        if (maybeUser.isPresent()) {
            Account account = maybeUser.get();
            String name;
            if (account.person() != null && account.person().name() != null) {
                name = account.person().name();
            } else {
                name = account.username();
            }
            Avatar avatar = new Avatar(name);
            avatar.setThemeName("xsmall");
            avatar.getElement().setAttribute("tabindex", "-1");

            MenuBar userMenu = new MenuBar();
            userMenu.setThemeName("tertiary-inline contrast");

            MenuItem userName = userMenu.addItem("");
            Div div = new Div();
            div.add(avatar);
            div.add(name);
            div.add(new Icon("lumo", "dropdown"));
            div.getElement().getStyle().set("display", "flex");
            div.getElement().getStyle().set("align-items", "center");
            div.getElement().getStyle().set("gap", "var(--lumo-space-s)");
            userName.add(div);
            SubMenu subMenu = userName.getSubMenu();
            subMenu.addItem("Sign out", _ -> authenticatedUser.logout());

            layout.add(userMenu);
        } else {
            Anchor loginLink = new Anchor("login", "Sign in");
            layout.add(loginLink);
        }

        return layout;
    }

    @Override
    protected void afterNavigation() {
        super.afterNavigation();
        viewTitle.setText(getCurrentPageTitle());
    }

    private String getCurrentPageTitle() {
        return MenuConfiguration.getPageHeader(getContent()).orElse("");
    }

    @Override
    protected void onAttach(AttachEvent attachEvent) {
        UI ui = attachEvent.getUI();
        broadcasterRegistration = broadcaster.registerListener((message, _) -> ui.access(() -> {  // Use the UI from attachEvent
            if (!"REFRESH_GRIDS".equals(message)) {
                Notification notification = Notification.show(message);
                notification.setPosition(Notification.Position.TOP_CENTER);
            }
        }));
    }

    @Override
    protected void onDetach(DetachEvent detachEvent) {
        if (broadcasterRegistration != null) {
            broadcasterRegistration.remove();
            broadcasterRegistration = null;
        }
    }
}

Two questions:
Is push active?
Where is the sendNotification method called?

1 Like

Is push active?

Yes.

@Push
@SpringBootApplication
@Theme(value = "keyword-compass", variant = Lumo.DARK)
public class Application implements AppShellConfigurator {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Where is the sendNotification method called?

From a service class.

public class KeywordResearchService {
    private final Broadcaster broadcaster;

    public KeywordResearchService(ResearchRepository researchRepository, AccountRepository accountRepository, KeywordResearchProvider researchProvider, AiResearchService aiResearchService, Broadcaster broadcaster) {
...
        this.broadcaster = broadcaster;
    }
...
private RichResult<Void> performResearchIteration(KeywordCollection keywordCollection, short currentIteration) {
        String iterationMessage = "Keyword research started (Iteration " + currentIteration +
                                  " of " + keywordCollection.mostRecentResearchSettings().maxIterations() + ")";
        broadcaster.sendNotification(iterationMessage);
...

Hard to say just looking at the code.
If you set a breakpoint in broadcastMessage() method and inside the lambda registered as a listener, are they hit?

1 Like

Yes. And I can add a logger and see the log in the console. No notifications are visible. Maybe it’s not allowed in MainLayout?

2025-01-18T10:06:33.790-06:00 INFO 4119 — [mcat-handler-81] d.n.application.SessionBroadcaster : Broadcasting message: Test notification triggered from button

I went to using the example Broadcaster from the Vaadin docs. Works now. I’ll have to find some other way to make it session-scoped.

public class SessionBroadcaster implements Broadcaster {

    private static final Logger log = LoggerFactory.getLogger(SessionBroadcaster.class);
    private static final Executor executor = Executors.newSingleThreadExecutor();
    private static final LinkedList<SerializableBiConsumer<String, UI>> listeners = new LinkedList<>();

    @Override
    public void sendNotification(String message) {
        broadcast(message);
    }

    @Override
    public synchronized Registration registerListener(SerializableBiConsumer<String, UI> listener) {
        listeners.add(listener);

        return () -> {
            synchronized (SessionBroadcaster.class) {
                listeners.remove(listener);
            }
        };
    }

    private synchronized void broadcast(String message) {
        log.info("Broadcasting message: {}", message);
        UI currentUI = UI.getCurrent();

        for (SerializableBiConsumer<String, UI> listener : listeners) {
            executor.execute(() -> listener.accept(message, currentUI));
        }
    }
}

I don’t understand how your first implementation was supposed to only notify some UIs.
Also, the only difference I see with the working code is the usage of synchronized.
What am I missing?

1 Like

I mean…I wish I knew. I’m basically just hammering on chatGPT right now trying to find a solution. The need for a session-scoped Broadcaster must be super common, though, right? Is there no demo of this out there already?

Is it a Spring application? You can perhaps make the broadcaster a @VaadinSessionScope bean.
The drawback is that this way you will not be able to broadcast to other sessions.

1 Like

Probably I still misunderstood the requirement.
Did I understand correctly that what you want, is that when you call Broadcaster.sendMessage() it should notify the listeners of all the UIs belonging to the same VaadinSession?

1 Like

Is it a Spring application?

yes

I did try that a few times. The app won’t even start because:

Caused by: org.springframework.beans.factory.support.ScopeNotActiveException: Error creating bean with name ‘broadcaster’: Scope ‘vaadin-session’ is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton

Caused by: java.lang.IllegalStateException: No VaadinSession bound to current thread

Yes. But let me state it more plainly in case I am misunderstanding the use of the word session or scope: each logged in users should only receive messages intended for them.

This is most likely because you tried to inject the VaadinSessionScoped broadcaster into another bean with a wider scope (e.g. an Singleton`)

1 Like

And how does the application decide which users should receive the message?

1 Like

I’ll just use a workaround for now and pass along the username in every message, but if anyone knows a better way, please let me know.

public class SessionBroadcaster implements Broadcaster {
    private static final Logger log = LoggerFactory.getLogger(SessionBroadcaster.class);
    private static final Executor executor = Executors.newSingleThreadExecutor();
    private static final LinkedList<SerializableBiConsumer<BroadcastMessage, UI>> listeners = new LinkedList<>();

    @Override
    public void sendNotification(String message) {
        broadcast(new BroadcastMessage(null, message));
    }

    @Override
    public void sendNotificationToUser(String username, String message) {
        broadcast(new BroadcastMessage(username, message));
    }

    @Override
    public synchronized Registration registerListener(SerializableBiConsumer<String, UI> listener) {
        var wrappedListener = wrapListener(null, listener);
        listeners.add(wrappedListener);
        return () -> {
            synchronized (SessionBroadcaster.class) {
                listeners.remove(wrappedListener);
            }
        };
    }

    @Override
    public synchronized Registration registerUserListener(String username, SerializableBiConsumer<String, UI> listener) {
        var wrappedListener = wrapListener(username, listener);
        listeners.add(wrappedListener);
        return () -> {
            synchronized (SessionBroadcaster.class) {
                listeners.remove(wrappedListener);
            }
        };
    }

    private SerializableBiConsumer<BroadcastMessage, UI> wrapListener(String username,
                                                                      SerializableBiConsumer<String, UI> listener) {
        return (message, ui) -> {
            if (message.isForUser(username)) {
                listener.accept(message.content(), ui);
            }
        };
    }

    private synchronized void broadcast(BroadcastMessage message) {
        log.info("Broadcasting message: {}", message);
        UI currentUI = UI.getCurrent();
        for (var listener : listeners) {
            executor.execute(() -> listener.accept(message, currentUI));
        }
    }
}
@Override
    protected void onAttach(AttachEvent attachEvent) {
        setupBroadcasterListener(attachEvent.getUI());
    }

    private void setupBroadcasterListener(UI ui) {
        String username = null;
        if (optionalAccount.isPresent()) {
            username = optionalAccount.get().username();
        }
        broadcasterRegistration = broadcaster.registerUserListener(username, (message, _) -> ui.access(() -> {
            if (!REFRESH_GRIDS_MESSAGE.equals(message)) {
                Notification notification = Notification.show(message);
                notification.setPosition(Notification.Position.TOP_CENTER);
            }
        }));
    }

This is the main question of this post. How does a broadcast remain session-scoped?

I would try something like this


@SpringComponent
Broadcaster implements VaadinServiceInitListener, SessionInitLister, SessionDestroyListener {

	vaadinSessions[]
	listeners[]

	sessionInit(event) {
		add to vaadinSessions[]
	}
	sessionDestroy(event) {
		remove from vaadinSessions[]
	}
    public void serviceInit(ServiceInitEvent event) {
        event.getSource().addSessionInitListener(this);
        event.getSource().addSessionDestroyListener(this);
    }	
	
	register(listener) {
		add to listeners
	}

	sendMessage(user, message) {
		loop over vaadinSessions (optionally filtered by user) {
			loop over vaadinSession.getUIs (filter for active UI) {
				loop over listeners {
					executor.execute(listener)
				}
			}
		}
	}	
}

Edit: this solution is incorrect because it executes a listener multiple times.
Perhaps register method should take also the current UI as parameter and store it with listener, so that a proper filter can be applied on broadcast.

1 Like

Thanks! I have tried a few variations of this. Still haven’t got it working. Seems like the only way is to pass in an identifier for the session or user along with the message with each broadcast.

If you want to send a message to a specific user from a global context (a singleton service) , you should provide a way to identify the user session.
Different thing is if the message sender itself is session scoped (meaning that the service instance is per single session): in this case you will have only one session/user, so no need for filtering, but you will not be able to broadcast to other sessions/users

Maybe you want to look into mbassador.

Other question, does the bean have to be session scoped?

1 Like