When does a Flux-endpoint become active?

If a client joins a flux endpoint I want to resend or somehow tell the client what they missed on that stream. But I can not resend directly to the flux since then I send them too soon, and the client will not get the new messages. How do I architect a bulletproof “Client hook into a stream and gets all historical messages first”?

When does a Flux-endpoint become active?

What do you mean by ”can not resend directly since then I send them too soon”?

Depending on the case, one pattern is to first send a normal rpc saying ”give me all data you have” potentially with an additional ”from no more than 12h ago”. Then when you have retrieved the data you call a Flux with an id or timestamp parameter saying ”I know everything up until this point, send me anything after that”

I would also split this into two methods: a non-reactive method for getting history, and then a reactive endpoint that starts sending messages from a given id.

Something like

// Fetch history first
this.messages = await Endpoint.getHistory(/* probably some parameter for history length*/);
// Then subscribe to live updates
Endpoint.getLiveMessagesStartingFrom(/* id of last message in history*/).onNext(...);

What I was trying to do was this:
public test(){
Sink sink = …
Flux flux = sink.asFlux();
sink.tryEmitNext(“hello”);
sink.tryEmitNext(“boiii”);
return flux;
}
Which didnt work, I tried to do the emit part in a Future, but still not stable due to the Future being too fast. Doing return Flux.concat(Flux.fromArray(history),realFlux) does seem to do the work though :slightly_smiling_face:

Ah. So the problem was that everything had already been emitted by the time you were looking at your running app? One thing you can also do for tests like this is use delayElement to delay each emit by some defined duration Project Reactor - Adding Delay to Mono and Flux - Woolha

Thank you both for pointing me in the right direction :slightly_smiling_face:

I try to have a regular java-class as the return-value in my endpoint where one of the variables is of type Flux<EP_OverlayContent>. It works great, but I do get this error by the ts-compiler in the log:
ERROR in frontend/generated/com/oumworld/endpoints/overlay/EP_OverlayJoin.ts:9:9
TS2304: Cannot find name ‘Subscription’.
*/
export default interface EP_OverlayJoin {
flux: Subscription<EP_OverlayContent | undefined>;
^^^^^^^^^^^^
setup: Array<EP_OverlayContent | undefined>;
}