Riak is a distributed, fault tolerant, open source database that illustrates how to build large scale systems using Erlang/OTP. Thanks in large part to Erlang's support for massively scalable distributed systems, Riak offers features that are uncommon in databases, such as high-availability and linear scalability of both capacity and throughput.
Erlang/OTP provides an ideal platform for developing systems like Riak because it provides inter-node communication, message queues, failure detectors, and client-server abstractions out of the box. What's more, most frequently-used patterns in Erlang have been implemented in library modules, commonly referred to as OTP behaviors. They contain the generic code framework for concurrency and error handling, simplifying concurrent programming and protecting the developer from many common pitfalls. Behaviors are monitored by supervisors, themselves a behavior, and grouped together in supervision trees. A supervision tree is packaged in an application, creating a building block of an Erlang program.
A complete Erlang system such as Riak is a set of loosely coupled applications that interact with each other. Some of these applications have been written by the developer, some are part of the standard Erlang/OTP distribution, and some may be other open source components. They are sequentially loaded and started by a boot script generated from a list of applications and versions.
What differs among systems are the applications that are part
of the release which is started. In the standard Erlang distribution,
the boot files will start the Kernel and StdLib
(Standard Library) applications. In some installations, the
SASL (System Architecture Support Libraries) application is also
started. SASL contains release and software upgrade tools together
with logging capabilities. Riak is no different, other than starting
the Riak specific applications as well as their runtime dependencies,
which include Kernel, StdLib and SASL. A complete and
ready-to-run build of Riak actually embeds these standard elements of
the Erlang/OTP distribution and starts them all in unison when
riak start
is invoked on the command line. Riak consists of
many complex applications, so this chapter should not be
interpreted as a complete guide. It should be seen as an introduction
to OTP where examples from the Riak source code are used. The figures
and examples have been abbreviated and shortened for demonstration
purposes.
Erlang is a concurrent functional programming language that compiles to byte code and runs in a virtual machine. Programs consist of functions that call each other, often resulting in side effects such as inter-process message passing, I/O and database operations. Erlang variables are single assignment, i.e., once they have been given values, they cannot be updated. The language makes extensive use of pattern matching, as shown in the factorial example below:
-module(factorial).
-export([fac/1]).

fac(0) -> 1;
fac(N) when N>0 ->
    Prev = fac(N-1),
    N*Prev.
Here, the first clause gives the factorial of zero, the second factorials of positive numbers. The body of each clause is a sequence of expressions, and the final expression in the body is the result of that clause. Calling the function with a negative number will result in a run time error, as none of the clauses match. Not handling this case is an example of non-defensive programming, a practice encouraged in Erlang.
Within the module, functions are called in the usual way; outside, the name of the module is prepended, as in factorial:fac(3). It is possible to define functions with the same name but different numbers of arguments; this is called their arity. In the export directive of the factorial module, the fac function of arity one is denoted by fac/1.
Erlang supports tuples (also called product types) and lists. Tuples are enclosed in curly brackets, as in {ok,37}. In tuples, we access elements by position. Records are another data type; they allow us to store a fixed number of elements which are then accessed and manipulated by name. We define a record using a declaration such as -record(state, {id, msg_list=[]}). To create an instance, we use the expression Var = #state{id=1}, and we examine its contents using Var#state.id. For a variable number of elements, we use lists written in square brackets, such as [23,34]. The notation [X|Xs] matches a non-empty list with head X and tail Xs. Identifiers beginning with a lower-case letter denote atoms, which simply stand for themselves; the ok in the tuple {ok,37} is an example of an atom. Atoms used in this way distinguish between different kinds of function result: as well as ok results, there might be results of the form {error, "Error String"}.
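To make these constructs concrete, here is a small, hypothetical module (not part of Riak) that pattern matches on tagged tuples, walks a list with the [X|Xs] notation, and creates and reads a record:

-module(types_demo).
-export([describe/1, sum/1, new_state/1, id_of/1]).

%% A record with a fixed set of named fields.
-record(state, {id, msg_list=[]}).

%% Pattern match on tuples tagged with the atoms ok and error.
describe({ok, N})       -> {found, N};
describe({error, _Why}) -> not_found.

%% Walk a list using the [X|Xs] head/tail notation.
sum([])     -> 0;
sum([X|Xs]) -> X + sum(Xs).

%% Create a record instance and read a field by name.
new_state(Id) -> #state{id=Id}.
id_of(State)  -> State#state.id.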
Processes in Erlang systems run concurrently in separate memory, and communicate with each other by message passing. Processes can be used for a wealth of applications, including gateways to databases, as handlers for protocol stacks, and to manage the logging of trace messages from other processes. Although these processes handle different requests, there will be similarities in how these requests are handled.
As processes exist only within the virtual machine, a single VM can simultaneously run millions of processes, a feature Riak exploits extensively. For example, each request to the database—reads, writes, and deletes—is modeled as a separate process, an approach that would not be possible with most OS-level threading implementations.
Processes are identified by process identifiers, called PIDs, but they can also be registered under an alias; this should only be used for long-lived "static" processes. Registering a process with its alias allows other processes to send it messages without knowing its PID. Processes are created using the spawn(Module, Function, Arguments) built-in function (BIF). BIFs are functions integrated in the VM and used to do what is impossible or slow to execute in pure Erlang. The spawn/3 BIF takes a Module, a Function and a list of Arguments as parameters. The call returns the PID of the newly spawned process and, as a side effect, creates a new process that starts executing the function in the module with the arguments mentioned earlier.
A message Msg is sent to a process with process id Pid using Pid ! Msg. A process can find out its PID by calling the self() BIF, and this can then be sent to other processes for them to use to communicate with the original process. Suppose that a process expects to receive messages of the form {ok, N} and {error, Reason}. To process these it uses a receive statement:
receive
    {ok, N}    -> N+1;
    {error, _} -> 0
end
The result of this is a number determined by the pattern-matched clause. When the value of a variable is not needed in the pattern match, the underscore wild-card can be used as shown above.
Message passing between processes is asynchronous, and the messages
received by a process are placed in the process's mailbox in the order
in which they arrive. Suppose that now the receive
expression
above is to be executed: if the first element in the mailbox is either {ok, N} or {error, Reason}, the corresponding result will be returned. If the first message in the mailbox is not of this
form, it is retained in the mailbox and the second is processed in a
similar way. If no message matches, the receive will wait for a
matching message to be received.
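As a minimal, hypothetical illustration of these primitives (not taken from Riak), the following module spawns a process whose receive loop handles exactly the two message forms discussed above, together with a client function that sends a request and waits for the reply:

-module(counter_demo).
-export([start/0, ask/2]).

%% Spawn a process running the receive-evaluate loop below.
start() ->
    spawn(fun loop/0).

loop() ->
    receive
        {From, {ok, N}} ->
            From ! {self(), N + 1},   %% reply to the sender
            loop();
        {From, {error, _Reason}} ->
            From ! {self(), 0},
            loop();
        stop ->
            ok                        %% no more code to execute: normal termination
    end.

%% Client side: send a message containing our own PID, then wait for the reply.
ask(Pid, Msg) ->
    Pid ! {self(), Msg},
    receive
        {Pid, Result} -> Result
    end.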
Processes terminate for two reasons. If there is no more code to execute, they are said to terminate with reason normal. If a process encounters a run-time error, it is said to terminate with a non-normal reason. A process terminating will not affect other processes unless they are linked to it. Processes can link to each other through the link(Pid) BIF, or at spawn time through the spawn_link(Module, Function, Arguments) BIF. If a process terminates, it sends an EXIT signal to the processes in its link set. If the termination reason is non-normal, the processes receiving the signal terminate in turn, propagating the EXIT signal further. By calling the process_flag(trap_exit, true) BIF, processes can receive the EXIT signals as Erlang messages in their mailbox instead of terminating.
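A small, hypothetical sketch of these calls: the parent traps exits, links to a helper at spawn time, and turns the helper's abnormal termination into an ordinary message in its mailbox:

-module(link_demo).
-export([run/0]).

run() ->
    %% Receive EXIT signals as messages instead of terminating.
    process_flag(trap_exit, true),

    %% Spawn a helper that terminates abnormally, linking to it atomically.
    Pid = spawn_link(fun() -> exit(some_error) end),

    receive
        {'EXIT', Pid, Reason} ->
            {helper_terminated, Reason}
    end.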
Riak uses EXIT signals to monitor the well-being of helper processes performing non-critical work initiated by the request-driving finite state machines. When these helper processes terminate abnormally, the EXIT signal allows the parent to either ignore the error or restart the process.
We previously introduced the notion that processes follow a common pattern regardless of the particular purpose for which the process was created. To start off, a process has to be spawned and then, optionally, have its alias registered. The first action of the newly spawned process is to initialize the process loop data. The loop data is often the result of arguments passed to the spawn built-in function at the initialization of the process. The loop data is stored in a variable we refer to as the process state. The state, often stored in a record, is passed to a receive-evaluate function, running a loop which receives a message, handles it, updates the state, and passes it back as an argument to a tail-recursive call. If one of the messages it handles is a stop message, the receiving process will clean up after itself and then terminate.
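A bare-bones, hypothetical sketch of that skeleton might look as follows (the event-handling and cleanup bodies are placeholders):

-module(skeleton).
-export([start/1, send_event/1, stop/0]).

%% Spawn the process and register it under an alias, e.g. skeleton:start([]).
start(Args) ->
    register(?MODULE, spawn(fun() -> init(Args) end)),
    ok.

send_event(Event) -> ?MODULE ! {event, Event}, ok.
stop()            -> ?MODULE ! stop, ok.

%% Initialize the loop data from the arguments passed at spawn time.
init(Args) ->
    State = Args,
    loop(State).

%% The receive-evaluate loop: handle a message, update the state,
%% and pass it back in a tail-recursive call.
loop(State) ->
    receive
        {event, Event} ->
            NewState = [Event | State],
            loop(NewState);
        stop ->
            ok    %% clean up here, then terminate
    end.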
This is a recurring theme among processes that will occur regardless of the task the process has been assigned to perform. With this in mind, let's look at what differs between processes that conform to this pattern: the arguments passed to the spawn BIF calls, whether the process is registered under an alias, the way the loop data is initialized, the messages the process receives and how it handles them, and the cleanup it performs upon termination. So, even if a skeleton of generic actions exists, these actions are complemented by specific ones that are directly related to the tasks assigned to the process. Using this skeleton as a template, programmers can create Erlang processes that act as servers, finite state machines, event handlers and supervisors. But instead of re-implementing these patterns every time, they have been placed in library modules referred to as behaviors. They come as part of the OTP middleware.
The core team of developers committing to Riak is spread across nearly a dozen geographical locations. Without very tight coordination and templates to work from, the result would consist of divergent client/server implementations that fail to handle borderline cases and concurrency-related errors. There would probably be no uniform way to handle client and server crashes, nor any guarantee that a response to a request is indeed that request's response, and not just any message that happens to conform to the internal message protocol.
OTP is a set of Erlang libraries and design principles providing ready-made tools with which to develop robust systems. Many of these patterns and libraries are provided in the form of "behaviors."
OTP behaviors address these issues by providing library modules that implement the most common concurrent design patterns. Behind the scenes, without the programmer having to be aware of it, the library modules ensure that errors and special cases are handled in a consistent way. As a result, OTP behaviors provide a set of standardized building blocks used in designing and building industrial-grade systems.
OTP behaviors are provided as library modules in the stdlib
application which comes as part of the Erlang/OTP distribution. The
specific code, written by the programmer, is placed in a separate
module and called through a set of predefined callback functions
standardized for each behavior. This callback module will contain all
of the specific code required to deliver the desired functionality.
OTP behaviors include worker processes, which do the actual processing, and supervisors, whose task is to monitor workers and other supervisors. Worker behaviors, often denoted in diagrams as circles, include servers, event handlers, and finite state machines. Supervisors, denoted in illustrations as squares, monitor their children, both workers and other supervisors, creating what is called a supervision tree.
Figure 15.1: OTP Riak Supervision Tree
Supervision trees are packaged into a behavior called an application. OTP applications are not only the building blocks of Erlang systems, but are also a way to package reusable components. Industrial-grade systems like Riak consist of a set of loosely coupled, possibly distributed applications. Some of these applications are part of the standard Erlang distribution and some are the pieces that make up the specific functionality of Riak.
Examples of OTP applications include the Corba ORB or the Simple Network Management Protocol (SNMP) agent. An OTP application is a reusable component that packages library modules together with supervisor and worker processes. From now on, when we refer to an application, we will mean an OTP application.
The behavior modules contain all of the generic code for each given behavior type. Although it is possible to implement your own behavior module, doing so is rare because the ones that come with the Erlang/OTP distribution will cater to most of the design patterns you would use in your code. The generic functionality provided in a behavior module includes operations such as spawning and possibly registering the process, sending and receiving client messages, storing the loop data, and managing the process loop.
The loop data is a variable that will contain the data the behavior needs to store in between calls. After the call, an updated variant of the loop data is returned. This updated loop data, often referred to as the new loop data, is passed as an argument in the next call. Loop data is also often referred to as the behavior state.
The functionality to be included in the callback module for the generic server to deliver the specific required behavior includes initializing the process loop data, handling the specific client requests (and, if synchronous, the replies sent back to the client), and cleaning up the process loop data upon termination.
Generic servers that implement client/server behaviors are defined in
the gen_server
behavior that comes as part of the standard
library application. In explaining generic servers, we will use the
riak_core_node_watcher.erl
module from the riak_core
application. It is a server that tracks and reports on which
sub-services and nodes in a Riak cluster are available. The module
headers and directives are as follows:
-module(riak_core_node_watcher).
-behavior(gen_server).

%% API
-export([start_link/0, service_up/2, service_down/1, node_up/0, node_down/0,
         services/0, services/1, nodes/1, avsn/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

-record(state, {status=up, services=[], peers=[], avsn=0, bcast_tref,
                bcast_mod={gen_server, abcast}}).
We can easily recognize generic servers through the
-behavior(gen_server).
directive. This directive is used by
the compiler to ensure all callback functions are properly
exported. The state record is used as the server's loop data.
With the gen_server
behavior, instead of using the
spawn
and spawn_link
BIFs, you will use the
gen_server:start
and gen_server:start_link
functions. The main difference between spawn
and start
is the synchronous nature of the call. Using start
instead of
spawn
makes starting the worker process more deterministic and
prevents unforeseen race conditions, as the call will not return the
PID of the worker until it has been initialized. You call the
functions with either of:
gen_server:start_link(ServerName, CallbackModule, Arguments, Options)
gen_server:start_link(CallbackModule, Arguments, Options)
ServerName is a tuple of the format {local, Name} or {global, Name}, denoting a local or global Name for the process alias if it is to be registered. Global names allow servers to be transparently accessed across a cluster of distributed Erlang nodes. If you do not want to register the process and instead reference it using its PID, you omit the argument and use a start_link/3 or start/3 function call instead. CallbackModule is the name of the module in which the specific callback functions are placed, Arguments is a valid Erlang term that is passed to the init/1 callback function, while Options is a list that allows you to set the memory management flags fullsweep_after and heapsize, as well as other tracing and debugging flags.
In our example, we call start_link/4
, registering the process
with the same name as the callback module, using the ?MODULE
macro call. This macro is expanded to the name of the module it is
defined in by the preprocessor when compiling the code. It is always
good practice to name your behavior with an alias that is the same as
the callback module it is implemented in. We don't pass any arguments,
and as a result, just send the empty list. The options list is kept
empty:
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
The obvious difference between the start_link and start functions is that start_link links to its parent, most often a supervisor, while start doesn't. This needs a special mention as it is an OTP behavior's responsibility to link itself to the supervisor. The start functions are often used when testing behaviors from the shell, as a typing error causing the shell process to crash would not affect the behavior. All variants of the start and start_link functions return {ok, Pid}.
The start and start_link functions will spawn a new process that calls the init(Arguments) callback function in the CallbackModule, with the Arguments supplied. The init function must initialize the LoopData of the server and has to return a tuple of the format {ok, LoopData}. LoopData contains the first instance of the loop data that will be passed between the callback functions. If you want to store some of the arguments you passed to the init function, you would do so in the LoopData variable. The LoopData in the Riak node watcher server is the result of calling schedule_broadcast/1 with a state record whose fields are set to their default values:
init([]) ->
    %% Watch for node up/down events
    net_kernel:monitor_nodes(true),

    %% Setup ETS table to track node status
    ets:new(?MODULE, [protected, named_table]),

    {ok, schedule_broadcast(#state{})}.
Although the supervisor process might call the start_link/4
function, a different process calls the init/1
callback: the
one that was just spawned. As the purpose of this server is to
notice, record, and broadcast the availability of sub-services within
Riak, the initialization asks the Erlang runtime to notify it of such
events, and sets up a table to store this information in. This needs
to be done during initialization, as any calls to the server would
fail if that structure did not yet exist. Do only what is necessary
and minimize the operations in your init
function, as the call
to init
is a synchronous call that prevents all of the other
serialized processes from starting until it returns.
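One common way of keeping init/1 short, shown here as a general OTP idiom rather than something riak_core_node_watcher does, is to send yourself a message during initialization and perform the expensive part of the setup in handle_info/2 once the server loop is running (finish_setup/1 and the state record fields are assumptions):

init(Args) ->
    %% Return immediately so the start_link call is not blocked.
    self() ! finish_init,
    {ok, #state{args = Args}}.

handle_info(finish_init, State) ->
    %% Perform the expensive part of the setup here.
    NewState = finish_setup(State),
    {noreply, NewState}.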
If you want to send a synchronous message to your server, you use the
gen_server:call/2
function. Asynchronous calls are made using
the gen_server:cast/2
function. Let's start by taking two
functions from Riak's service API; we will provide the rest of the
code later. They are called by the client process and result in a
synchronous message being sent to the server process registered with
the same name as the callback module. Note that validating the data
sent to the server should occur on the client side. If the client
sends incorrect information, the server should terminate.
service_up(Id, Pid) ->
    gen_server:call(?MODULE, {service_up, Id, Pid}).

service_down(Id) ->
    gen_server:call(?MODULE, {service_down, Id}).
Upon receiving the messages, the gen_server
process calls the
handle_call/3
callback function dealing with the messages in
the same order in which they were sent:
handle_call({service_up, Id, Pid}, _From, State) ->
    %% Update the set of active services locally
    Services = ordsets:add_element(Id, State#state.services),
    S2 = State#state { services = Services },

    %% Remove any existing mrefs for this service
    delete_service_mref(Id),

    %% Setup a monitor for the Pid representing this service
    Mref = erlang:monitor(process, Pid),
    erlang:put(Mref, Id),
    erlang:put(Id, Mref),

    %% Update our local ETS table and broadcast
    S3 = local_update(S2),
    {reply, ok, update_avsn(S3)};

handle_call({service_down, Id}, _From, State) ->
    %% Update the set of active services locally
    Services = ordsets:del_element(Id, State#state.services),
    S2 = State#state { services = Services },

    %% Remove any existing mrefs for this service
    delete_service_mref(Id),

    %% Update local ETS table and broadcast
    S3 = local_update(S2),
    {reply, ok, update_avsn(S3)};
Note the return value of the callback function. The tuple contains the control atom reply, telling the gen_server generic code that the second element of the tuple (which in both of these cases is the atom ok) is the reply sent back to the client. The third element of the tuple is the new State, which, in a new iteration of the server, is passed as the third argument to the handle_call/3 function; in both cases here it is updated to reflect the new set of available services. The argument _From is a tuple containing a unique message reference and the client process identifier. The tuple as a whole is used in library functions that we will not be discussing in this chapter. In the majority of cases, you will not need it.
The gen_server library module has a number of mechanisms and safeguards built in that operate behind the scenes. If your client sends a synchronous message to your server and you do not get a response within five seconds, the process executing the call/2 function is terminated. You can override this by using gen_server:call(Name, Message, Timeout), where Timeout is a value in milliseconds or the atom infinity.
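For illustration, hypothetical client calls that override the default timeout might look like this (some_server and the request terms are placeholders, not part of Riak):

query_status() ->
    %% Wait up to 60 seconds for the reply instead of the default five.
    Status = gen_server:call(some_server, get_status, 60000),

    %% Or wait indefinitely, for requests that must never be abandoned.
    Config = gen_server:call(some_server, get_config, infinity),

    {Status, Config}.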
The timeout mechanism was originally put in place for deadlock prevention purposes, ensuring that servers that accidentally call each other are terminated after the default timeout. The crash report would be logged, and hopefully would result in the error being debugged and fixed. Most applications will function appropriately with a timeout of five seconds, but under very heavy loads, you might have to fine-tune the value and possibly even use infinity; this choice is application-dependent. All of the critical code in Erlang/OTP uses infinity. Various places in Riak use different values for the timeout: infinity is common between coupled pieces of the internals, while Timeout is set based on a user-passed parameter in cases where the client code talking to Riak has specified that an operation should be allowed to time out.
Other safeguards when using the gen_server:call/2
function
include the case of sending a message to a nonexistent server and
the case of a server crashing before sending its reply. In
both cases, the calling process will terminate. In raw Erlang, sending
a message that is never pattern-matched in a receive clause is a bug
that can cause a memory leak. Two different strategies are used in
Riak to mitigate this, both of which involve "catchall" matching
clauses. In places where the message might be user-initiated, an
unmatched message might be silently discarded. In places where such a
message could only come from Riak's internals, it represents a bug and
so will be used to trigger an error-alerting internal crash report,
restarting the worker process that received it.
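Hypothetical sketches of the two catchall styles, written as alternative handle_info/2 clauses (a real module would contain only one of them):

%% Style 1: the message might be user-initiated, so drop it silently.
handle_info(_UnknownMsg, State) ->
    {noreply, State}.

%% Style 2: the message can only come from internal code, so treat it as a
%% bug: crash, let a crash report be logged, and let the supervisor restart us.
handle_info(UnknownMsg, _State) ->
    exit({unexpected_message, UnknownMsg}).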
Sending asynchronous messages works in a similar way. Messages are sent asynchronously to the generic server and handled in the handle_cast/2 callback function. The function has to return a tuple of the format {noreply, NewState}. Asynchronous calls are used when we are not interested in a reply from the server and are not worried about producing more messages than the server can consume. In cases where we are not interested in a response but want to wait until the message has been handled before sending the next request, we would use a gen_server:call/2, returning the atom ok in the reply. Picture a process generating database entries at a faster rate than Riak can consume. By using asynchronous calls, we risk filling up the server's mailbox and making the node run out of memory. Riak uses the message-serializing properties of synchronous gen_server calls to regulate load, processing the next request only when the previous one has been handled. This approach eliminates the need for more complex throttling code: in addition to enabling concurrency, gen_server processes can also be used to introduce serialization points.
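A hypothetical sketch of the difference: the first producer can run arbitrarily far ahead of the server and flood its mailbox, while the second is throttled to the server's pace because each call returns only once its request has been handled (storage_server and the {put, Entry} protocol are assumptions, not Riak's API):

%% Fire-and-forget: may flood the server's mailbox under heavy load.
store_async(Entries) ->
    [gen_server:cast(storage_server, {put, E}) || E <- Entries],
    ok.

%% Each request is acknowledged before the next one is sent, so the
%% producer can never outpace the server.
store_sync(Entries) ->
    [ok = gen_server:call(storage_server, {put, E}) || E <- Entries],
    ok.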
How do you stop the server? In your handle_call/3 and handle_cast/2 callback functions, instead of returning {reply, Reply, NewState} or {noreply, NewState}, you can return {stop, Reason, Reply, NewState} or {stop, Reason, NewState}, respectively. Something has to trigger this return value, often a stop message sent to the server. Upon receiving the stop tuple containing the Reason and State, the generic code executes the terminate(Reason, State) callback.
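A hypothetical sketch of such a trigger: a stop/0 API function paired with a handle_call/3 clause that returns the stop tuple, after which the generic code calls terminate/2:

stop() ->
    gen_server:call(?MODULE, stop).

handle_call(stop, _From, State) ->
    %% Reply ok to the caller; the gen_server then calls terminate(normal, State).
    {stop, normal, ok, State}.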
The terminate function is the natural place to insert the code needed to clean up the State of the server and any other persistent data used by the system. In our example, we send out one last message to our peers so that they know that this node watcher is no longer up and watching. Here, the variable State contains a record with the fields status and peers:
terminate(_Reason, State) ->
    %% Let our peers know that we are shutting down
    broadcast(State#state.peers, State#state { status = down }).
Using the behavior callbacks as library functions and invoking them directly from other parts of your program is extremely bad practice. For
example, you should never call
riak_core_node_watcher:init(Args)
from another module to
retrieve the initial loop data. Such retrievals should be done through
a synchronous call to the server. Calls to behavior callback functions
should originate only from the behavior library modules as a result of
an event occurring in the system, and never directly by the user.
A large number of other worker behaviors can and have been implemented using these same ideas.
Finite state machines (FSMs), implemented in the gen_fsm behavior module, are a crucial component when implementing protocol stacks in telecom systems (the problem domain Erlang was originally invented for). States are defined as callback functions named after the state that return a tuple containing the next State and the updated loop data. You can send events to these states synchronously and asynchronously. The finite state machine callback module should also export the standard callback functions such as init, terminate, and handle_info.
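As a minimal, hypothetical example of the callback style (not code from Riak), a two-state switch could be written as follows:

-module(switch_fsm).
-behavior(gen_fsm).

-export([start_link/0, push/0]).
-export([init/1, off/2, on/2,
         handle_event/3, handle_sync_event/4, handle_info/3,
         terminate/3, code_change/4]).

start_link() ->
    gen_fsm:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Send an asynchronous event to whatever state the FSM is currently in.
push() ->
    gen_fsm:send_event(?MODULE, push).

init([]) ->
    {ok, off, 0}.                        %% start in the off state; loop data is a counter

%% One callback function per state, named after the state.
off(push, Count) -> {next_state, on, Count + 1}.

on(push, Count)  -> {next_state, off, Count + 1}.

%% Remaining standard callbacks.
handle_event(_Event, StateName, Count)             -> {next_state, StateName, Count}.
handle_sync_event(_Event, _From, StateName, Count) -> {reply, ok, StateName, Count}.
handle_info(_Info, StateName, Count)               -> {next_state, StateName, Count}.
terminate(_Reason, _StateName, _Count)             -> ok.
code_change(_OldVsn, StateName, Count, _Extra)     -> {ok, StateName, Count}.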
Of course, finite state machines are not telecom specific. In Riak, they are used in the request handlers. When a client issues a request such as get, put, or delete, the process listening to that request will spawn a process implementing the corresponding gen_fsm behavior. For instance, the riak_kv_get_fsm is responsible for handling a get request, retrieving data and sending it out to the client process. The FSM process will pass through various states as it determines which nodes to ask for the data, as it sends out messages to those nodes, and as it receives data, errors, or timeouts in response.
Event handlers and managers are another behavior implemented in the
gen_event
library module. The idea is to create a centralized
point that receives events of a specific kind. Events can be sent
synchronously and asynchronously with a predefined set of actions
being applied when they are received. Possible responses to events
include logging them to file, sending off an alarm in the form of an
SMS, or collecting statistics. Each of these actions is defined in a
separate callback module with its own loop data, preserved between
calls. Handlers can be added, removed, or updated for every specific
event manager. So, in practice, for every event manager there could
be many callback modules, and different instances of these callback
modules could exist in different managers. Event handlers include
processes receiving alarms, live trace data, equipment related events
or simple logs.
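A minimal, hypothetical event handler (not Riak code) that logs every event it receives and keeps a counter as its loop data might look like this; it would be installed into a manager with gen_event:add_handler/3 and fed events with gen_event:notify/2:

-module(console_logger).
-behavior(gen_event).

-export([init/1, handle_event/2, handle_call/2, handle_info/2,
         terminate/2, code_change/3]).

init([]) ->
    {ok, 0}.                               %% loop data: number of events seen so far

handle_event(Event, Count) ->
    io:format("event ~p: ~p~n", [Count, Event]),
    {ok, Count + 1}.

handle_call(_Request, Count) ->
    {ok, Count, Count}.                    %% reply with the counter

handle_info(_Info, Count) ->
    {ok, Count}.

terminate(_Reason, _Count) ->
    ok.

code_change(_OldVsn, Count, _Extra) ->
    {ok, Count}.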
One of the uses for the gen_event behavior in Riak is for managing subscriptions to "ring events", i.e., changes to the membership or partition assignment of a Riak cluster. Processes on a Riak node can register a function in an instance of riak_core_ring_events, which implements the gen_event behavior. Whenever the central process managing the ring for that node changes the membership record for the overall cluster, it fires off an event that causes each of those callback modules to call the registered function. In this fashion, it is easy for various parts of Riak to respond to changes in one of Riak's most central data structures without having to add complexity to the central management of that structure.
Most common concurrency and communication patterns are handled with the three primary behaviors we've just discussed: gen_server, gen_fsm, and gen_event. However, in large systems, some application-specific patterns emerge over time that warrant the creation of new behaviors. Riak includes one such behavior, riak_core_vnode, which formalizes how virtual nodes are implemented. Virtual nodes are the primary storage abstraction in Riak, exposing a uniform interface for key-value storage to the request-driving FSMs. The interface for callback modules is specified using the behavior_info/1 function, as follows:
behavior_info(callbacks) ->
    [{init, 1},
     {handle_command, 3},
     {handoff_starting, 2},
     {handoff_cancelled, 1},
     {handoff_finished, 2},
     {handle_handoff_command, 3},
     {handle_handoff_data, 2},
     {encode_handoff_item, 2},
     {is_empty, 1},
     {terminate, 2},
     {delete, 1}];
The above example shows the behavior_info/1
function from
riak_core_vnode
. The list of {CallbackFunction,
Arity}
tuples defines the contract that callback modules must
follow. Concrete virtual node implementations must export these
functions, or the compiler will emit a warning. Implementing your own OTP behavior is relatively straightforward: alongside defining the set of callback functions, you use the proc_lib and sys modules to start the process with the appropriate functions, handle system messages, and monitor the parent in case it terminates.
The supervisor behavior's task is to monitor its children and, based on some preconfigured rules, take action when they terminate. The children that make up the supervision tree include both supervisors and worker processes, where worker processes are OTP behaviors such as gen_fsm, gen_server, and gen_event. This structure allows the Riak codebase to focus on the correct case, leaving the supervisors to handle software bugs, corrupt data or system errors in a consistent way across the whole system. In the Erlang world, this non-defensive programming approach is often referred to as the "let it crash" strategy. The Riak team, not having to handle borderline error cases, gets to work with a smaller code base, one that, because of its use of behaviors, deals mostly with application-specific code. Like most Erlang applications, Riak has a top-level supervisor, and also has sub-supervisors for groups of processes with related responsibilities. Examples include Riak's virtual nodes, TCP socket listeners, and query-response managers.
To demonstrate how the supervisor behavior is implemented, we will use the riak_core_sup.erl module. The Riak core supervisor is the top-level supervisor of the Riak core application. It starts a set of static workers and supervisors, together with a dynamic number of workers handling the HTTP and HTTPS bindings of the node's RESTful API, defined in application-specific configuration files. In a similar way to the gen_server, all supervisor callback modules must include the -behavior(supervisor). directive. They are started using the start or start_link functions, which take an optional ServerName, the CallbackModule, and an Argument that is passed to the init/1 callback function.
Looking at the first few lines of code in the riak_core_sup.erl module, alongside the behavior directive and a macro we will describe later, we notice the start_link/0 function, which starts the supervisor through a call to supervisor:start_link/3:
-module(riak_core_sup).
-behavior(supervisor).

%% API
-export([start_link/0]).

%% Supervisor callbacks
-export([init/1]).

-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
Starting a supervisor will result in a new process being spawned, and the init/1 callback function being called in the callback module riak_core_sup.erl. The ServerName is a tuple of the format {local, Name} or {global, Name}, where Name is the supervisor's registered name. In our example, both the registered name and the callback module are the atom riak_core_sup, originating from the ?MODULE macro. We pass the empty list as an argument to init/1, treating it as a null value. The init function is the only supervisor callback function. It has to return a tuple of the format:
{ok, {SupervisorSpecification, ChildSpecificationList}}
where SupervisorSpecification is a 3-tuple {RestartStrategy, AllowedRestarts, MaxSeconds} containing information on how to handle process crashes and restarts. RestartStrategy is one of three configuration parameters determining how the behavior's siblings are affected upon abnormal termination:

one_for_one: other processes in the supervision tree are not affected.
rest_for_one: processes started after the terminating process are terminated and restarted.
one_for_all: all processes are terminated and restarted.

AllowedRestarts states how many times any of the supervisor's children may terminate in MaxSeconds before the supervisor terminates itself (and its children). When a child terminates, it sends an EXIT signal to its supervisor which, based on its restart strategy, handles the termination accordingly. The supervisor terminating after reaching the maximum allowed restarts ensures that cyclic restarts and other issues that cannot be resolved at this level are escalated. Chances are that the issue is in a process located in a different sub-tree, allowing the supervisor receiving the escalation to terminate the affected sub-tree and restart it.
Examining the last line of the init/1 callback function in the riak_core_sup.erl module, we notice that this particular supervisor has a one-for-one strategy, meaning that the processes are independent of each other. The supervisor will allow a maximum of ten child restarts in ten seconds before terminating itself.
ChildSpecificationList
specifies which children the supervisor
has to start and monitor, together with information on how to
terminate and restart them. It consists of a list of tuples of the
following format:
{Id, {Module, Function, Arguments}, Restart, Shutdown, Type, ModuleList}
Id is a unique identifier the supervisor uses to refer to that particular child. Module, Function, and Arguments specify an exported function which results in the behavior's start_link function being called, returning a tuple of the format {ok, Pid}. The Restart strategy dictates what happens depending on the termination type of the process, which can be one of:

temporary: processes that are never restarted.
transient: processes that are restarted only if they terminate abnormally.
permanent: processes that are always restarted, regardless of whether the termination is normal or abnormal.

Shutdown is a value in milliseconds referring to the time the behavior is allowed to spend in the terminate function when terminating as the result of a restart or shutdown. The atom infinity can also be used, but for behaviors other than supervisors it is highly discouraged. Type is either the atom worker, referring to generic servers, event handlers and finite state machines, or the atom supervisor. Together with ModuleList, a list of the modules implementing the behavior, it is used to control and suspend processes during runtime software upgrade procedures. Only existing or user-implemented behaviors may be part of the child specification list and hence included in a supervision tree.
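As a concrete illustration, expanding the ?CHILD macro from the riak_core_sup.erl listing shown earlier by hand for one of its workers yields a child specification of exactly this shape:

{riak_core_node_watcher,                        %% Id
 {riak_core_node_watcher, start_link, []},      %% {Module, Function, Arguments}
 permanent,                                     %% Restart
 5000,                                          %% Shutdown, in milliseconds
 worker,                                        %% Type
 [riak_core_node_watcher]}                      %% ModuleList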
With this knowledge at hand, we should now be able to formulate a restart strategy defining inter-process dependencies, fault tolerance thresholds and escalation procedures based on a common architecture. We should also be able to understand what is going on in the init/1 example of the riak_core_sup.erl module. First of all, study the CHILD macro. It creates the child specification for one child, using the callback module name as Id, making it permanent and giving it a shutdown time of 5 seconds. Different child types can be workers or supervisors. Have a look at the example, and see what you can make out of it:
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).

init([]) ->
    RiakWebs = case lists:flatten(riak_core_web:bindings(http),
                                  riak_core_web:bindings(https)) of
                   [] ->
                       %% check for old settings, in case app.config
                       %% was not updated
                       riak_core_web:old_binding();
                   Binding ->
                       Binding
               end,

    Children = [?CHILD(riak_core_vnode_sup, supervisor),
                ?CHILD(riak_core_handoff_manager, worker),
                ?CHILD(riak_core_handoff_listener, worker),
                ?CHILD(riak_core_ring_events, worker),
                ?CHILD(riak_core_ring_manager, worker),
                ?CHILD(riak_core_node_watcher_events, worker),
                ?CHILD(riak_core_node_watcher, worker),
                ?CHILD(riak_core_gossip, worker)
                | RiakWebs
               ],
    {ok, {{one_for_one, 10, 10}, Children}}.
Most of the Children started by this supervisor are statically defined workers (or in the case of the vnode_sup, a supervisor). The exception is the RiakWebs portion, which is dynamically defined depending on the HTTP portion of Riak's configuration file.
With the exception of library applications, every OTP application,
including those in Riak, will have its own supervision tree. In
Riak, various top-level applications are running in the Erlang node,
such as riak_core
for distributed systems algorithms,
riak_kv
for key/value storage semantics, webmachine
for
HTTP, and more. We have shown the expanded tree under
riak_core
to demonstrate the multi-level supervision going on.
One of the many benefits of this structure is that a given subsystem can be crashed (due to a bug, an environmental problem, or intentional action) and, in the first instance, only that subtree is terminated. The supervisor will restart the needed processes and the overall system will not be affected. In practice we have seen this work well for Riak. A user might figure out how to crash a virtual node, but it will just be restarted by riak_core_vnode_sup. If they manage to crash that as well, the riak_core supervisor will restart it, escalating the termination to the top-level supervisor only if the problem persists. This failure isolation and recovery mechanism allows Riak (and Erlang) developers to straightforwardly build resilient systems.
The value of the supervisory model was shown when one large industrial user created a very abusive environment in order to find out where each of several database systems would fall apart. This environment created random huge bursts of both traffic and failure conditions. They were confused when Riak simply wouldn't stop running, even under the worst such arrangement. Under the covers, of course, they were able to make individual processes or subsystems crash in multiple ways—but the supervisors would clean up and restart things to put the whole system back into working order every time.
The application
behavior we previously introduced is used to
package Erlang modules and resources into reusable components. In OTP,
there are two kinds of applications. The most common form, called
normal applications, will start a supervision tree and all of the
relevant static workers. Library applications such as the Standard
Library, which come as part of the Erlang distribution, contain
library modules but do not start a supervision tree. This is not to
say that the code may not contain processes or supervision trees. It
just means they are started as part of a supervision tree belonging to
another application.
An Erlang system will consist of a set of loosely coupled applications. Some are written by the developers, some are available as open source, and others are part of the Erlang/OTP distribution. The Erlang runtime system and its tools treat all applications equally, regardless of whether they are part of the Erlang distribution or not.
Riak was designed for extreme reliability and availability at a massive scale, and was inspired by Amazon's Dynamo storage system [DHJ+07]. Dynamo and Riak's architectures combine aspects of both Distributed Hash Tables (DHTs) and traditional databases. Two key techniques that both Riak and Dynamo use are consistent hashing for replica placement and a gossip protocol for sharing common state.
Consistent hashing requires that all nodes in the system know about each other, and know what partitions each node owns. This assignment data could be maintained in a centrally managed configuration file, but in large configurations, this becomes extremely difficult. Another alternative is to use a central configuration server, but this introduces a single point of failure in the system. Instead, Riak uses a gossip protocol to propagate cluster membership and partition ownership data throughout the system.
Gossip protocols, also called epidemic protocols, work exactly as they sound. When a node in the system wishes to change a piece of shared data, it makes the change to its local copy of the data and gossips the updated data to a random peer. Upon receiving an update, a node merges the received changes with its local state and gossips again to another random peer.
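A highly simplified, hypothetical sketch of one such gossip step (not Riak's actual implementation; merge_rings/2, local_ring/0, save_ring/1, peer_nodes/0 and the ring_manager alias are assumed):

%% Merge an incoming ring with our local copy and gossip the result
%% onwards to one randomly chosen peer.
reconcile(IncomingRing) ->
    Merged = merge_rings(local_ring(), IncomingRing),
    save_ring(Merged),
    case peer_nodes() of
        [] ->
            ok;
        Peers ->
            Peer = lists:nth(rand:uniform(length(Peers)), Peers),
            gen_server:cast({ring_manager, Peer}, {reconcile_ring, Merged})
    end.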
When a Riak cluster is started, all nodes must be configured with the
same partition count. The consistent hashing ring is then divided by
the partition count and each interval is stored locally as a
{HashRange, Owner}
pair. The first node in a cluster simply
claims all the partitions. When a new node joins the cluster, it
contacts an existing node for its list of {HashRange, Owner}
pairs. It then claims (partition count)/(number of nodes) pairs,
updating its local state to reflect its new ownership. The updated
ownership information is then gossiped to a peer. This updated state
then spreads throughout the entire cluster using the above algorithm.
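As a rough, hypothetical illustration of the first step (the real riak_core code is more involved), dividing a 160-bit hash space into equal {HashRange, Owner} pairs claimed by the first node might look like this:

-define(RINGTOP, (1 bsl 160)).    %% size of the SHA-1 hash space

%% Split the ring into NumPartitions equal ranges, all owned by Owner.
initial_ring(NumPartitions, Owner) ->
    Step = ?RINGTOP div NumPartitions,
    [{{I * Step, (I + 1) * Step - 1}, Owner}
     || I <- lists:seq(0, NumPartitions - 1)].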
By using a gossip protocol, Riak avoids introducing a single point of failure in the form of a centralized configuration server, relieving system operators from having to maintain critical cluster configuration data. Any node can then use the gossiped partition assignment data in the system to route requests. When used together, the gossip protocol and consistent hashing enable Riak to function as a truly decentralized system, which has important consequences for deploying and operating large-scale systems.
Most programmers believe that smaller and simpler codebases are not only easier to maintain, they often have fewer bugs. By using Erlang's basic distribution primitives for communication in a cluster, Riak can start out with a fundamentally sound asynchronous messaging layer and build its own protocols without having to worry about that underlying implementation. As Riak grew into a mature system, some aspects of its networked communication moved away from use of Erlang's built-in distribution (and toward direct manipulation of TCP sockets) while others remained a good fit for the included primitives. By starting out with Erlang's native message passing for everything, the Riak team was able to build out the whole system very quickly. These primitives are clean and clear enough that it was still easy later to replace the few places where they turned out to not be the best fit in production.
Also, due to the nature of Erlang messaging and the lightweight core of the Erlang VM, a user can just as easily run 12 nodes on 1 machine or 12 nodes on 12 machines. This makes development and testing much easier when compared to more heavyweight messaging and clustering mechanisms. This has been especially valuable due to Riak's fundamentally distributed nature. Historically, most distributed systems are very difficult to operate in a "development mode" on a single developer's laptop. As a result, developers often end up testing their code in an environment that is a subset of their full system, with very different behavior. Since a many-node Riak cluster can be trivially run on a single laptop without excessive resource consumption or tricky configuration, the development process can more easily produce code that is ready for production deployment.
The use of Erlang/OTP supervisors makes Riak much more resilient in the face of subcomponent crashes. Riak takes this further; inspired by such behaviors, a Riak cluster is also able to easily keep functioning even when whole nodes crash and disappear from the system. This can lead to a sometimes-surprising level of resilience. One example of this was when a large enterprise was stress-testing various databases and intentionally crashing them to observe their edge conditions. When they got to Riak, they became confused. Each time they would find a way (through OS-level manipulation, bad IPC, etc) to crash a subsystem of Riak, they would see a very brief dip in performance and then the system returned to normal behavior. This is a direct result of a thoughtful "let it crash" approach. Riak was cleanly restarting each of these subsystems on demand, and the overall system simply continued to function. That experience shows exactly the sort of resilience enabled by Erlang/OTP's approach to building programs.
This chapter is based on Francesco Cesarini and Simon Thompson's 2009 lecture notes from the central European Functional Programming School held in Budapest and Komárno. Major contributions were made by Simon Thompson of the University of Kent in Canterbury, UK. A special thank you goes to all of the reviewers, who at different stages in the writing of this chapter provided valuable feedback.