Akka cluster, guts
I want to document my findings about Akka cluster, so I decided to write a series of blog posts about it. This post skips roles and datacenters. They play an important role, but we want to focus on the basics first.
what is akka cluster
akka-cluster adds clustering capabilities to Akka, such as:
- actor singletons - a single instance of an actor in the whole cluster
- actor sharding - distributing actors across the cluster with a single-instance guarantee
- distributed pub-sub - multicast messaging capabilities
- distributed data - CRDTs
per project overview
For Akka 2.5.6 (commit 7eeb9b5210b626a673f6c6aa90605c5a911c2af7) we have the following packages:
- akka-cluster - the cluster ActorSystem extension, cluster member management capabilities
- akka-cluster-tools - distributed pub-sub and singletons implementations
- akka-sharding - actor sharding implementation
- akka-distributed-data - CRDTs
- akka-cluster-typed - a typed wrapper over akka-cluster
- akka-sharding-typed - same as akka-sharding, but for typed actors
Let's start with the foundation: akka-cluster. Everything else is built on top of it. Akka cluster handles cluster membership with a total order by age among its members, so it can perform something like leader election by picking the oldest member as the leader. akka-cluster itself is built on top of akka-actor and akka-remote.
cluster membership, highlevel overview
Akka cluster organizes its members into a ring ordered by age. Information about the members (the cluster state overview) is exchanged via gossip, with conflicts detected via vector clocks.
A node can only join a cluster member that is in the Up state. To become Up, a node must first join somewhere.
On startup, a cluster member issues Join requests towards a pre-configured list of seed nodes. The first node in the list of seed nodes should be the address of the node itself.
If none of the seed nodes (excluding the first element, self) replies, the node joins itself, goes to the Up state and therefore forms a new cluster.
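The startup decision described above can be sketched in a few lines of plain Scala. This is only an illustration, not Akka's code: the names SeedJoinSketch, decide, JoinSelf, JoinExisting and KeepRetrying are made up for this post, addresses are plain strings, and the "keep retrying" branch for nodes that are not first in the list is my assumption about what happens when nobody answers.

```scala
// Hypothetical, simplified model of the seed-node join decision (not Akka's implementation).
object SeedJoinSketch {
  sealed trait Decision
  case object JoinSelf extends Decision                     // form a new cluster
  case object KeepRetrying extends Decision                 // assumption: ask the seeds again later
  case class JoinExisting(seed: String) extends Decision    // join through a seed that replied

  def decide(self: String, seedNodes: List[String], replied: Set[String]): Decision = {
    // Never count ourselves as a seed that "replied".
    val others = seedNodes.filterNot(_ == self)
    others.find(replied.contains) match {
      case Some(seed)                                  => JoinExisting(seed)
      // Only the node that is first in the seed list may join itself.
      case None if seedNodes.headOption.contains(self) => JoinSelf
      case None                                        => KeepRetrying
    }
  }
}
```

With seeds List("a", "b"), node "a" forms a new cluster when "b" is silent, while node "b" joins through "a" as soon as "a" replies.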
Other nodes can then issue Join requests towards it and be replied with
The joining node enters the cluster state with Joining status, and this state is gossiped to the other nodes.
When all members have seen this gossip, the leader can promote the node into
Node age is determined by Member.upNumber. It is not a clock-generated timestamp but a kind of logical timestamp.
Member.upNumber is assigned to a member when the leader moves it from
Up (happens in the
The leader is defined as the oldest node among the reachable members. This is not classical leader election, in the sense that several leaders can exist (and perform actions) at the same time, because members can have different GossipOverview.reachability. Eventually, however, all nodes converge on who the leader is.
The leader can change the status of members, which results in new gossips. It waits for gossip convergence before promoting a node from an intermediate state into the next one.
Each gossip contains members, a version (vector clock), reachability and seen.
- members - member info together with its state. State in the cluster is represented by the following FSM diagram that goes between
- version - vector clock of this gossip
- seen - set of members who saw this gossip
- reachability - overview of reachability
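To make the version field less abstract, here is a minimal vector clock sketch in plain Scala. It is not Akka's VectorClock class: node names are plain strings, and the names VectorClockSketch, Relation, compare, merge and increment are chosen for this post. Two gossips conflict when their clocks are Concurrent, meaning neither one happened before the other.

```scala
// Hypothetical, simplified vector clock (not Akka's implementation).
object VectorClockSketch {
  sealed trait Relation
  case object Before extends Relation
  case object After extends Relation
  case object Same extends Relation
  case object Concurrent extends Relation

  type Clock = Map[String, Long] // node name -> counter; a missing entry counts as 0

  def increment(clock: Clock, node: String): Clock =
    clock.updated(node, clock.getOrElse(node, 0L) + 1)

  def compare(a: Clock, b: Clock): Relation = {
    val nodes = a.keySet ++ b.keySet
    val aBehind = nodes.exists(n => a.getOrElse(n, 0L) < b.getOrElse(n, 0L))
    val bBehind = nodes.exists(n => b.getOrElse(n, 0L) < a.getOrElse(n, 0L))
    (aBehind, bBehind) match {
      case (false, false) => Same
      case (true, false)  => Before     // a happened before b
      case (false, true)  => After      // a happened after b
      case (true, true)   => Concurrent // conflict: the gossips have to be merged
    }
  }

  // Merging takes the per-node maximum, so the result dominates both inputs.
  def merge(a: Clock, b: Clock): Clock =
    (a.keySet ++ b.keySet).map(n => n -> math.max(a.getOrElse(n, 0L), b.getOrElse(n, 0L))).toMap
}
```

If two nodes both update the same gossip independently, compare reports Concurrent, and the merged clock is newer than either side.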
Any node can mark any other node as
The leader can promote a node from this state into
Removed once gossip convergence has been reached.
Gossip convergence is reached when:
- the leader receives a gossip that contains all the nodes in its seen set;
- this gossip indicates that all nodes that are
Leaving can see each other (reachability is full).
This means that every node saw that gossip and acknowledged it as non-conflicting.
- the leader waits for all members to see the member in the Joining state (that is, until all members have put themselves into the seen set of the gossip).
- then it promotes the joining node from
Up and issues a new gossip with the updated version of the joining member.
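The convergence check and the promotion step can be sketched as follows. This is a toy model under loud assumptions, not Akka's code: addresses are strings, the gossip version is a plain counter instead of a vector clock, "reachability is full" is reduced to an empty unreachable set, and resetting seen to just the leader after a change is my simplification. The names ConvergenceSketch, converged and promoteJoining are made up for this post.

```scala
// Hypothetical, simplified model of gossip convergence and leader promotion.
object ConvergenceSketch {
  sealed trait Status
  case object Joining extends Status
  case object Up extends Status

  // upNumber is empty until the leader moves the member to Up.
  case class Member(address: String, status: Status, upNumber: Option[Int])

  case class Gossip(members: Set[Member], seen: Set[String], unreachable: Set[String], version: Long)

  // Converged: nobody is unreachable and every member is in the seen set.
  def converged(g: Gossip): Boolean =
    g.unreachable.isEmpty && g.members.forall(m => g.seen.contains(m.address))

  // On convergence the leader moves Joining members to Up, hands out the next
  // upNumbers and produces a new gossip that only the leader has seen so far.
  def promoteJoining(g: Gossip, leader: String): Gossip = {
    val joining = g.members.toList.filter(_.status == Joining).sortBy(_.address)
    if (!converged(g) || joining.isEmpty) g
    else {
      val highest = g.members.toList.flatMap(_.upNumber).foldLeft(0)((a, b) => math.max(a, b))
      val promoted = joining.zipWithIndex.map { case (m, i) =>
        m.copy(status = Up, upNumber = Some(highest + 1 + i))
      }
      val others = g.members.filterNot(_.status == Joining)
      Gossip(others ++ promoted, Set(leader), g.unreachable, g.version + 1)
    }
  }
}
```

A lower upNumber means an older member, which is what gives the age ordering without relying on wall clocks.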
Akka cluster uses UniqueAddress to identify its members. This address contains the regular network address together with a randomly generated uid.
This allows the cluster, for example, to recognize that another Join request from a node comes from a restart and is not a delayed message, allowing this node to participate in the cluster again.
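A small sketch of why the extra random part matters. The types below are simplified stand-ins written for this post (the real classes carry more fields), and classifyJoin with its three outcomes is a hypothetical helper, not an Akka API.

```scala
// Hypothetical, simplified model of restart detection via UniqueAddress.
object UniqueAddressSketch {
  case class Address(host: String, port: Int)
  case class UniqueAddress(address: Address, uid: Long)

  sealed trait JoinKind
  case object NewNode extends JoinKind       // address never seen before
  case object RestartedNode extends JoinKind // same address, different uid
  case object DuplicateJoin extends JoinKind // same address and uid: a repeated or delayed Join

  def classifyJoin(known: Set[UniqueAddress], joining: UniqueAddress): JoinKind =
    if (known.contains(joining)) DuplicateJoin
    else if (known.exists(_.address == joining.address)) RestartedNode
    else NewNode
}
```

Without the uid, a restarted process on the same host and port would be indistinguishable from a late message sent by the previous incarnation.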
Like many distributed systems, Akka cluster uses heartbeats to detect unreachable nodes, or a phi accrual failure detector to be more precise.
Each node (subject) is heartbeated periodically by N other nodes (observers). If at least one observer considers a node unreachable, that node is considered unreachable in all computations (leader, oldest, etc.).
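For intuition, here is a rough sketch of how a phi value can be computed from the heartbeat history. It assumes heartbeat intervals are roughly normally distributed and uses a logistic approximation of the normal CDF. Treat the constants and the names (PhiAccrualSketch, phi, isAvailable, unreachableInCluster) as assumptions of this post rather than a description of the exact detector in akka-remote.

```scala
// Hypothetical sketch of a phi accrual computation (assumptions noted in the text above).
object PhiAccrualSketch {
  // phi = -log10(probability that a heartbeat arrives even later than this).
  def phi(timeSinceLastMs: Double, meanMs: Double, stdDevMs: Double): Double = {
    val y = (timeSinceLastMs - meanMs) / stdDevMs
    // Logistic approximation of the normal CDF: F(y) ~ 1 / (1 + e).
    val e = math.exp(-y * (1.5976 + 0.070566 * y * y))
    if (timeSinceLastMs > meanMs) -math.log10(e / (1.0 + e))
    else -math.log10(1.0 - 1.0 / (1.0 + e))
  }

  // An observer considers the subject available while phi stays below a threshold.
  def isAvailable(phiValue: Double, threshold: Double): Boolean = phiValue < threshold

  // One negative verdict is enough: verdicts maps observer -> "I can reach the subject".
  def unreachableInCluster(verdicts: Map[String, Boolean]): Boolean =
    verdicts.values.exists(reachable => !reachable)
}
```

The longer a heartbeat is overdue compared to the observed mean, the faster phi grows, so the threshold trades detection speed against false positives.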
Each gossip contains reachability, a structure describing whether one node (the observer) can reach another node (the subject).
Its versioning is somewhat complicated, in order to allow proper merging of two different gossips, which includes merging of
records - sequence of
Reachability also contains versions, a vector-clock-like structure of type Map[UniqueAddress, Long], where UniqueAddress is the address of the observer and the Long is a version number.
Reachability.Record contains a version stamp marking when it was created or changed (
I have not found where Reachability.Record.version is used. I suspect it is currently used only for debugging purposes.
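The shape of this structure can be sketched roughly like this. It is a simplified stand-in written for this post, not the real Reachability class: observers and subjects are plain strings instead of UniqueAddress, only two statuses are modelled, and the change and isReachable helpers are hypothetical.

```scala
// Hypothetical, simplified model of the reachability table.
object ReachabilitySketch {
  sealed trait Status
  case object Reachable extends Status
  case object Unreachable extends Status

  // One row: what `observer` thinks about `subject`, stamped with the observer's version.
  case class Record(observer: String, subject: String, status: Status, version: Long)

  case class Reachability(records: Vector[Record], versions: Map[String, Long]) {
    // Every change made by an observer bumps that observer's version number.
    def change(observer: String, subject: String, status: Status): Reachability = {
      val next = versions.getOrElse(observer, 0L) + 1
      val kept = records.filterNot(r => r.observer == observer && r.subject == subject)
      Reachability(kept :+ Record(observer, subject, status, next), versions.updated(observer, next))
    }

    // A subject is unreachable as soon as any observer says so.
    def isReachable(subject: String): Boolean =
      !records.exists(r => r.subject == subject && r.status == Unreachable)
  }

  val empty: Reachability = Reachability(Vector.empty, Map.empty)
}
```

Because each observer only ever bumps its own entry in versions, two tables can be compared observer by observer to see which side has the newer rows.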
When a gossip encounters a conflicting gossip, it merges the two reachabilities through
That selects the newest version of the records for each observer (according to