Call microprofile rest client when receiving amqp message using quarkus

We have a service that is sending an anonymous amqp message to our quarkus vaadin app. I receive the message and then I want to update my ui based on data i want to fetch with the microprofile rest client. But when I call the microprofile rest client I always get a 401 Unauthorized error. I get the error because the client can’t connect to my other quarkus api due to a missing token. As I am struggeling for days with this topic I want to ask if this is even possible using quarkus and vaadin. Has anyone a clue for me?

The behavior I want is: Receive an amqp message with a specfic event. Based on the event update the view, by fetching new data from the api service, for all users that are currently on this view.

It looks like your rest client is not properly configured. How is the API secured? For example, if it’s basic authentication then you can apply the ClientBasicAuth annotation

I use OIDC in the frontend to authenticate. Then i propagate the token to the backend. Which also works fine. But then I send an AMQP message without token propagation because the AMQP message should be received by multiple users. I consume the AMQP message within a vaadin Broadcaster and send an event to the frontend. When I receive the event in the frontend I want to call my backend using threadContext.withContextCapture() but most of the time I get:

“jakarta.ws.rs.ProcessingException: Cannot invoke “io.vertx.core.Context.getLocal(Object)” because the return value of “io.vertx.core.Vertx.currentContext()” is null”

When refreshing my page fast sometimes the call works and I can receive Data from my backend. I currently assume that sometimes the SecurityContext is still available because it wasnt closed yet. But I don’t know how to provide the context within my amqp thread so that I can update the UI by fetching data from my backend when the amqp message arrives.

I’m not familiar with the Micropofile context propagation API, and without looking at the code it is even harder to provide a suggestion.
My guess is that you need to capture the context when you register the listener within the Vaadin broadcaster (maybe something like broadcasterListener = threadContext.contextualConsumer( uiListener) ?) then you should have the correct security context for the backend call.

I already tried to use Context Propagation in Quarkus - Quarkus + Vaadin Broadcaster

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class AmqpDataConsumer {

  @Inject
  MyBroadcaster broadcaster;

  @Incoming("my-channel")
  public void consume(Object dataName) {
    // Fire CDI event with just the data name
    broadcaster.onMyAction("test");
  }
}
import jakarta.enterprise.context.ApplicationScoped;
import com.vaadin.flow.shared.Registration;
import java.util.LinkedList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

@ApplicationScoped
public class MyBroadcaster {
  static Executor executor = Executors.newSingleThreadExecutor();

  static LinkedList<Consumer<String>> listeners = new LinkedList<>();

  public static synchronized Registration register(Consumer<String> listener) {
    listeners.add(listener);

    return () -> {
      synchronized (MyBroadcaster.class) {
        listeners.remove(listener);
      }
    };
  }

  public void onDataUpdate(String dataName) {
    synchronized (MyBroadcaster.class) {
      for (Consumer<String> listener : listeners) {
        executor.execute(() -> listener.accept(dataName));
      }
    }
  }
}

  private Registration broadcasterRegistration;

  @Inject ThreadContext threadContext;
  @Inject ManagedExecutor managedExecutor;

  @Override
  protected void onAttach(AttachEvent attachEvent) {
    super.onAttach(attachEvent);
    broadcasterRegistration = MyBroadcaster.register(myPayload-> {
      System.out.println("Received Event: " + myPayload);
      threadContext.withContextCapture(myRestClient.getData("test-data"))
          .thenApplyAsync(response -> {
            System.out.println(response.stream().toList().size());
            return response.stream().toList();
          }, managedExecutor);
    });
  }

This is what I have tried. And it works rarely. (Only when I refresh my page really often and then on every 100’s refresh the call gets trough). I think this is because the context get propagated, but it also is destroyed when the HTTP Session closes.

What I don’t get is. How can I implement this behavior (receiving an anonymous event and refresh data by using an authenticated http call). Is this even possible?

It looks like to me that you are capturing the context information in the listener, that is however invoked in another thread by MyBroadcaster.
I would try by capturing the context at the moment of the registration , when user info should be available.
Something like

    broadcasterRegistration = MyBroadcaster.register(threadContext.contextualConsumer(myPayload-> {
      System.out.println("Received Event: " + myPayload);
      myRestClient.getData("test-data").then(....)
    }));

EDIT: Looking at the code again, you are capturing the threadContext instance, so if its implementation is not ThreadLocal based it should work.

If you can provide a simple example project, I can try to take a look