Actors Make Better Observers

The Observer pattern causes temporal coupling in systems with synchronous message passing. This can lead to failure in Object-Oriented systems. Asynchronous messaging avoids the pitfalls. Actor-based implementations more accurately realize the original intent of the pattern.

The intent of the Observer pattern is to “define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.” [1] The Observer pattern is known by several other names. In many user-interface frameworks Observers are called Listeners. In distributed system design it is often called Publish/Subscribe, and is generalized to many-to-many relationships.

Observers in JavaScript

In order to illustrate the problems with the Observer pattern in a traditional Object-Oriented context, we will consider a simple JavaScript implementation. A Subject allows observers (call-back functions) to attach and detach themselves. A Subject can notify all currently attached observers of an event.

var Subject = function () {
    this.observers = [];
};
Subject.prototype.attach = function (observer) {
    if (typeof observer !== 'function') {
        throw new Error('Observers must be functions');
    }
    this.observers.push(observer);
    return this;
};
Subject.prototype.detach = function (observer) {
    var i;

    for (i = 0; i < this.observers.length; i += 1) {
        if (this.observers[i] === observer) {
            this.observers.splice(i, 1);  // remove observer
            break;
        }
    }
    return this;
};
Subject.prototype.notify = function (event) {
    var i;

    for (i = 0; i < this.observers.length; i += 1) {
        (this.observers[i])(event);  // synchronous call into observer code
    }
    return this;
};


The implementation is straightforward. Each Subject maintains a list of observers. The notify method iterates through the list and calls the registered call-back function of each observer. For convenience, each method returns this, to allow call chaining.

Failure Modes

Given an implementation like the one shown above, consider what happens during event notification. Some activity in the system causes an object to change. The object updates its internal state and calls notify on a subject that holds observers interested in changes to the object. The notify method begins iterating through observers, calling their call-back functions. Figure 1 illustrates the messages involved, and hints at the potential pitfalls.

Figure 1 - Synchronous Observer Failure Modes

The Observer pattern is meant to decouple subjects and observers. It is true that subjects are decoupled from the identities of their observers, and observers may come and go over time. However, the single-threaded synchronous delivery of notifications implies mutual coupling in time between the subject and all of the observers. The thread iterating over the observer list is "suspended" as it enters each observer call-back.

If one of the observers makes a blocking call, or fails to return due to divergent behaviors (such as an infinite loop), consider the impact. The subject will never complete its notification, and any remaining observers will not be notified. The subject and all of the observers are dependent on the proper behavior of each observer, as well as their ordering in the notification list.
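As a small illustration of this failure mode, the sketch below uses a condensed stand-in for the Subject shown earlier (attach and notify only). The observers and the `log` array are hypothetical, and a thrown exception stands in for an observer that never returns normally, since a real infinite loop would simply hang the example.

```javascript
// Condensed stand-in for the Subject shown earlier (attach and notify only).
function Subject() {
    this.observers = [];
}
Subject.prototype.attach = function (observer) {
    this.observers.push(observer);
    return this;
};
Subject.prototype.notify = function (event) {
    for (let i = 0; i < this.observers.length; i += 1) {
        this.observers[i](event);  // synchronous call into observer code
    }
    return this;
};

const log = [];
const subject = new Subject()
    .attach(function (event) { log.push('o1 ' + event); })
    .attach(function (event) { throw new Error('o2 failed'); })
    .attach(function (event) { log.push('o3 ' + event); });

try {
    subject.notify('Ping');
} catch (e) {
    log.push('notify aborted: ' + e.message);
}
// log now holds 'o1 Ping' and 'notify aborted: o2 failed'; o3 was never called.
```

The third observer is healthy, yet it misses the event purely because of its position in the list.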

If one of the observers performs an action which modifies the subject, another notification loop can be triggered. Since synchronous message delivery is single-threaded, this re-entrant notification will occur in the middle of the in-progress notifications. Some observers will receive the nested notification before other observers have been notified of the original event. Worse yet, the object which caused the nested notification will be re-entered (through its call-back) and often will trigger further nested notifications, creating an infinitely nested event echo.
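The nesting can be made visible with a short sketch. It again uses a condensed stand-in for the Subject shown earlier; the `clamp` and `display` observers are hypothetical, and the re-entrant call is deliberately bounded so that the example terminates.

```javascript
// Condensed stand-in for the Subject shown earlier (attach and notify only).
function Subject() {
    this.observers = [];
}
Subject.prototype.attach = function (observer) {
    this.observers.push(observer);
    return this;
};
Subject.prototype.notify = function (event) {
    for (let i = 0; i < this.observers.length; i += 1) {
        this.observers[i](event);
    }
    return this;
};

const log = [];
const subject = new Subject();
subject.attach(function clamp(v) {
    log.push('clamp sees ' + v);
    if (v > 10) { subject.notify(10); }  // triggers a nested notification loop
});
subject.attach(function display(v) {
    log.push('display sees ' + v);
});

subject.notify(42);
// log: clamp sees 42, clamp sees 10, display sees 10, display sees 42
```

The display observer receives the nested event (10) before the original one (42), so the last value it sees is the stale one.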

These are not simply theoretical issues. They occur in the development of real systems. Traditional counter-measures include complicated "update-in-progress" flags, and posting "invoke-later" events (which are essentially asynchronous messages).
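One possible shape for such a counter-measure is sketched below. It is an illustration only, not taken from any particular framework: `DeferredSubject`, its `pending` queue, and its `dispatching` flag are hypothetical names, as are the `clamp` and `display` observers.

```javascript
// Sketch of a subject that queues notifications instead of nesting them.
function DeferredSubject() {
    this.observers = [];
    this.pending = [];         // events waiting for delivery ("invoke-later")
    this.dispatching = false;  // the "update-in-progress" flag
}
DeferredSubject.prototype.attach = function (observer) {
    this.observers.push(observer);
    return this;
};
DeferredSubject.prototype.notify = function (event) {
    this.pending.push(event);
    if (this.dispatching) {
        return this;  // called from inside a call-back: deliver after the current event
    }
    this.dispatching = true;
    try {
        while (this.pending.length > 0) {
            const current = this.pending.shift();
            for (const observer of this.observers.slice()) {
                observer(current);
            }
        }
    } finally {
        this.dispatching = false;
    }
    return this;
};

const log = [];
const subject = new DeferredSubject();
subject.attach(function clamp(v) {
    log.push('clamp sees ' + v);
    if (v > 10) { subject.notify(10); }  // re-entrant request is queued, not nested
});
subject.attach(function display(v) {
    log.push('display sees ' + v);
});

subject.notify(42);
// log: clamp sees 42, display sees 42, clamp sees 10, display sees 10
```

The flag and queue restore a sensible delivery order, but they do nothing about an observer that blocks or throws, and every subject has to carry the extra bookkeeping.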

Actor-Based Observers

The asynchronous messaging of actor-based systems allows implementations of the Observer pattern to more closely realize the original intent. Subjects and observers are truly decoupled. The action, or lack of action, of an observer cannot interfere with the subject or any other observer. Consider this simple implementation of the Observer pattern in Humus:

LET subject_beh(observers) = \msg.[
    CASE msg OF
    (#attach, observer) : [
        BECOME subject_beh(observer, observers)
    ]
    (#notify, event) : [
        SEND observers TO dispatcher
        CREATE dispatcher WITH \list.[
            CASE list OF
            (first, rest) : [
                SEND event TO first
                SEND rest TO SELF
            ]
            END
        ]
    ]
    END
]


When an actor with subject_beh receives a message msg, it is either an #attach message, or a #notify message. All other messages are ignored.

If the subject receives an #attach message, it adds the new observer to a NIL-terminated list of observers, updating its behavior to reflect the addition.

If the subject receives a #notify message, it sends the list of observers to a newly-created dispatcher actor. The dispatcher receives the observer list and splits the first from the rest. The notification event is sent to the first actor, and the rest of the list is sent back to the dispatcher (SELF) for further processing. When the list is NIL, no split is possible and the message is ignored.

The dispatcher is an example of a common actor interaction pattern. It is created specifically to handle concurrent processing of items in a list. Let's refactor the implementation to extract the behavior of the dispatcher.

LET dispatch_beh(event) = \list.[
    CASE list OF
    (first, rest) : [
        SEND event TO first
        SEND rest TO SELF
    ]
    END
]
LET subject_beh(observers) = \msg.[
    CASE msg OF
    (#attach, observer) : [
        BECOME subject_beh(observer, observers)
    ]
    (#notify, event) : [
        SEND observers TO NEW dispatch_beh(event)
    ]
    END
]


We have not changed the behavior of the system. We have refactored it to clarify the separation of responsibilities. The dispatch_beh is responsible for delivering an event to a list of actors. The subject_beh is responsible for maintaining the list of observers, and initiating an event dispatch. We also notice that the dispatcher can remain anonymous, so we use NEW, instead of CREATE, to inline the actor creation.
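For readers more comfortable with JavaScript, the same two behaviors can be approximated as follows. This is only a sketch under stated assumptions: the mini runtime (`send`, `create`, `run`, and a single FIFO queue) is hypothetical and single-threaded, messages are plain objects with a `type` field, and observer lists are modeled as NIL-terminated pairs (`[first, rest]` or `null`). It is not how the Humus simulator is implemented.

```javascript
// Hypothetical mini runtime: a single FIFO queue of [actor, message] pairs.
const queue = [];
const send = (msg, actor) => { queue.push([actor, msg]); };
const create = (beh) => ({ beh });  // an actor is a mutable behavior slot
function run() {
    while (queue.length > 0) {
        const [actor, msg] = queue.shift();
        actor.beh(actor, msg);      // a behavior receives (self, msg)
    }
}

// dispatch_beh: deliver one event to a NIL-terminated list of actors.
const dispatchBeh = (event) => (self, list) => {
    if (list !== null) {
        send(event, list[0]);   // SEND event TO first
        send(list[1], self);    // SEND rest TO SELF
    }
};

// subject_beh: maintain the observer list and start a dispatch per event.
const subjectBeh = (observers) => (self, msg) => {
    if (msg.type === 'attach') {
        self.beh = subjectBeh([msg.observer, observers]);  // BECOME
    } else if (msg.type === 'notify') {
        send(observers, create(dispatchBeh(msg.event)));   // NEW dispatcher
    }
};

const log = [];
const observerBeh = (label) => (self, event) => { log.push(label + ' ' + event); };
const o1 = create(observerBeh('o1'));
const o2 = create(observerBeh('o2'));
const o3 = create(observerBeh('o3'));
const subject = create(subjectBeh([o1, [o2, [o3, null]]]));

send({ type: 'notify', event: 'Ping' }, subject);
run();
// log: o1 Ping, o2 Ping, o3 Ping (this order is a property of the FIFO queue)
```

The subject's handler finishes as soon as it has sent the list to the dispatcher; the traversal happens in later deliveries.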

We can demonstrate these behaviors with a test fixture like this:

LET observer_beh(label) = \event.[ SEND (label, event) TO println ]

CREATE o1 WITH observer_beh(#o1)
CREATE o2 WITH observer_beh(#o2)
CREATE o3 WITH observer_beh(#o3)
CREATE subject WITH subject_beh(o1, o2, o3, NIL)

SEND (#notify, #Ping) TO subject


Executing this code in the Humus simulator should produce the output:

#o1, #Ping
#o2, #Ping
#o3, #Ping


Figure 2 illustrates the message flow involved in dispatching an event.

Figure 2 - Asynchronous Observer Message Flow

Notice the potential concurrency in the message flow. The subject is immediately available to process additional requests. It could be adding a new observer while an event is being delivered. Any changes to the observer list would not affect the dispatcher because it is traversing the list as it existed when the notification began. The observer notifications can proceed concurrently with the list traversal because the event message and the message containing the rest of the list are sent concurrently and received asynchronously. Nothing that an observer does in response to an event can interfere with the subject, the traversal, or any other observer. Once the dispatcher has delivered the event to all of the observers, no references to the dispatcher remain, so it can be garbage-collected.
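The isolation claim can be exercised with a JavaScript approximation. Everything here is an assumption made for illustration: the mini runtime (`send`, `create`, `run`) is hypothetical, it confines a failure to the single delivery in which it occurs, and the three observers are invented to misbehave in the ways discussed earlier (one always fails, one notifies the subject from inside its own handler).

```javascript
// Hypothetical mini runtime: FIFO queue, failures confined to one delivery.
const queue = [];
const failures = [];
const send = (msg, actor) => { queue.push([actor, msg]); };
const create = (beh) => ({ beh });
function run() {
    while (queue.length > 0) {
        const [actor, msg] = queue.shift();
        try {
            actor.beh(actor, msg);
        } catch (e) {
            failures.push(e.message);  // assumption: only this delivery is affected
        }
    }
}

const dispatchBeh = (event) => (self, list) => {
    if (list !== null) {
        send(event, list[0]);
        send(list[1], self);
    }
};
const subjectBeh = (observers) => (self, msg) => {
    if (msg.type === 'attach') {
        self.beh = subjectBeh([msg.observer, observers]);
    } else if (msg.type === 'notify') {
        send(observers, create(dispatchBeh(msg.event)));
    }
};

const log = [];
const subject = create(subjectBeh(null));
const o1 = create(() => { throw new Error('o1 failed'); });
const o2 = create((self, event) => {
    log.push('o2 ' + event);
    if (event === 'Ping') {
        send({ type: 'notify', event: 'Pong' }, subject);  // notifies from its handler
    }
});
const o3 = create((self, event) => { log.push('o3 ' + event); });

// Attach in reverse so the list reads o1, o2, o3.
for (const observer of [o3, o2, o1]) {
    send({ type: 'attach', observer }, subject);
}
send({ type: 'notify', event: 'Ping' }, subject);
run();
// log: o2 Ping, o3 Ping, o2 Pong, o3 Pong
```

The failing observer costs nobody else a notification, and o3 sees Ping before Pong even though Pong was requested while the Ping dispatch was still in progress.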

Implementing Detach

Now let's extend the protocol of our subject behavior to understand a #detach message. Detach removes an observer from the notification list.

LET dispatch_beh(event) = \list.[
    CASE list OF
    (first, rest) : [
        SEND event TO first
        SEND rest TO SELF
    ]
    END
]
LET remove(list, item) = (
    CASE list OF
    NIL : NIL
    ($item, rest) : rest
    (first, rest) : (first, remove(rest, item))
    END
)
LET subject_beh(observers) = \msg.[
    CASE msg OF
    (#attach, observer) : [
        BECOME subject_beh(observer, observers)
    ]
    (#detach, observer) : [
        BECOME subject_beh(remove(observers, observer))
    ]
    (#notify, event) : [
        SEND observers TO NEW dispatch_beh(event)
    ]
    END
]
LET empty_subject_beh = $subject_beh(NIL)


When an actor with subject_beh receives a #detach message, it removes the observer from the NIL-terminated list of observers, updating its behavior to reflect the removal.

The helper function remove determines the value of the observer list without the specified observer. Note that, like all functions in Humus, remove is a pure function that creates a new value for the observer list. The previous value is not modified in any way, and may be concurrently used for an in-progress event dispatch.
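The non-destructive character of remove can be sketched in JavaScript, modeling the NIL-terminated list as nested pairs (`[first, rest]` or `null`). The pair representation and the string labels are assumptions made for the example.

```javascript
// Pure removal of the first matching item from a NIL-terminated pair list.
function remove(list, item) {
    if (list === null) {
        return null;                     // NIL : NIL
    }
    const [first, rest] = list;
    if (first === item) {
        return rest;                     // ($item, rest) : rest
    }
    return [first, remove(rest, item)];  // (first, rest) : (first, remove(rest, item))
}

const before = ['o1', ['o2', ['o3', null]]];
const after = remove(before, 'o2');
// before is untouched; after is a new list that shares the 'o3' tail with it.
```

Because the old list is never modified, a dispatcher that is still walking `before` keeps seeing exactly the observers that were attached when its notification began.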

Finally, we defined empty_subject_beh to encapsulate the representation of a subject with no observers as subject_beh with a NIL observer list. This kind of definition gives us the freedom to change the representation later without affecting client code that creates new subjects.

Test Fixture

Previously, our test fixture initialized the subject with a list of observers. Now let's exercise the full life-cycle by dynamically attaching and detaching observers, interspersed with event notifications.

LET observer_beh(label) = \event.[ SEND (label, event) TO println ]

CREATE subject WITH empty_subject_beh
CREATE o1 WITH observer_beh(#o1)
CREATE o2 WITH observer_beh(#o2)
CREATE o3 WITH observer_beh(#o3)

SEND (#attach, o1) TO subject
SEND (#attach, o2) TO subject
SEND (#attach, o3) TO subject
SEND (1000, (#notify, #Hello), subject) TO timer

SEND (2000, (#detach, o2), subject) TO timer
SEND (3000, (#notify, #World), subject) TO timer


Here we use the pre-defined timer actor to schedule messages for delivery after 1, 2 and 3 seconds of delay time. The output from this test should be:

#o2, #Hello
#o3, #Hello
#o1, #Hello
#o3, #World
#o1, #World


The delivery order may vary, but we should see that #Hello is sent to all three observers, yet #World is not sent to o2 because that observer has been detached.

Actor Thinking

So far we have taken a rather straightforward approach to implementing the Observer pattern. We have given the Subject responsibility for maintaining the observer list, and created a dispatcher to handle each event notification. This corresponds to a traditional object-oriented approach to managing state (data) and behavior (methods). One potential drawback, in this case, is that #detach takes time proportional to the length of the observer list, due to the recursive remove function. While this is not a problem for the small number of observers that exist in typical usage, it can be a serious problem in large-scale systems.

One important principle in actor-based design is "never block". A weaker version of this is "avoid unpredictable delays". How can we approach this problem in a way that avoids the O(n) delay caused by our implementation of #detach? What if we distribute the work so that updates (especially observer removal) and notifications can overlap?

One way to do this is to create an actor responsible for each observer. They each represent an observer's interest in a subject. We will take care to ensure that the collection of actors representing the subject is always in a consistent state, without blocking updates or notifications. We also need an actor that represents the base case, a subject with no observers. This will be our new representation of an "empty" subject.

LET empty_subject_beh = \msg.[
    CASE msg OF
    (#attach, observer) : [
        CREATE next WITH empty_subject_beh
        BECOME subject_beh(observer, next)
    ]
    END
]


The only message understood by empty_subject_beh is #attach, since both #detach and #notify require no action if there are no observers. When an actor with empty_subject_beh receives an #attach message, a new actor with empty_subject_beh is created as next, and the current actor changes behavior to remember the observer and link to next.

The subject_beh now represents the interest of one observer in our subject. Messages that are not relevant to this subject/observer are passed on to the next actor, and eventually reach an actor with empty_subject_beh.

LET subject_beh(observer, next) = \msg.[
    CASE msg OF
    (#attach, observer') : [
        CREATE next' WITH subject_beh(observer, next)
        BECOME subject_beh(observer', next')
    ]
    (#detach, $observer) : [
        BECOME forward_beh(next)
    ]
    (#notify, event) : [
        SEND event TO observer
        SEND msg TO next
    ]
    _ : [ SEND msg TO next ]
    END
]


When an actor with subject_beh receives an #attach message, a new actor with subject_beh is created as next', and the current actor changes behavior to remember the new observer' and link to next'. The next' actor remembers the original observer and links to the original next. Essentially, this accomplishes an insertion "before" the current actor.

When an actor with subject_beh receives a #detach message that matches the current observer, it changes its behavior to forward_beh with a link to next.

When an actor with subject_beh receives a #notify message, it sends the event to its observer, and forwards the entire message msg on to the next actor. In this way, each actor in the chain is given an opportunity to notify its associated observer.

Any other messages (including #detach messages that don't match the current observer) are forwarded on to the next actor for further processing.

LET forward_beh(next) = \msg.[ SEND msg TO next ]


An actor with forward_beh immediately forwards any message msg it receives on to the next actor. This allows transparent deactivation of an actor while maintaining its identity. This identity is important to maintain, since other actors may be referencing it. We use this forwarding behavior to "remove" an actor (representing an observer's interest in the subject) from the notification chain.

As Agha notes, concerns about the performance impact of forwarders can be systematically eliminated [2]. The basic approach is to allow the runtime system to "patch" all references to forwarding actors, replacing them with references to the actor to which they forward. Without this optimization, using forwarders could obviously degrade performance and create space leaks. Rather than assume that this optimization is always present (for example, it is lacking in the simulator), we will explore ways to explicitly prune this forwarding actor.

Pruning

Our strategy for pruning forwarders will involve reversing the process we used for inserting a new observer. We want the pruned actor to become equivalent to its successor, the next actor in the chain. However, we must take care to avoid having two actors simultaneously responsible for the same observer. The pruning process starts when a subject receives a #detach message.

LET subject_beh(observer, next) = \msg.[
    CASE msg OF
    (#attach, observer') : [
        CREATE next' WITH subject_beh(observer, next)
        BECOME subject_beh(observer', next')
    ]
    (#detach, $observer) : [
        BECOME prune_beh(next)
        SEND (#prune, SELF) TO next
    ]
    (#notify, event) : [
        SEND event TO observer
        SEND msg TO next
    ]
    (#prune, prev) : [
        SEND (SELF, observer, next) TO prev
    ]
    _ : [ SEND msg TO next ]
    END
]


When an actor with subject_beh receives a #detach message that matches the current observer, it changes its behavior to prune_beh (with a link to next), and sends a #prune message (with a reference to its SELF) to the next actor.

When an actor with subject_beh receives a #prune message, that means that its predecessor prev needs to be pruned. The second step of the pruning process involves sending the current actor's state (its observer and next link) to its predecessor. This message is tagged with the identity of the current actor SELF.

At the end of the notification chain there is an "empty" subject, which may be the successor of some node to be pruned.

LET empty_subject_beh = \msg.[
    CASE msg OF
    (#attach, observer) : [
        CREATE next WITH empty_subject_beh
        BECOME subject_beh(observer, next)
    ]
    (#prune, prev) : [
        SEND (SELF, NIL) TO prev
    ]
    END
]


When an actor with empty_subject_beh receives a #prune message, that means that its predecessor prev needs to be pruned. The second step of the pruning process involves sending the current actor's state to its predecessor. Since the empty subject has no state, we represent that by sending NIL. As before, this message is tagged with the identity of the current actor SELF.

Now we can consider the behavior of an actor waiting to be pruned. Previously this was simply a forwarder. Now we need to support the final step of the pruning process, eliminating a node from the notification chain.

LET prune_beh(next) = \msg.[
    CASE msg OF
    ($next, NIL) : [
        BECOME empty_subject_beh
    ]
    ($next, observer', next') : [
        BECOME subject_beh(observer', next')
    ]
    _ : [ SEND msg TO next ]
    END
]


When an actor with prune_beh receives a message tagged with the identity of its successor next, having NIL state, it becomes an empty_subject_beh. The current actor has become equivalent to its successor. Since there are no further references to the successor, it can be garbage collected.

When an actor with prune_beh receives a message tagged with the identity of its successor next, having an observer' and next' link state, it becomes subject_beh with that state. The current actor has become equivalent to its successor, eliminating the successor from the notification chain. Since there are no further references to the successor, it can be garbage collected.

Any other messages (those not tagged with the successor's identity) are forwarded on to the next actor for further processing. This is just the behavior of a forwarder.

Figure 3 - Notification During Pruning

Consider what could happen if, as Figure 3 illustrates, we have notifications propagating down the dispatch chain while a pruning operation is in progress. Let's say we have three observers, o1, o2 and o3 (as demonstrated by our test fixtures). A #detach message is received by the subject_beh representing o2, so it becomes prune_beh and sends a #prune message to its successor, the subject_beh representing o3. The actor representing o3 receives the #prune message, so it sends its identity and state to the actor representing o2. Next, the actor representing o2 (which is now prune_beh) receives a #notify message, which it forwards to the actor representing o3. Then, the actor representing o2 receives the identity and state of the actor representing o3, so it becomes subject_beh representing o3, linking to the successor of the actor previously representing o3. The actor previously representing o3 receives the forwarded #notify message, sends the event to o3 and forwards the message to its successor. Notice that the event is correctly delivered to o3, but not o2, despite the fact that the responsibility for representing o3 is in the process of migrating due to pruning.
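A comparable interleaving can be replayed in JavaScript. The sketch below is an approximation under stated assumptions: the mini runtime (`send`, `create`, `run`) is hypothetical and uses one FIFO queue, messages are plain objects with a `type` field, and the state transfer is encoded as a `state` message tagged with its sender. With this queue the exact step order differs slightly from the narrative above, but a #notify still passes through the actor for o2 while it is waiting to be pruned.

```javascript
// Hypothetical mini runtime: a single FIFO queue of [actor, message] pairs.
const queue = [];
const send = (msg, actor) => { queue.push([actor, msg]); };
const create = (beh) => ({ beh });
function run() {
    while (queue.length > 0) {
        const [actor, msg] = queue.shift();
        actor.beh(actor, msg);
    }
}

function emptySubjectBeh(self, msg) {
    if (msg.type === 'attach') {
        self.beh = subjectBeh(msg.observer, create(emptySubjectBeh));
    } else if (msg.type === 'prune') {
        send({ type: 'state', from: self, state: null }, msg.prev);
    }
}
function subjectBeh(observer, next) {
    return function (self, msg) {
        if (msg.type === 'attach') {
            self.beh = subjectBeh(msg.observer, create(subjectBeh(observer, next)));
        } else if (msg.type === 'detach' && msg.observer === observer) {
            self.beh = pruneBeh(next);
            send({ type: 'prune', prev: self }, next);
        } else if (msg.type === 'notify') {
            send(msg.event, observer);
            send(msg, next);
        } else if (msg.type === 'prune') {
            send({ type: 'state', from: self, state: { observer, next } }, msg.prev);
        } else {
            send(msg, next);  // includes #detach for some other observer
        }
    };
}
function pruneBeh(next) {
    return function (self, msg) {
        if (msg.type === 'state' && msg.from === next) {
            self.beh = msg.state === null
                ? emptySubjectBeh
                : subjectBeh(msg.state.observer, msg.state.next);
        } else {
            send(msg, next);  // behave as a forwarder until the state arrives
        }
    };
}

const log = [];
const observerBeh = (label) => (self, event) => { log.push(label + ' ' + event); };
const o1 = create(observerBeh('o1'));
const o2 = create(observerBeh('o2'));
const o3 = create(observerBeh('o3'));
const subject = create(emptySubjectBeh);

// Attach in reverse so the chain reads o1 -> o2 -> o3 -> empty.
for (const observer of [o3, o2, o1]) {
    send({ type: 'attach', observer }, subject);
}
run();

send({ type: 'detach', observer: o2 }, subject);
send({ type: 'notify', event: 'Hello' }, subject);  // overlaps with the pruning
run();
send({ type: 'notify', event: 'World' }, subject);
run();
// log: o1 Hello, o3 Hello, o1 World, o3 World
```

Each event reaches o3 exactly once and never reaches o2, both during the migration and after it.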

State and Behavior

Our pruning solution migrates responsibility between actors by packaging up state and sending it in a message. The receiving actor must then take that state and become an appropriate behavior. We can do better than that. Why not just send the desired behavior to be migrated? Recognizing that behavior is a value leads to a simplified solution. Instead of prune_beh we will use become_beh to become the target of behavioral migration. An actor with become_beh has an auth token (usually an actor identity) and an actor to which it can delegate requests.

LET become_beh(auth, delegate) = \msg.[
    CASE msg OF
    ($auth, beh) : [
        BECOME beh
    ]
    _ : [ SEND msg TO delegate ]
    END
]


When an actor with become_beh receives a message tagged with its auth token, it becomes the received behavior beh. All other messages are forwarded to the delegate actor.

We can use this become_beh to improve our implementation of the pruning process.

LET subject_beh(observer, next) = \msg.[
    CASE msg OF
    (#attach, observer') : [
        CREATE next' WITH subject_beh(observer, next)
        BECOME subject_beh(observer', next')
    ]
    (#detach, $observer) : [
        BECOME become_beh(next, next)
        SEND (#prune, SELF) TO next
    ]
    (#notify, event) : [
        SEND event TO observer
        SEND msg TO next
    ]
    (#prune, prev) : [
        SEND (SELF, subject_beh(observer, next)) TO prev
    ]
    _ : [ SEND msg TO next ]
    END
]


When an actor with subject_beh receives a #detach message, it becomes become_beh with its successor as both the auth token and the delegate.

When an actor with subject_beh receives a #prune message, it sends its identity and its subject_beh to the pruning target prev. The identity SELF is the auth token for the migration become_beh of prev.

LET empty_subject_beh = \msg.[
    CASE msg OF
    (#attach, observer) : [
        CREATE next WITH empty_subject_beh
        BECOME subject_beh(observer, next)
    ]
    (#prune, prev) : [
        SEND (SELF, empty_subject_beh) TO prev
    ]
    END
]


When an actor with empty_subject_beh receives a #prune message, it sends its identity and its empty_subject_beh to the pruning target prev. The identity SELF is the auth token for the migration become_beh of prev.
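In JavaScript, where a behavior can be modeled as an ordinary function value, the same idea might look like the sketch below. The mini runtime (`send`, `create`, `run`) and the message shapes (`type`, `auth`, `beh`, `prev`) are assumptions made for illustration. The scenario detaches the last observer in the chain, so the behavior that migrates is the empty subject's.

```javascript
// Hypothetical mini runtime: a single FIFO queue of [actor, message] pairs.
const queue = [];
const send = (msg, actor) => { queue.push([actor, msg]); };
const create = (beh) => ({ beh });
function run() {
    while (queue.length > 0) {
        const [actor, msg] = queue.shift();
        actor.beh(actor, msg);
    }
}

// Behaviors are plain functions, so a behavior can travel inside a message.
const becomeBeh = (auth, delegate) => (self, msg) => {
    if (msg.type === 'become' && msg.auth === auth) {
        self.beh = msg.beh;   // ($auth, beh) : [ BECOME beh ]
    } else {
        send(msg, delegate);  // _ : [ SEND msg TO delegate ]
    }
};
function emptySubjectBeh(self, msg) {
    if (msg.type === 'attach') {
        self.beh = subjectBeh(msg.observer, create(emptySubjectBeh));
    } else if (msg.type === 'prune') {
        send({ type: 'become', auth: self, beh: emptySubjectBeh }, msg.prev);
    }
}
function subjectBeh(observer, next) {
    return function (self, msg) {
        if (msg.type === 'attach') {
            self.beh = subjectBeh(msg.observer, create(subjectBeh(observer, next)));
        } else if (msg.type === 'detach' && msg.observer === observer) {
            self.beh = becomeBeh(next, next);
            send({ type: 'prune', prev: self }, next);
        } else if (msg.type === 'notify') {
            send(msg.event, observer);
            send(msg, next);
        } else if (msg.type === 'prune') {
            send({ type: 'become', auth: self, beh: subjectBeh(observer, next) }, msg.prev);
        } else {
            send(msg, next);
        }
    };
}

const log = [];
const observerBeh = (label) => (self, event) => { log.push(label + ' ' + event); };
const o1 = create(observerBeh('o1'));
const o2 = create(observerBeh('o2'));
const o3 = create(observerBeh('o3'));
const subject = create(emptySubjectBeh);

send({ type: 'attach', observer: o2 }, subject);
send({ type: 'attach', observer: o1 }, subject);   // chain: o1 -> o2 -> empty
send({ type: 'detach', observer: o2 }, subject);   // o2's actor becomes the empty subject
send({ type: 'notify', event: 'Hello' }, subject);
run();
send({ type: 'attach', observer: o3 }, subject);   // chain: o3 -> o1 -> empty
send({ type: 'notify', event: 'World' }, subject);
run();
// log: o1 Hello, o3 World, o1 World
```

The receiving actor no longer needs to know which behavior to construct from the transferred state; it simply installs whatever behavior its authorized successor sends.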

Conclusion

We have explored several implementations of the Observer pattern. An initial implementation in JavaScript illustrated some of the pitfalls inherent in using synchronous messaging for Observers. The asynchronous messaging of actor-based systems elegantly avoids the pitfalls, giving an implementation that more closely matches the intent of the pattern, as well as our mental model of its operation. We iteratively refactored our actor-based Observer implementation, applying patterns that take advantage of the unique strengths of actors. We moved from the "single responsibility principle" to what could be called the "collaborative emergence principle", where a collection of cooperating actors provide a solution without centralized control.

References

[1]
E. Gamma, et al. Design Patterns: Elements of Reusable Object-Oriented Software. Addison-Wesley Publishing Company, 1995.
[2]
G. Agha. Actors: A Model of Concurrent Computation in Distributed Systems. MIT Press, Cambridge, Mass., 1986.