High Availability for Mutable Shared State

Mutable shared state is the root of all evil in concurrent systems. The history of concurrent computation is a basically the story of approaches to managing mutable shared state. The thread model, which has long held the dominant position, leads to intractable complexity [1].

The actor model captures state in the behavior of an actor. An actor’s behavior may change over time, representing changing state. However, this “state” may only be observed through the actor’s responses to asynchronous messages. Concurrent actors react to the reception of asynchronous messages, processing them in non-deterministic arrival order. Each message reception may cause a change in the behavior/state of the actor, but this change will only become visible through the actor’s response to subsequent messages. In essence, message reception and processing is the atomic transactional unit of state change in an actor system.

Naive Database

The actor approach to shared mutable state can be illustrated by a naive “database” implementation. The “readers/writers problem” is to keep the database consistent and available when there are multiple concurrent requests to read/write the database.

Our database protocol consists of #read and #write requests. A #read request includes a query function that computes a result based on the state of the database. A #write request includes an update function that computes a new state for the database based on the current state. Both requests produce asynchronous replies indicating completion of the request. A #read request sends the computed query result, and a #write request sends the identity of the database actor.

LET database_beh(state) = \msg.[
	CASE msg OF
	(cust, #read, query) : [
		SEND query(state) TO cust
	]
	(cust, #write, update) : [
		BECOME database_beh(update(state))
		SEND SELF TO cust
	]
	END
]

Each request represents an atomic transaction on the database. Requests are serialized by the reception ordering rules of the actor model. Multiple concurrent requests are thus handled in a non-deterministic order, but no request can interfere with another. Consistency of the database is maintained across concurrent requests.

If one request is dependent on the completion of another, we must wait for the asynchronous response to be sure that the prior request has completed. Note, however, that additional requests may be processed before our next request is received, so the state of the database may not be simply the result of our last update.

What makes this database naive is that we are ignoring the processing time required to compute the results of query and update functions. No further requests can be received while these functions are being applied to the state of the database, which can take an arbitrary amount of time. In the worst case, these functions may not terminate, making the database permanently unresponsive. We can ensure consistency, but not availability.

High Availability

Our strategy to maintain availability involves separating the query and update computations from the handling of concurrent requests. The database still maintains the consistent state. New actors are created to perform query and update computations.

Ready Database

The database is “ready” when there are no #write requests in progress. A #read request cannot change the state of the database, but computation of the query function takes an arbitrary amount of time. Therefore, we create a reader actor that performs the query computation on the state, and eventually sends the result to the original customer.

LET database_beh(state) = \msg.[
	CASE msg OF
	(cust, #read, query) : [
		CREATE reader WITH \_.[
			SEND query(state) TO cust
		]
		SEND () TO reader
	]
	...
	END
]

All actor computation is initiated by reception of a message. The information our reader actor requires is already in scope, so we initiate processing by sending an empty message. The reader computes the query from the current state and sends the result to cust. The database is immediately available to handle further requests. The reader processes the query asynchronously. The original customer will receive an asynchronous reply when (and if) the query function completes. Processing a query does not impact the availability of the database for concurrent requests.

So what about updates? We also want to avoid having updates interfere with availability. We accomplish this by separating the computation of the new state from the update of the observable state of the database. While the new state is being computed, the current state remains consistently available. However, we also must prevent overlapping updates, so subsequent update requests are queued while an update is being computed. This effectively reduces the availability of the database for writers, relative to other writers, but not readers.

LET database_beh(state) = \msg.[
	CASE msg OF
	...
	(cust, #write, update) : [
		CREATE writer WITH writer_beh(cust, update, state)
		SEND SELF TO writer
		BECOME locked_db_beh(writer, state, q-empty)
	]
	END
]
LET writer_beh(cust, update, state) = \db.[
	SEND (SELF, #update, update(state), cust) TO db
]

When a #write request is received, a new writer actor is created to asynchronously compute the new state by applying the update function to the current state. Computation in the writer is initiated by a message containing the identity of the database itself. The database becomes “locked”, since there is now a #write in progress, with an initially empty queue of deferred writers.

Banker’s Queue

In previous articles, when we needed to defer customers, we often used a simple stack built by pairing new items with a list of items previously deferred. While this can be somewhat justified on the basis of indeterminate delays in asynchronous message passing, this inherently unfair strategy could lead to starvation of some customers. To restore fairness, we would prefer to use a queue (FIFO) rather than a stack (LIFO). We could use a Finger Tree for our queue, but that would be overly complicated since all we need is simple queue semantics. Instead, we will use a persistent functional data structure called a Banker’s Queue, which offers amortized O(1) performance [2].

LET push-pop(s', s) = (
	CASE s OF
	NIL : s'
	(x, xs) : push-pop((x, s'), xs)
	END
)
LET reverse(s) = push-pop(NIL, s)
LET q-empty = (NIL, NIL)
LET q-norm(p, q) = (
	CASE p OF
	NIL : (reverse(q), NIL)
	_ : (p, q)
	END
)
LET q-put((p, q), x) = q-norm(p, (x, q))
LET q-take(p, q) = (
	CASE p OF
	NIL : (?)
	(h, t) : (h, q-norm(t, q))
	END
)

A Banker’s Queue maintains a pair of stacks. The “front” stack p contains elements ready to be taken from the queue, from first to last. The “back” stack q contains elements put on the queue, from last to first. The queue is normalized by ensuring that the “front” stack is only empty when the queue is empty. If the “front” stack is empty the normalization function q-norm transfers items from the “back” stack to the “front” stack, reversing their order in the process. Note that a successful call to the q-take function returns a pair consisting of the element taken, and the new state of the queue. Once again, there is no mutable state involved. All values are immutable. The q-put and q-take functions generate new values for each queue state.

Locked Database

The database is “locked” when there is a #write request in progress. The database remembers the identity of the writer that is computing an update. While an updated state is being computed, concurrent #read requests are still satisfied using the current state of the database. The strategy for handling #read requests is the same as we used while “ready”.

LET locked_db_beh(writer, state, waiting) = \msg.[
	CASE msg OF
	(cust, #read, query) : [
		CREATE reader WITH \_.[
			SEND query(state) TO cust
		]
		SEND () TO reader
	]
	...
	END
]

If additional #write requests arrive while an update is in progress, they are put on the back of the waiting queue. The database immediately becomes available to process more requests.

LET locked_db_beh(writer, state, waiting) = \msg.[
	CASE msg OF
	...
	(cust, #write, update) : [
		BECOME locked_db_beh(writer, state, 
			q-put(waiting, (cust, update)))
	]
	...
	END
]

While “locked”, the database is prepared to receive an #update request from the writer that is computing the new state. The writer was created by the database, so the database is the only actor (besides the writer) that knows the identity of the writer. Thus the writer identity serves as a security token which only the writer can provide.

LET locked_db_beh(writer, state, waiting) = \msg.[
	CASE msg OF
	...
	($writer, #update, state', cust) : [
		CASE waiting OF
		$q-empty : [
			BECOME database_beh(state')
		]
		_ : [
			LET ((cust', update'), waiting') = 
				$q-take(waiting)
			CREATE writer' WITH 
				writer_beh(cust', update', state')
			SEND SELF TO writer'
			BECOME locked_db_beh(writer', state', waiting')
		]
		END
		SEND SELF TO cust
	]
	END
]

When the writer has computed a new state’, it sends an #update message tagged with its own identity, and includes the customer cust of the original #write request. If the waiting queue is empty, the database simply becomes “ready” with the new state’ value. If the waiting queue is not empty, a waiting customer cust’ and update’ function are taken from the front of the queue. A new writer’ is created to compute the next update and the identity of the database is sent to initiate processing. The database remains “locked”, but the writer’ in progress, the state’ and any remaining waiting’ requests are updated. In any case, the database identity is sent to the original customer cust as a signal that the #write has completed.

Test Fixture

With these behaviors in place, we can create a database with a simple integer state and send it a few concurrent requests. We use the built-in timer actor to arrange for delayed delivery of some requests, presuming that previous outstanding requests will all have completed by the time the delayed messages are delivered.

CREATE db WITH database_beh(0)

SEND (println, #write, \x.add(x, 1)) TO db
SEND (println, #read, \x.(-1, x)) TO db
SEND (println, #read, \x.(-2, x)) TO db
SEND (println, #read, \x.(-3, x)) TO db

SEND (1000, (println, #write, \x.add(x, 2)), db) TO timer
SEND (1000, (println, #read, \x.(-4, x)), db) TO timer
SEND (1000, (println, #write, \x.add(x, 4)), db) TO timer

SEND (2000, (println, #read, \x.(-5, x)), db) TO timer

There is considerable opportunity for non-determinism in this example. The database is created with an integer state of zero (0). Three batches of concurrent requests are sent one second apart.

The first batch includes a #write request and three #read requests. The #write request computes an updated state by adding one (1) to the current state. The #read requests each label the current state with a distinct negative-numbered prefix (so we can identify the requests). Some of the #read requests may get the initial value (0) and some may get the updated value (1), depending on non-deterministic arrival ordering of the requests.

The second batch includes two #write requests and a #read request. The #write requests add two (2) and four (4) respectively to the current state. Again, depending on the non-deterministic arrival order of the requests, the #read request may observe the value 1, 3, 5 or 7.

The third batch includes only a #read request, which will observe the final value of 7.

Conclusion

Shared mutable state can be represented by the behavior of an actor. The transactional semantics of actor message reception and processing ensure consistency. Availability can be enhanced by off-loading computation to dynamically-created subordinate actors. Fairness of serialized updates is provided by a queue for deferred requests. This addresses two of the three pillars of Brewer’s CAP Theorem [3]. Partition tolerance is enhanced by the encapsulation of query and update computations as part of #read and #write requests. Customers can become partitioned from the database without affecting the availability of the database to non-partitioned customers.

References

[1]
E. Lee. The Problem with Threads, Computer, v.39 n.5, p.33-42, May 2006.
[2]
C. Okasaki. Purely Functional Data Structures. Cambridge University Press, 1998.
[3]
N. Lynch, S. Gilbert. Brewer’s conjecture and the feasibility of consistent, available, partition-tolerant web services. ACM SIGACT News, Volume 33 Issue 2. 2002.


Tags: , , , , , , , ,
This entry was posted on Monday, November 7th, 2011 at 7:14 am and is filed under Uncategorized. You can follow any responses to this entry through the RSS 2.0 feed. You can leave a response, or trackback from your own site.

3 Responses to “High Availability for Mutable Shared State”

  1. Semantic Extensibility with Vau

    […] of extensibility in our target language, we will re-implement the Banker’s Queue datatype shown previously. This implementation makes use of some Kernel features not available in traditional LISP/Scheme […]

  2. Mutable Objects in Kernel

    […] previously described, a Banker’s Queue maintains a pair of stacks. The “front” stack contains elements […]

  3. It's Actors All The Way Down » Serializers Revisited

    […] strategy behind this implementation was explained in a previous post, so here we will just describe the API provided by this […]

Leave a Reply

Your comment