DistributedSet

The DistributedSet primitive is an implementation of Java’s Set collection. Distributed sets can be configured for replication using a variety of protocols ranging from strongly consistent consensus to eventually consistent gossip protocols.

DistributedSet is an extension of DistributedCollection and supports event-based notifications of changes to the set. Clients can listen for add/remove events by registering event listeners on a distributed set.

Finally, DistributedSet supports lazy iteration and Java 8’s Streams.

Configuration

The DistributedSet can be configured programmatically using the DistributedSetBuilder. To create a new set builder, use the setBuilder method, passing the name of the set to construct:

DistributedSetBuilder<String> setBuilder = atomix.<String>setBuilder("my-set");

The set can be configured with a protocol used to replicate changes. DistributedSet supports both strongly consistent and eventually consistent replication protocols.

Distributed sets are not ordered, so they are partitioned among all partitions in a partition group. For example, a set replicated with the multi-Raft protocol can be built as follows:

MultiRaftProtocol protocol = MultiRaftProtocol.builder()
  .withReadConsistency(ReadConsistency.LINEARIZABLE)
  .build();

Set<String> set = atomix.<String>setBuilder("my-set")
  .withProtocol(protocol)
  .build();

The generic parameter in the set configuration is the element type. By default, arbitrary element types may be used. However, when non-standard types are used, class names will be serialized with elements, and the thread context class loader will be used to load classes from names. To avoid serializing class names, register an element type via withElementType. Class-based serialization can also be disabled via withRegistrationRequired().

Set<Foo> set = atomix.<Foo>setBuilder("my-set")
  .withProtocol(protocol)
  .withElementType(Foo.class)
  .build();

Sets support caching. When caching is enabled, the set will transparently listen for change events and update a local cache. To enable caching, use withCacheEnabled():

Set<String> set = atomix.<String>setBuilder("my-set")
  .withProtocol(protocol)
  .withCacheEnabled()
  .withCacheSize(1000)
  .build();

A set can also be constructed in read-only mode using withReadOnly():

Set<String> set = atomix.<String>setBuilder("my-set")
  .withProtocol(protocol)
  .withReadOnly()
  .build();

Sets can also be configured in configuration files. To configure a set primitive, use the set primitive type:

atomix.conf

primitives.my-set {
  type: set
  cache.enabled: true
  protocol {
    type: multi-raft
    group: raft
    read-consistency: linearizable
  }
}

To get an instance of the pre-configured set, use the getSet method:

Set<String> set = atomix.getSet("my-set");

The set’s protocol and configuration will be loaded from the Atomix configuration files.

Operation

The DistributedSet supports most of the same operations as Java’s core Set. All operations performed on the set are guaranteed to be atomic. Beyond that atomicity guarantee, the consistency guarantees of read and write operations are specified by the configured protocol.

Set<String> set = atomix.<String>setBuilder("my-set")
  .withProtocol(MultiRaftProtocol.builder()
    .withReadConsistency(ReadConsistency.LINEARIZABLE)
    .build())
  .build();

if (set.add("foo")) {
  ...
}

Asynchronous Operations

As with all Atomix primitives, an asynchronous analogue of the set API, AsyncDistributedSet, can be retrieved by calling the async() method:

// Get a synchronous DistributedSet
DistributedSet<String> set = atomix.getSet("my-set");

// Get the underlying asynchronous DistributedSet
AsyncDistributedSet<String> asyncSet = set.async();

asyncSet.add("foo").thenAccept(succeeded -> {
  ...
});

The asynchronous API uses CompletableFutures to notify the client once an operation is complete. The thread model provided by all Atomix protocols guarantees CompletableFuture callbacks will always be executed on the same thread unless a thread is blocked by a prior primitive operation. Additionally, CompletableFutures will be completed in program order. In other words, if an operation A was performed before operation B on the client, the future for operation A will always be completed before the future for operation B.
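The program-order guarantee can be illustrated without a cluster. The following is a minimal sketch using only the JDK: InOrderAsyncSet is a hypothetical stand-in, not an Atomix class, and it obtains ordering by funnelling every operation through a single worker thread, which is an assumption made for the example rather than a description of how Atomix implements its thread model.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an asynchronous set; not part of Atomix.
public class InOrderAsyncSet {
  // Only ever touched by the worker thread, so no extra synchronization is needed.
  private final Set<String> elements = new HashSet<>();
  // One worker thread: operations run, and their futures complete, in submission order.
  private final ExecutorService worker = Executors.newSingleThreadExecutor();

  public CompletableFuture<Boolean> add(String element) {
    return CompletableFuture.supplyAsync(() -> elements.add(element), worker);
  }

  public void close() {
    worker.shutdown();
  }

  // Issues operations A, B and C in program order and records the order
  // in which their callbacks run.
  public static List<String> demo() {
    InOrderAsyncSet set = new InOrderAsyncSet();
    List<String> completions = Collections.synchronizedList(new ArrayList<>());
    try {
      CompletableFuture<Void> a = set.add("foo").thenAccept(added -> completions.add("A:" + added));
      CompletableFuture<Void> b = set.add("bar").thenAccept(added -> completions.add("B:" + added));
      CompletableFuture<Void> c = set.add("foo").thenAccept(added -> completions.add("C:" + added));
      // Wait for all three callbacks before inspecting the result.
      CompletableFuture.allOf(a, b, c).join();
    } finally {
      set.close();
    }
    return completions;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [A:true, B:true, C:false]
  }
}
```

The third add returns false because "foo" is already present, and its callback still runs last because it was issued last.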

Event Notifications

DistributedSet supports publishing event notifications to client listeners. This allows clients to react to add and remove operations on the set. To add a listener to a set, register it via addListener:

set.addListener(event -> {
  ...
});

When state machine-based protocols (i.e. Raft or primary-backup) are used, Atomix guarantees that events will be received in the order in which they occurred inside replicated state machines, and event listeners will be called on an Atomix event thread. For eventually consistent protocols (i.e. anti-entropy or CRDT), the guarantees of set events are specific to each protocol implementation.

Users can optionally provide a custom executor on which to call the event listener:

Executor executor = Executors.newSingleThreadExecutor();
set.addListener(event -> {
  ...
}, executor);

Custom executors can change the ordering of events. It’s recommended that single-threaded executors be used to preserve order. Multi-threaded executors cannot provide the same guarantees as Atomix event threads or single-threaded executors.
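The effect of the executor choice on ordering can be seen with plain JDK executors. This is a sketch rather than Atomix code: ListenerOrdering, deliver and isSorted are hypothetical names, and the events are simply the integers 0 to count-1 handed to the executor in order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical demonstration class; not part of Atomix.
public class ListenerOrdering {
  // Hands events 0..count-1 to the executor in order and returns the order
  // in which the listener actually observed them.
  public static List<Integer> deliver(int count, ExecutorService executor) {
    List<Integer> observed = Collections.synchronizedList(new ArrayList<>());
    Consumer<Integer> listener = observed::add;
    for (int i = 0; i < count; i++) {
      final int event = i;
      executor.execute(() -> listener.accept(event));
    }
    executor.shutdown();
    try {
      if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("listener calls did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return observed;
  }

  // Returns true if the events were observed in the order they were published.
  public static boolean isSorted(List<Integer> events) {
    for (int i = 1; i < events.size(); i++) {
      if (events.get(i - 1) > events.get(i)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<Integer> single = deliver(1000, Executors.newSingleThreadExecutor());
    List<Integer> pooled = deliver(1000, Executors.newFixedThreadPool(4));
    System.out.println("single-threaded executor in order: " + isSorted(single));
    // A thread pool gives no ordering guarantee, so this may be true or false.
    System.out.println("four-thread pool in order: " + isSorted(pooled));
  }
}
```

A single-threaded executor runs tasks sequentially in submission order, which is why it preserves the order in which events were handed to it.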

The event listener will be called with a DistributedCollectionEvent instance. Each event in Atomix has an associated type which can be read via the type() method. To determine the type of modification that took place, use a switch statement:

switch (event.type()) {
  case ADD:
    ...
    break;
  case REMOVE:
    ...
    break;
}

The DistributedCollectionEvent provides the added/removed element.

String value = event.element();
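Putting type() and element() together, a listener can keep a local mirror of the set in sync, which is roughly what a cache driven by change events has to do. The sketch below runs without a cluster, so Event and ObservableSet are hypothetical in-memory stand-ins rather than the Atomix classes; only the shape of the listener body is meant to carry over.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical demonstration class; not part of Atomix.
public class MirrorListenerExample {
  // Hypothetical stand-in for a collection event.
  static final class Event<E> {
    enum Type { ADD, REMOVE }

    private final Type type;
    private final E element;

    Event(Type type, E element) {
      this.type = type;
      this.element = element;
    }

    Type type() { return type; }
    E element() { return element; }
  }

  // Hypothetical in-memory set that notifies listeners only when its contents change.
  static final class ObservableSet<E> {
    private final Set<E> elements = new HashSet<>();
    private final List<Consumer<Event<E>>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<Event<E>> listener) {
      listeners.add(listener);
    }

    boolean add(E element) {
      boolean changed = elements.add(element);
      if (changed) {
        publish(new Event<>(Event.Type.ADD, element));
      }
      return changed;
    }

    boolean remove(E element) {
      boolean changed = elements.remove(element);
      if (changed) {
        publish(new Event<>(Event.Type.REMOVE, element));
      }
      return changed;
    }

    private void publish(Event<E> event) {
      listeners.forEach(listener -> listener.accept(event));
    }
  }

  // Applies a few operations and returns the mirror maintained by the listener.
  public static Set<String> demo() {
    ObservableSet<String> set = new ObservableSet<>();
    Set<String> mirror = new HashSet<>();
    set.addListener(event -> {
      switch (event.type()) {
        case ADD:
          mirror.add(event.element());
          break;
        case REMOVE:
          mirror.remove(event.element());
          break;
      }
    });
    set.add("foo");
    set.add("bar");
    set.add("foo");    // no change, so no event is published
    set.remove("foo");
    return mirror;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [bar]
  }
}
```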

The Atomix thread model allows event listeners to make blocking calls on primitives within event threads. For example, in response to a remove event, a listener can call add on the same set:

// re-add entries that are removed from the set
set.addListener(event -> {
  if (event.type() == DistributedCollectionEvent.Type.REMOVE) {
    set.add(event.element());
  }
});

When a blocking operation (any operation on a synchronous primitive) is performed within an event thread, additional events and futures will be completed on a background thread pool. This means ordering guarantees are inherently relaxed while event threads are blocked.

Iterators

DistributedSet supports lazy iterators:

for (String value : set) {
  ...
}

Iterators are implemented by lazily fetching batches of set elements from the partition as the elements are iterated. Once a primitive iterator has been created, it must either be exhausted or explicitly closed via close() to ensure the resources used to track the iterator state are cleaned up.

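// java.util.Iterator has no close(); this assumes set is declared as
// DistributedSet<String>, whose iterator() returns a closeable SyncIterator
SyncIterator<String> iterator = set.iterator();

try {
  if (iterator.hasNext()) {
    String value = iterator.next();
  }
} finally {
  // Release the state tracked for this iterator
  iterator.close();
}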

Failing to exhaust or explicitly close frequently created primitive iterators may cause a memory leak.
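To make the batching and cleanup behavior concrete, here is a minimal sketch of a lazy, closeable iterator built only on the JDK. BatchIterator is a hypothetical class and the "partition" is simulated by an in-memory list; the batch size, the fetches counter and the use of AutoCloseable are assumptions made for the example, not details of the Atomix implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical demonstration class; not part of Atomix.
public class BatchIteratorExample {
  // Hypothetical lazy iterator that pulls elements from a source one batch at a time.
  static final class BatchIterator<E> implements Iterator<E>, AutoCloseable {
    private final List<E> source;   // stands in for the remote partition
    private final int batchSize;
    private int position;           // index of the next element to fetch from the source
    private Iterator<E> batch = Collections.emptyIterator();
    private boolean open = true;
    private int fetches;            // number of batches fetched so far

    BatchIterator(List<E> source, int batchSize) {
      this.source = source;
      this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
      if (!open) {
        return false;
      }
      // Fetch the next batch only when the current one has been consumed.
      if (!batch.hasNext() && position < source.size()) {
        int end = Math.min(position + batchSize, source.size());
        batch = new ArrayList<>(source.subList(position, end)).iterator();
        position = end;
        fetches++;
      }
      if (!batch.hasNext()) {
        close(); // exhausted: release the iterator state automatically
        return false;
      }
      return true;
    }

    @Override
    public E next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return batch.next();
    }

    @Override
    public void close() {
      open = false;
      batch = Collections.emptyIterator();
    }

    boolean isOpen() { return open; }
    int fetches() { return fetches; }
  }

  // Reads a single element, then closes the iterator without exhausting it.
  public static String demo() {
    List<String> remote = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
    BatchIterator<String> iterator = new BatchIterator<>(remote, 3);
    String first;
    try {
      first = iterator.next(); // triggers the fetch of the first batch only
    } finally {
      iterator.close();
    }
    return "first=" + first + " fetches=" + iterator.fetches() + " open=" + iterator.isOpen();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // first=a fetches=1 open=false
  }
}
```

Only one batch of three elements is fetched for the single call to next(), and the explicit close() in the finally block releases the iterator even though four elements were never read.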

As with the synchronous primitives themselves, the iterators provided for Atomix primitives are backed by an asynchronous implementation called AsyncIterator. The asynchronous backing iterator can be retrieved via the async() method:

AsyncIterator<String> asyncIterator = set.async().iterator();

Streams

The implementation of lazy iterators also allows the set to support Java 8 Streams:

Set<String> fooValues = set.stream()
  .filter(value -> value.contains("foo"))
  .collect(Collectors.toSet());

Cleanup

While a set is in use, Atomix may consume some network, memory, and disk resources to manage the set. To free up those resources, users should call the close() method to allow Atomix to garbage collect the instance.

set.close();