Push - Creates new threads continuously (Atmosphere-Shared)

I have created a very simple example to demonstrate the problem. A progress bar is updated every 100 ms from a background thread. It seems that every UI.access() call creates a new thread.

The following image shows a snapshot of the thread list in NetBeans.

With a single user we can see about 220 live threads. But more than 5 new threads are created per second, and more than 5 old ones terminate. Thread creation can be tracked in VisualVM.

I hope somebody can help, because this problem could make web services unusable once more than 10 users are working at the same time.

BR, Axel

Here is the source code of the very simple view:

package com.example.threads.views.myview;

import com.vaadin.flow.component.*;
import com.vaadin.flow.component.orderedlayout.VerticalLayout;
import com.vaadin.flow.component.progressbar.ProgressBar;
import com.vaadin.flow.router.PageTitle;
import com.vaadin.flow.router.Route;

@PageTitle("My View")
@Route("")
public class MyViewView extends Composite<VerticalLayout>
{
  public MyViewView()
  {
    getContent().setWidth("100%");
    getContent().getStyle().set("flex-grow", "1");
    progressBar.setValue(0.01);
    getContent().add(progressBar);
  }

  @Override protected void onAttach(AttachEvent e)
  {
    super.onAttach(e);
    _mThr = new Thread(() -> _doLoop(e.getUI()), "PROGRESSION");
    _mThr.setDaemon(true);
    _mThr.start();
  }
  
  @Override protected void onDetach(DetachEvent e)
  {
    try
    {
      _mThr.interrupt();
      _mThr.join();
    }
    catch (InterruptedException x)
    { Thread.currentThread().interrupt(); }

    _mThr = null;
    super.onDetach(e);
    System.out.println(this + " : detach!");
  }

  void _doLoop(UI ui)
  { int val = 0;
    int off = 1;
    
    try
    {
      while (ui.isAttached())
      {
        Thread.sleep(100);
        val += off;
        float v = 0.01f * val;
        ui.access(() -> progressBar.setValue(v));

        if (val >= 100)
          off = -1;
        if (val <= 0)
          off = 1;
      }
    }
    catch (InterruptedException x)
    { Thread.currentThread().interrupt(); }
  }

  private final ProgressBar progressBar = new ProgressBar();

  private Thread  _mThr;
}

I agree that something interesting is going on with those Atmosphere-Shared threads, but your assumption about scalability doesn’t seem to hold up. If I open the same view in multiple separate browser windows, the pace of thread creation goes down. With 10 instances open, no new thread has been created during the last 5 minutes of running.

It might be good to have a closer look, but it could very well be that Atmosphere has some thread pool that is optimized for a system under higher load, and that those settings lead to some side effect when the load is lighter.

Thank you for your time and comments. At first I couldn’t believe what you described, but I can see this behavior too. It is reproducible within a single service run by opening and closing tabs in Edge (Firefox sometimes has a problem detecting the detach).
The idle time for the Atmosphere threads seems to be 30 seconds, which is OK. But it is not clear why they are created but not really used.

Are there Atmosphere properties for this, and how can they be set?

Atmosphere creates several thread pools, which can vary based on the configuration. IIRC, by default it tries to use ForkJoin pools, but depending on the configuration it can switch to cached or fixed thread pools.

For example, the default number of threads for the async write executor is 200.

You can take a look at org.atmosphere.util.ExecutorsFactory to see how thread pools are initialized.
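To make the difference between the pool kinds mentioned above more concrete, here is a minimal JDK-only sketch. It is not Atmosphere's own code, and the class and method names (PoolKindsDemo, peakThreads, cachedPeak, fixedPeak) are made up for illustration. It submits a batch of tasks that all block at the same time and reports how many threads each kind of pool ended up creating.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// JDK-only illustration; this is not how Atmosphere builds its pools.
class PoolKindsDemo {

    // Submits `tasks` jobs that all block until released, then returns the
    // largest number of threads the pool ever held at once.
    static int peakThreads(ThreadPoolExecutor pool, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int peak = pool.getLargestPoolSize();
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak;
    }

    // A cached pool starts a new thread whenever no idle thread is available.
    static int cachedPeak(int tasks) {
        return peakThreads((ThreadPoolExecutor) Executors.newCachedThreadPool(), tasks);
    }

    // A fixed pool never exceeds `size` threads; extra tasks wait in a queue.
    static int fixedPeak(int size, int tasks) {
        return peakThreads((ThreadPoolExecutor) Executors.newFixedThreadPool(size), tasks);
    }

    public static void main(String[] args) {
        System.out.println("cached pool peak threads:   " + cachedPeak(8));
        System.out.println("fixed(2) pool peak threads: " + fixedPeak(2, 8));
    }
}
```

With 8 simultaneously blocked tasks, the cached pool grows to 8 threads while the fixed pool of size 2 stays at 2. Idle threads of a cached pool are discarded after a keep-alive timeout, so a cached pool under light, bursty load can show threads coming and going.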

This sounds good. Can you tell me how I can change something like this, for example to use a cached thread pool? Can I do this in the application.properties file? I have found nothing about changing Atmosphere parameters in a Vaadin application.

You need to set the properties as VaadinServlet init parameters; it cannot be done through application.properties.
If you are on Vaadin 24.6 you can easily customize the Spring Vaadin Servlet registration bean following the instructions in the documentation: https://vaadin.com/docs/latest/flow/integrations/spring/configuration#customizing-vaadin-servlet-and-service

For older versions, take a look at the example code in this comment: java.lang.UnsupportedOperationException: Unexpected message id from the client · Issue #16664 · vaadin/flow · GitHub

In case somebody is interested: I have made a little hack. I have overridden the default broadcaster and registered my own 3 executor services for the static ExecutorsFactory before calling super.initialize(). Now there are always just the 3 VA-XXX threads.

Here is the patched broadcaster:

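import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.DefaultBroadcaster;
import org.atmosphere.util.ExecutorsFactory;

public class VaPatchBroadcaster extends DefaultBroadcaster
{
  public VaPatchBroadcaster()
  {
    super();
    // Note: with corePoolSize 0 and an unbounded LinkedBlockingQueue, a ThreadPoolExecutor
    // never grows beyond one thread, so the maximum of 10 is not reached in practice.
    _mAsyncSrv  = new ThreadPoolExecutor(0, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), _threadFac("VA-AsyncWrite",    new AtomicLong()));
    _mMsgDisp   = new ThreadPoolExecutor(0, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), _threadFac("VA-MsgDispatcher", new AtomicLong()));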

    ScheduledThreadPoolExecutor se = new ScheduledThreadPoolExecutor(0, _threadFac("VA-Scheduler", new AtomicLong()));
    se.setKeepAliveTime(60, TimeUnit.SECONDS);
    _mSchedSrv = se;
  }

  @Override
  public Broadcaster initialize(String name, URI uri, AtmosphereConfig config)
  {
//    ExecutorsFactory.reset(config);
    config.properties().put(ExecutorsFactory.SCHEDULER_THREAD_POOL,   _mSchedSrv);
    config.properties().put(ExecutorsFactory.BROADCASTER_THREAD_POOL, _mMsgDisp);
    config.properties().put(ExecutorsFactory.ASYNC_WRITE_THREAD_POOL, _mAsyncSrv);

    super.initialize(name, uri, config);
//
//    bc.setExecutorService(_mMsgDisp, true);
//    bc.setAsyncWriteService(_mAsyncSrv, true);
//    bc.setScheduledExecutorService(_mSchedSrv);

    return this;
  }

//  @Override
//  protected BroadcasterConfig createBroadcasterConfig(AtmosphereConfig config)
//  {
//    return new BroadcasterConfig(config.framework().broadcasterFilters(), config, false, getID()).init();
//  }

  @SuppressWarnings("Convert2Lambda")
  private static ThreadFactory _threadFac(String nm, AtomicLong cnt)
  {
    return new ThreadFactory()
    { @Override public Thread newThread(Runnable r)
      { Thread t = new Thread(r, nm + " #" + cnt.incrementAndGet());
        t.setDaemon(true);
        return t;
      }
    };
  }

  private final ExecutorService           _mAsyncSrv;
  private final ExecutorService           _mMsgDisp;
  private final ScheduledExecutorService  _mSchedSrv;
}

And the Spring configuration class:

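import com.vaadin.flow.spring.SpringServlet;
import org.atmosphere.cpr.ApplicationConfig;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class VaPatchConfig
{
  // static, so that Spring can create the post-processor early
  // without instantiating the configuration class first
  @Bean
  static BeanPostProcessor patchAtmosphereBroadcaster()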
  {
    return new BeanPostProcessor()
    {
      @Override
      public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException
      {
        if (bean instanceof ServletRegistrationBean<?> reg)
        {
          if (reg.getServlet() instanceof SpringServlet)
            reg.addInitParameter(ApplicationConfig.BROADCASTER_CLASS, VaPatchBroadcaster.class.getName());
        }
        return bean;
      }
    };
  }
}
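A side note on the executors in VaPatchBroadcaster: a java.util.concurrent.ThreadPoolExecutor only starts threads beyond its core size when the work queue rejects a task. With a core size of 0 and an unbounded LinkedBlockingQueue that never happens, so each of those pools effectively runs on a single thread regardless of the maximum of 10. That would be consistent with seeing exactly 3 threads. The following JDK-only sketch (the class name ZeroCorePoolDemo and its method are hypothetical) shows this behavior in isolation.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ZeroCorePoolDemo {

    // Runs `tasks` short jobs on a pool configured like the executors in
    // VaPatchBroadcaster (core size 0, unbounded queue) and returns the
    // largest number of threads the pool ever held.
    static int peakThreads(int maxPoolSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, maxPoolSize, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(20); // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown(); // already queued tasks still run to completion
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.getLargestPoolSize();
    }

    public static void main(String[] args) {
        System.out.println("peak threads with max 10: " + peakThreads(10, 20));
    }
}
```

Here 20 tasks are queued and processed one after another by a single worker thread. If real parallelism is wanted, one option is a core size equal to the maximum combined with allowCoreThreadTimeOut(true), so idle threads still expire.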

Just a question, out of curiosity: wouldn’t it have been easier to use the existing parameters to disable the usage of the ForkJoin pool and tune the size of the thread pools, instead of writing a custom implementation?

Thanks for your support. Of course that would be easier. But I didn’t find all the parameters or how to use them.
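For readers looking for those parameters, here is a hedged sketch of what the parameter-based approach might look like. The key strings are assumptions recalled from the constants USE_FORKJOINPOOL, BROADCASTER_MESSAGE_PROCESSING_THREADPOOL_MAXSIZE and BROADCASTER_ASYNC_WRITE_THREADPOOL_MAXSIZE in org.atmosphere.cpr.ApplicationConfig, and should be verified against the Atmosphere version bundled with your Vaadin release. The class name AtmosphereInitParams and the pool sizes are purely illustrative. The sketch only builds the name/value pairs so that it runs without Spring or Vaadin on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class AtmosphereInitParams {

    // ASSUMPTION: the key strings below are recalled from
    // org.atmosphere.cpr.ApplicationConfig and are not taken from this thread.
    // Check them against your Atmosphere version before relying on them.
    static Map<String, String> threadPoolParams(int maxProcessingThreads, int maxAsyncWriteThreads) {
        Map<String, String> params = new LinkedHashMap<>();
        // Ask Atmosphere not to use ForkJoin pools
        params.put("org.atmosphere.useForkJoinPool", "false");
        // Upper bound for the message dispatcher pool
        params.put("org.atmosphere.cpr.broadcaster.maxProcessingThreads",
                String.valueOf(maxProcessingThreads));
        // Upper bound for the async write pool
        params.put("org.atmosphere.cpr.broadcaster.maxAsyncWriteThreads",
                String.valueOf(maxAsyncWriteThreads));
        return params;
    }

    public static void main(String[] args) {
        // In the BeanPostProcessor shown earlier, each entry would be applied with
        // reg.addInitParameter(key, value) instead of being printed.
        threadPoolParams(10, 10).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Each pair would be passed to reg.addInitParameter(...) in the same place where VaPatchConfig sets ApplicationConfig.BROADCASTER_CLASS, which avoids the need for a custom broadcaster.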