Documentation version: Vaadin 24

Implementing Support for Clustered Environments

With its default settings, Collaboration Kit in a clustered environment lets users collaborate only with other users connected to the same application instance (that is, the same node). To support clustered deployments, Collaboration Kit provides the Backend superclass, which you can extend to support multi-instance environments.

This page shows how to implement a custom backend to support clustering on the Hazelcast platform.

Important
Experimental Feature
The Collaboration Kit backend for clustering support is an experimental feature. Its behavior, API, and look and feel may change. You can enable it through a Feature Flag.

Event Log Basics

Collaboration Kit uses a custom Backend implementation as the gateway for accessing Event Logs. An Event Log is an ordered log of submitted events involving Topic data, such as added items or value changes. The EventLog API provides methods to submit new events to the log and to add a subscriber that receives past and future events. Each event is marked with a unique identifier, which the API also uses in a method that removes all events in the log before a given identifier.
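
To make the contract concrete before moving to Hazelcast, here is a minimal in-memory sketch of the submit and subscribe behavior described above. The class InMemoryEventLogSketch and its Entry type are hypothetical stand-ins written for illustration. They don't implement the real EventLog interface, and truncation is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;

// Hypothetical in-memory stand-in; it does not implement the real EventLog interface.
final class InMemoryEventLogSketch {

    // One log entry: the event identifier together with its payload.
    static final class Entry {
        final UUID id;
        final String payload;

        Entry(UUID id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private BiConsumer<UUID, String> subscriber;

    // Appends the event and forwards it to the subscriber, if there is one.
    public synchronized void submitEvent(UUID id, String payload) {
        entries.add(new Entry(id, payload));
        if (subscriber != null) {
            subscriber.accept(id, payload);
        }
    }

    // Replays all past events in order, then keeps delivering future ones.
    // The returned Runnable removes the subscriber again.
    public synchronized Runnable subscribe(BiConsumer<UUID, String> newSubscriber) {
        if (subscriber != null) {
            throw new IllegalStateException("Only one subscriber is allowed");
        }
        subscriber = newSubscriber;
        for (Entry entry : entries) {
            newSubscriber.accept(entry.id, entry.payload);
        }
        return () -> {
            synchronized (this) {
                subscriber = null;
            }
        };
    }

    public static void main(String[] args) {
        InMemoryEventLogSketch log = new InMemoryEventLogSketch();
        log.submitEvent(UUID.randomUUID(), "first");

        List<String> received = new ArrayList<>();
        Runnable registration = log.subscribe((id, payload) -> received.add(payload));
        log.submitEvent(UUID.randomUUID(), "second");

        registration.run();
        log.submitEvent(UUID.randomUUID(), "third");

        System.out.println(received); // prints [first, second]
    }
}
```

The subscriber sees the event submitted before it subscribed, then the one submitted afterwards, and nothing once its registration has been removed.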

Event Log for Hazelcast

Hazelcast provides a straightforward streaming API based on shared maps and lists. Start by implementing the EventLog interface for the reference Hazelcast platform. The Event Log can be implemented on top of a Hazelcast IList. First, however, you need a class that stores both the event identifier and the payload.

private static final class IdAndPayload implements Serializable {

    private final UUID id;

    private final String payload;

    private IdAndPayload(UUID id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}

Once you have that, you can start implementing the interface methods. The submitEvent method takes the event identifier and payload, stores them in a new IdAndPayload object, and adds that object to the Hazelcast list backing the Event Log.

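public static class HazelcastEventLog implements EventLog {

    private final IList<IdAndPayload> list;

    // State used by the subscribe and deliverEvents methods shown later
    private UUID newerThan;
    private BiConsumer<UUID, String> eventSubscriber;
    private int nextEventIndex;

    public HazelcastEventLog(IList<IdAndPayload> list) {
        this.list = list;
    }

    @Override
    public void submitEvent(UUID id, String payload) {
        list.add(new IdAndPayload(id, payload));
    }

}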

To implement subscriptions to events, add an item listener to the Hazelcast list. The subscriber then receives all past and future events in the Event Log. A new subscriber receives all earlier events from the log in their original order. This allows it to synchronize to the latest state. New events are delivered in order, after all earlier events have been delivered. The subscribe method may not be invoked again until the previous subscriber has been removed.

The subscribe method optionally takes the identifier of the last known event so that the subscriber is notified only about newer events. If an identifier is provided but not found in the Event Log, an EventIdNotFoundException is thrown.

Note
Exception Handling
If the code calling this method catches this exception, it may retry the subscription with another identifier.

@Override
public synchronized Registration subscribe(UUID newerThan,
        BiConsumer<UUID, String> eventSubscriber)
        throws EventIdNotFoundException {
    if (this.eventSubscriber != null) {
        throw new IllegalStateException(); // (1)
    }

    if (newerThan != null) {
        Optional<IdAndPayload> newerThanIdAndEvent = list.stream()
                .filter(item -> newerThan.equals(item.id)).findFirst();
        if (newerThanIdAndEvent.isEmpty()) {
            throw new EventIdNotFoundException(
                    "newerThan doesn't exist in the log."); // (2)
        }
    }
    this.newerThan = newerThan;
    this.eventSubscriber = eventSubscriber;
    nextEventIndex = 0;

    UUID registrationId = list
            .addItemListener(new ItemListener<IdAndPayload>() {
                @Override
                public void itemAdded(ItemEvent<IdAndPayload> item) {
                    deliverEvents();
                }

                @Override
                public void itemRemoved(ItemEvent<IdAndPayload> item) {
                    handleRemoveItem();
                }
            }, false); // (3)

    // Deliver initial events
    deliverEvents(); // (4)

    return () -> {
        synchronized (this) {
            list.removeItemListener(registrationId);
            this.eventSubscriber = null;
        }
    }; // (5)
}
  1. Only a single subscriber is allowed: an exception is thrown if one is already registered.

  2. If an event identifier is provided, the list is checked to see whether it contains that identifier. If it doesn't, an EventIdNotFoundException is thrown.

  3. An item listener is added to the Hazelcast list to handle new items and those that have been removed.

  4. All past events are delivered initially.

  5. Registration is returned and can be used to remove the subscriber.
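
The retry mentioned in the note above could look roughly like the following sketch. The Log interface and IdNotFoundException class are hypothetical stand-ins for the EventLog subscribe method and EventIdNotFoundException, so that the example compiles on its own. Falling back to a null identifier, which replays the whole log, is an assumption made for illustration. A real caller may prefer to retry with a different known identifier.

```java
import java.util.UUID;
import java.util.function.BiConsumer;

final class SubscribeRetrySketch {

    // Hypothetical stand-in for EventIdNotFoundException.
    static class IdNotFoundException extends Exception {
        IdNotFoundException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for the subscribe method of an event log.
    interface Log {
        Runnable subscribe(UUID newerThan, BiConsumer<UUID, String> subscriber)
                throws IdNotFoundException;
    }

    // Tries the last known identifier first. If the log no longer contains
    // it, subscribes again without an identifier (assumed fallback).
    static Runnable subscribeWithFallback(Log log, UUID lastKnownId,
            BiConsumer<UUID, String> subscriber) {
        try {
            return log.subscribe(lastKnownId, subscriber);
        } catch (IdNotFoundException firstFailure) {
            try {
                return log.subscribe(null, subscriber);
            } catch (IdNotFoundException unexpected) {
                // A null identifier cannot be "not found" in this sketch.
                throw new IllegalStateException(unexpected);
            }
        }
    }

    public static void main(String[] args) {
        // A log that knows no identifiers at all: every non-null id fails.
        Log emptyLog = (newerThan, subscriber) -> {
            if (newerThan != null) {
                throw new IdNotFoundException("unknown id " + newerThan);
            }
            return () -> { };
        };
        Runnable registration = subscribeWithFallback(emptyLog,
                UUID.randomUUID(), (id, payload) -> { });
        registration.run();
        System.out.println("subscribed after fallback");
    }
}
```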

The deliverEvents method is synchronized to prevent it from being invoked simultaneously by multiple threads, which also avoids notifying the subscriber of duplicate events. The method keeps track of the Hazelcast list index to identify the next event. It increments that index until all events are delivered. If an event identifier is set as the starting point, events are skipped up to and including the one with that identifier, and only later events are delivered.

private synchronized void deliverEvents() {
    while (nextEventIndex < list.size()) {
        IdAndPayload event = list.get(nextEventIndex++);
        if (this.newerThan == null) {
            eventSubscriber.accept(event.id, event.payload);
        } else {
            if (event.id.equals(newerThan)) {
                this.newerThan = null;
            }
        }
    }
}
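
To see what the skipping logic delivers for a given starting point, the following sketch applies the same rule to a plain java.util.List. DeliverEventsSketch and its Entry class are hypothetical names used only for this illustration, and the sketch returns the delivered payloads instead of calling a subscriber.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

final class DeliverEventsSketch {

    // Hypothetical stand-in for the IdAndPayload class.
    static final class Entry {
        final UUID id;
        final String payload;

        Entry(UUID id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    // Returns the payloads a subscriber would receive for the given
    // starting point, using the same rule as deliverEvents.
    static List<String> delivered(List<Entry> log, UUID newerThan) {
        List<String> result = new ArrayList<>();
        UUID skipUntil = newerThan;
        for (Entry entry : log) {
            if (skipUntil == null) {
                result.add(entry.payload);
            } else if (entry.id.equals(skipUntil)) {
                // The marker event itself is not delivered, only later ones.
                skipUntil = null;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Entry a = new Entry(UUID.randomUUID(), "a");
        Entry b = new Entry(UUID.randomUUID(), "b");
        Entry c = new Entry(UUID.randomUUID(), "c");
        List<Entry> log = Arrays.asList(a, b, c);

        System.out.println(delivered(log, null)); // prints [a, b, c]
        System.out.println(delivered(log, b.id)); // prints [c]
    }
}
```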

The last method to implement for the EventLog interface is truncate. This method is used to limit the number of events contained in the log, preventing it from growing indefinitely. It takes the identifier of the oldest known event that should be preserved. If a null identifier is provided, it empties the entire log.

To implement this behavior for Hazelcast, create a Predicate and pass it to the list's removeIf method.

@Override
public synchronized void truncate(UUID olderThan) {
    Predicate<IdAndPayload> filter = e -> true;
    if (olderThan != null) {
        Optional<IdAndPayload> olderThanEvent = list.stream()
                .filter(item -> olderThan.equals(item.id)).findFirst();
        if (olderThanEvent.isEmpty()) {
            // NOOP
            return;
        }
        filter = new Predicate<>() {
            boolean found;

            @Override
            public boolean test(IdAndPayload event) {
                found = found || olderThan.equals(event.id);
                return !found;
            }
        };
    }
    list.removeIf(filter);
}
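
The effect of the stateful predicate is easiest to check on an ordinary list. The sketch below applies the same idea to a java.util.ArrayList of strings. TruncateSketch and the use of strings as identifiers are assumptions made to keep the example short. The approach relies on removeIf testing the elements in list order, which holds for ArrayList.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class TruncateSketch {

    // Removes every element before the first occurrence of keepFrom.
    // A null keepFrom empties the list; an unknown keepFrom changes nothing.
    static void truncate(List<String> ids, String keepFrom) {
        if (keepFrom == null) {
            ids.clear();
            return;
        }
        if (!ids.contains(keepFrom)) {
            return;
        }
        Predicate<String> beforeMarker = new Predicate<String>() {
            private boolean found;

            @Override
            public boolean test(String id) {
                // Once the marker has been seen, nothing else is removed.
                found = found || keepFrom.equals(id);
                return !found;
            }
        };
        ids.removeIf(beforeMarker);
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
        truncate(ids, "c");
        System.out.println(ids); // prints [c, d]
    }
}
```

For a local list, removing the sublist in front of the marker's index would avoid the stateful predicate altogether. The predicate form is shown because it mirrors the listing above.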

Opening an Event Log

Once you have a Hazelcast implementation of the EventLog interface, you’ll need to extend the Backend class to be able to create and get instances of it. Since this implementation depends only on a single Hazelcast IList, it’s simple to implement the openEventLog method. You can do so by returning a new instance of the HazelcastEventLog with the list named after the logId parameter.

@Override
public EventLog openEventLog(String logId) {
    return new HazelcastEventLog(hz.getList(logId));
}

Event Log Snapshots

A snapshot is an opaque representation of data at a certain moment in time. It can be used by nodes joining a cluster to catch up with a recent state of data without replaying all events. Snapshots are identified by name. Each version of a named snapshot is assigned an extra unique identifier.

Loading a Snapshot

To load the latest version of a snapshot, the Backend class provides the loadLatestSnapshot method. This method can be implemented for Hazelcast by using a map to store the latest available snapshot.

@Override
public CompletableFuture<Snapshot> loadLatestSnapshot(String name) {
    return CompletableFuture.completedFuture(snapshots.get(name));
}

New Snapshot

To submit a new snapshot of data, use the replaceSnapshot method. It takes the name of the snapshot, the expected unique identifier of the latest snapshot, the unique identifier of the new snapshot, and the payload of the snapshot itself. To implement this method for Hazelcast, you need some logic to verify that the latest snapshot is the expected snapshot.

@Override
public CompletableFuture<Void> replaceSnapshot(String name, UUID expectedId,
        UUID newId, String payload) {
    Snapshot currentSnapshot = snapshots.computeIfAbsent(name,
            k -> new Snapshot(null, null));

    // Replace the snapshot only if the stored one is the expected version
    if (Objects.equals(expectedId, currentSnapshot.getId())) {
        Snapshot newSnapshot = new Snapshot(newId, payload);
        snapshots.put(name, newSnapshot);
    }

    return CompletableFuture.completedFuture(null);
}
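
The listing above reads the current snapshot and writes the new one in two separate steps, so two concurrent callers could both pass the check. If that matters for your deployment, the check and the write can be combined into one atomic operation. The following sketch shows the idea on a local java.util.concurrent.ConcurrentHashMap. SnapshotStoreSketch and Snap are hypothetical names, and the boolean return value is added only to make the outcome visible. An equivalent for a distributed Hazelcast map isn't shown here.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class SnapshotStoreSketch {

    // Hypothetical stand-in for the Snapshot class.
    static final class Snap {
        final UUID id;
        final String payload;

        Snap(UUID id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    private final ConcurrentMap<String, Snap> snapshots = new ConcurrentHashMap<>();

    // Returns the latest snapshot stored under the name, or null if none.
    Snap load(String name) {
        return snapshots.get(name);
    }

    // Stores the new snapshot only if the current one has the expected
    // identifier (null means "no snapshot yet"). The check and the write
    // happen atomically inside compute. Returns whether it was replaced.
    boolean replace(String name, UUID expectedId, UUID newId, String payload) {
        boolean[] replaced = { false };
        snapshots.compute(name, (key, current) -> {
            UUID currentId = current == null ? null : current.id;
            if (Objects.equals(expectedId, currentId)) {
                replaced[0] = true;
                return new Snap(newId, payload);
            }
            return current;
        });
        return replaced[0];
    }

    public static void main(String[] args) {
        SnapshotStoreSketch store = new SnapshotStoreSketch();
        UUID first = UUID.randomUUID();
        UUID second = UUID.randomUUID();

        System.out.println(store.replace("topic", null, first, "v1"));   // prints true
        System.out.println(store.replace("topic", null, second, "v2"));  // prints false
        System.out.println(store.replace("topic", first, second, "v2")); // prints true
    }
}
```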

Node Identity & Membership Events

The primary purpose of the Backend API is to support collaborative features in applications that are deployed in clustered environments. Every Backend instance represents a member of the cluster and is uniquely identified by a UUID, which is returned by the getNodeId method. For example, you could return the local member identifier for your Hazelcast implementation.

@Override
public UUID getNodeId() {
    return hz.getCluster().getLocalMember().getUuid();
}

When several Backend instances are involved, you need to know when they join or leave the cluster. For this purpose, the Backend class defines the addMembershipListener method, which takes a MembershipListener and notifies it when cluster members join or leave. Since Hazelcast uses the same concept, the implementation is straightforward: you only need to map Hazelcast's events to Collaboration Kit's MembershipEvent events, passing along the MembershipEventType (that is, JOIN or LEAVE) and the identifier of the member.

@Override
public Registration addMembershipListener(
        MembershipListener membershipListener) {
    UUID registrationId = hz.getCluster()
            .addMembershipListener(new InitialMembershipListener() {

                @Override
                public void init(InitialMembershipEvent event) {
                    event.getMembers()
                            .forEach(member -> submitEvent(
                                    MembershipEventType.JOIN,
                                    member.getUuid()));
                }

                @Override
                public void memberAdded(MembershipEvent membershipEvent) {
                    submitEvent(MembershipEventType.JOIN,
                            membershipEvent.getMember().getUuid());
                }

                @Override
                public void memberRemoved(MembershipEvent membershipEvent) {
                    submitEvent(MembershipEventType.LEAVE,
                            membershipEvent.getMember().getUuid());
                }

                private void submitEvent(MembershipEventType type,
                        UUID id) {
                    membershipListener.handleMembershipEvent(
                            new com.vaadin.collaborationengine.MembershipEvent(
                                    type, id, getCollaborationEngine()));
                }
            });
    return () -> hz.getCluster().removeMembershipListener(registrationId);
}

Note
Registration Object
The addMembershipListener method returns a Registration object that can be used later to remove the listener.

Feature Flag

To use the Collaboration Kit backend for clustering support, you must enable it with the collaborationEngineBackend feature flag.

See Feature Flags for instructions on how to do this.