Atomix provides a variety of services that can be used for direct and publish-subscribe style communication. Underlying each of the communication abstractions is Netty, which is used for all inter-cluster communication. Direct messaging is performed through the ClusterCommunicationService
interface. The ClusterCommunicationService
supports unicast, multicast, broadcast, and request-reply messaging patterns.
An important concept for inter-cluster communication is the message subject. Subjects are arbitrary strings that indicate the type of message being sent. Every message that’s sent by one node and received by another must be identified by a subject. This allows senders and receivers to filter relevant messages.
Messages are received on subscribers registered with the subscribe
message:
atomix.getCommunicationService().subscribe("test", message -> {
return CompletableFuture.completedFuture(message);
});
Three types of subscribers can be registered:
Executor
on which to consume messagesCompletableFuture
Executor
on which to consume messagesAdditionally, serializers can be provided for custom object types.
As noted above, messages can be sent using a variety of different communication patterns:
unicast
sends a message to a single peer without awaiting a responsemulticast
sends a message to a set of members without awaiting any responsesbroadcast
sends a message to all members known to the local ClusterMembershipService
without awaiting any responsessend
sends a direct message to a peer and awaits a response via CompletableFuture
// Send a request-reply message to node "foo"
atomix.getCommunicationService().send("test", "Hello world!", MemberId.from("foo")).thenAccept(response -> {
System.out.println("Received " + response);
});
The ClusterCommunicationService
uses a default serializer to serialize a variety of core data structures, but often custom objects need to be communicated across the wire. The messaging service provides overloaded methods for providing arbitrary message encoders/decoders for requests/replies:
Serializer serializer = Serializer.using(Namespace.builder()
.register(Namespaces.BASIC)
.register(MemberId.class)
.register(ClusterHeartbeat.class)
.build());
ClusterHeartbeat heartbeat = new ClusterHeartbeat(atomix.getMembershipService().getLocalMember().id());
atomix.getCommunicationService().broadcast("test", heartbeat, serializer::encode);