# Producer/Consumer Rate-Matching

Flow control is a critical feature in a network of asynchronous communicating processes. Our fanciful exploration of a yak-shaving barber’s shop provided us with patterns we can apply to more general problems. The bounded-buffer mechanism is a generalization of our barber’s waiting room. It mediates between producers and consumers, matching the rate of production with the rate of consumption.

Traditionally, producers and consumers are both active processes. Producers do some processing and “produce” a product. Consumers then “consume” that product. Pipelines are often formed where each step is both a consumer and a producer. If these processes communicate directly with each other, they must “rendezvous” to synchronize their activities (as discussed previously). This temporal coupling can be loosened by introducing buffers.

A buffer (sometimes called a “channel”) is usually viewed as a passive component. Producers and consumers synchronize their access to the buffer in order to maintain consistency. The buffer provides a place for the producer to store products without waiting until the consumer is ready to consume them. It also helps keep the consumer busy by providing a backlog of ready products to consume. When production and consumption rates are similar, but variable, this tends to enhance utilization for both producers and consumers. When the rate of production generally exceeds the rate of consumption, the buffers tend to stay full. When the rate of consumption generally exceeds the rate of production, the buffers tend to stay empty. System tuning involves matching the rate of production to the rate of consumption, usually by creating multiple producers or consumers to establish a balance.
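
As a back-of-the-envelope illustration of this tuning step, the following Python sketch estimates how many consumers are needed to keep up with a producer. The function names and the figures (one item every 2.5 seconds on average, 4 seconds to consume each) are illustrative assumptions, not measurements.

```python
import math

def consumers_needed(mean_interarrival_s: float, service_time_s: float) -> int:
    """Smallest number of consumers whose combined rate covers production."""
    if mean_interarrival_s <= 0 or service_time_s <= 0:
        raise ValueError("times must be positive")
    # Average number of consumers kept busy by the incoming work.
    offered_load = service_time_s / mean_interarrival_s
    return max(1, math.ceil(offered_load))

def utilization(mean_interarrival_s: float, service_time_s: float, consumers: int) -> float:
    """Fraction of time each consumer is busy, on average."""
    return service_time_s / (mean_interarrival_s * consumers)

# Illustrative figures only: an item every 2.5 s on average, 4 s to consume it.
n = consumers_needed(2.5, 4.0)
print(n, utilization(2.5, 4.0, n))
```

With these figures a single consumer would fall behind, while two consumers would each be busy roughly 80% of the time. This is only an average; when arrivals are irregular the buffer absorbs the short-term bursts.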

We will use actors to recast the traditionally passive buffer as an active component. Synchronization will become the responsibility of the buffer. Ours is a bounded-buffer, which means there is a limit on the number of items it can hold. This limit provides feedback, in the form of back-pressure, to producers. This back-pressure can be used to throttle production. The buffer can also provide an interface to non-deterministic event sources (e.g., external interrupts), by holding arrival events until a consumer is ready to process them.

## Consumer

First we model the consumer (based on the barber) as a simple state-machine with two states. In the “sleep” state, the consumer is awaiting the arrival of new work (a #ready message). When the work arrives we send a work-in-process signal (a #working message) to the work, and transition to “awake” state.

LET consumer_sleep_beh(buffer) = \(work, #ready).[
SEND (SELF, #working) TO work
BECOME consumer_awake_beh(buffer, work)
]


In the “awake” state, the consumer is awaiting the completion signal (a #done message) from the work. When the work is completed, the consumer requests more work from the buffer, and transitions to “sleep” state.

LET consumer_awake_beh(buffer, work) = \($work, #done).[
SEND (SELF, #take) TO buffer
BECOME consumer_sleep_beh(buffer)
]

The consumer can only work on one item at a time. When the consumer is ready, new work is pulled from the buffer (with #take messages). The rate of consumption is throttled by the buffer’s response to requests for work. Once work is requested, the consumer stays in “sleep” state until work is available.

## Bounded-Buffer

An active bounded-buffer actor (based on the waiting room) is the key to our rate-matching solution. The buffer is initially empty. When consumers request work (using #take messages), the buffer keeps the requests in a queue until work is available. When work is added to the buffer (using #put messages), the buffer keeps the work in a queue until requested by consumers. We will assume the definitions (shown previously) for a Banker’s Queue implementation.

LET empty_buffer_beh(limit) = \msg.[
CASE msg OF
(consumer, #take) : [
SEND (#buffer, -1) TO println
LET queue = $(q-put(q-empty, consumer))
BECOME ready_buffer_beh(limit, -1, queue)
]
(work, #put) : [
SEND (SELF, #waiting) TO work
SEND (#buffer, 1) TO println
LET queue = $(q-put(q-empty, work))
BECOME wait_buffer_beh(limit, 1, queue)
]
_ : [ THROW (#Unexpected, msg) ]
END
]

When a buffer in “empty” state receives a #take message, a buffer status message with a count of -1 is sent to the console, indicating one consumer waiting. The consumer is added to an empty queue, and the buffer transitions to “ready” state.

When a buffer in “empty” state receives a #put message, a #waiting notification is sent to the work, and a buffer status message with a count of 1 is sent to the console, indicating one work item waiting. The work is added to an empty queue, and the buffer transitions to “wait” state.

Figure 1 - Bounded-Buffer State Machine

### Buffer Ready State

The buffer is in “ready” state when there are consumers ready for work, but there is no work available. Additional consumers are queued as they become ready for work. When work arrives, a consumer is removed from the queue to handle the work item. If the consumer queue becomes empty, the buffer returns to “empty” state.

LET ready_buffer_beh(limit, count, queue) = \msg.[
CASE msg OF
(consumer, #take) : [
LET count' = $(sub(count, 1))
SEND (#buffer, count') TO println
LET queue' = $(q-put(queue, consumer))
BECOME ready_buffer_beh(limit, count', queue')
]
(work, #put) : [
LET (consumer, queue') = $(q-take(queue))
SEND (work, #ready) TO consumer
LET count' = $(add(count, 1))
CASE count' OF
0 : [
SEND (#buffer, 0) TO println
BECOME empty_buffer_beh(limit)
]
_ : [
SEND (#buffer, count') TO println
BECOME ready_buffer_beh(limit, count', queue')
]
END
]
_ : [ THROW (#Unexpected, msg) ]
END
]

When a buffer in “ready” state receives a #take message, the count is decremented, indicating the number of consumers waiting. A buffer status message with the new count is sent to the console, and the consumer is added to the ready queue.

When a buffer in “ready” state receives a #put message, a consumer is removed from the ready queue and given the work. The count is incremented, since there is one less consumer ready. A buffer status message with the new count is sent to the console. If the count is zero, the buffer transitions to “empty” state.

### Buffer Wait State

The buffer is in “wait” state when there is work waiting, but no consumers ready for work. When a consumer is ready, a work item is removed from the queue and given to the consumer. As additional work arrives, it is added to the queue of work waiting. If the buffer reaches its limit, additional work is rejected. If the waiting work queue becomes empty, the buffer returns to “empty” state.

LET wait_buffer_beh(limit, count, queue) = \msg.[
CASE msg OF
(consumer, #take) : [
LET (work, queue') = $(q-take(queue))
SEND (work, #ready) TO consumer
LET count' = $(sub(count, 1))
CASE count' OF
0 : [
SEND (#buffer, 0) TO println
BECOME empty_buffer_beh(limit)
]
_ : [
SEND (#buffer, count') TO println
BECOME wait_buffer_beh(limit, count', queue')
]
END
]
(work, #put) : [
LET count' = $(add(count, 1))
CASE compare(count', limit) OF
1 : [
SEND (SELF, #rejected) TO work
]
_ : [
SEND (SELF, #waiting) TO work
SEND (#buffer, count') TO println
LET queue' = $(q-put(queue, work))
BECOME wait_buffer_beh(limit, count', queue')
]
END
]
_ : [ THROW (#Unexpected, msg) ]
END
]


When a buffer in “wait” state receives a #take message, a work item is removed from the waiting queue and given to the consumer. The count is decremented, since there is one less work item waiting. A buffer status message with the new count is sent to the console. If the count is zero, the buffer transitions to “empty” state.

When a buffer in “wait” state receives a #put message, the count is incremented, indicating the amount of work waiting. If the count exceeds the bounded-buffer limit, a #rejected message is sent to the work. Otherwise, a buffer status message with the new count is sent to the console, and the work is added to the waiting queue.
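
For readers who prefer a mainstream language, the three buffer states can be approximated in Python as a single object with a signed count. This is a rough sketch and not a translation of the actor code: the class and method names are hypothetical, outgoing messages are returned as `(target, message)` pairs instead of being sent, the console status messages are omitted, and the limit is assumed to be at least 1.

```python
from collections import deque

class BoundedBuffer:
    """Signed count: < 0 consumers waiting, 0 empty, > 0 work items waiting."""

    def __init__(self, limit):
        self.limit = limit
        self.count = 0
        self.queue = deque()  # holds consumers when count < 0, work when count > 0

    @property
    def state(self):
        if self.count < 0:
            return "ready"
        return "wait" if self.count > 0 else "empty"

    def take(self, consumer):
        """Handle (consumer, #take); return the messages to send."""
        if self.count > 0:                 # "wait": hand over the oldest work item
            work = self.queue.popleft()
            self.count -= 1
            return [(consumer, (work, "ready"))]
        self.queue.append(consumer)        # "empty" or "ready": park the consumer
        self.count -= 1
        return []

    def put(self, work):
        """Handle (work, #put); return the messages to send."""
        if self.count < 0:                 # "ready": give work to the oldest consumer
            consumer = self.queue.popleft()
            self.count += 1
            return [(consumer, (work, "ready"))]
        if self.count >= self.limit:       # "wait" at capacity: reject the work
            return [(work, "rejected")]
        self.queue.append(work)            # "empty" or "wait": queue the work
        self.count += 1
        return [(work, "waiting")]
```

A single queue suffices because consumers and work items are never queued at the same time; the sign of the count says which kind the queue currently holds.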

## Producer

We model the producer with an actor that generates work at random intervals. The producer is configured with a reference to the buffer and a counter used to label work items.

LET producer_beh(buffer, n) = \t.[
CREATE work WITH work_beh(n)
SEND (work, #put) TO buffer
BECOME producer_beh(buffer, add(n, 1))
SEND (t, (SELF, 5000), random) TO timer
]


When the producer receives a message, it creates a new work item labelled with the counter n. The work is sent in a #put message to the buffer. The counter is incremented, and a delayed message is scheduled via the timer. After the delay t, the timer sends a message to random, generating the next delay-time message for the producer.
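
The interplay between the producer, the timer, and the random service can be imitated in Python with a virtual clock. This is a sketch under stated assumptions: `Clock` and `make_producer` are hypothetical names, the random service is assumed to return an integer delay below its limit, and the next delay is drawn when the event is scheduled instead of after the timer fires (the resulting timing is the same).

```python
import heapq
import random
from itertools import count

class Clock:
    """Virtual-time stand-in for the timer service."""

    def __init__(self):
        self.now = 0
        self._events = []
        self._seq = count()  # tie-breaker so heapq never compares callables

    def after(self, delay, target, message):
        """Deliver message to target once delay milliseconds have passed."""
        heapq.heappush(self._events, (self.now + delay, next(self._seq), target, message))

    def run(self, until):
        """Deliver pending events in time order, up to and including `until`."""
        while self._events and self._events[0][0] <= until:
            self.now, _, target, message = heapq.heappop(self._events)
            target(message)

def make_producer(clock, put, rng, max_delay=5000):
    """Return a producer that emits one labelled item per message it receives."""
    label = count(1)

    def producer(t):
        put(next(label))                       # like SEND (work, #put) TO buffer
        next_delay = rng.randrange(max_delay)  # what the random service would reply
        clock.after(t, producer, next_delay)   # arrives after the current delay t

    return producer

clock = Clock()
items = []  # (virtual time, label) for each item produced
producer = make_producer(clock, lambda n: items.append((clock.now, n)), random.Random(1))
producer(1000)  # kick-off: first item now, the next one 1000 ms later
clock.run(until=20000)
```

Each delay value received by the producer determines the gap before the following item, which mirrors the order of events in the actor version.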

## Work

Naturally, the work is also an actor. Each work item is created with a numeric label. Any message it receives is sent to the console with the label as a prefix. In addition, the work actor models the time taken to complete the work.

LET work_time = 4000  # 4 seconds
LET work_beh(n) = \msg.[
SEND (n, msg) TO println
CASE msg OF
(consumer, #working) : [
SEND (work_time, (consumer, #complete), SELF) TO timer
]
(consumer, #complete) : [
SEND (SELF, #done) TO consumer
]
END
]


When the work receives a message, it sends the message to the console with its label n as a prefix. For a #working message from a consumer, a delayed #complete message is scheduled via the timer. This represents the time taken to consume/complete the work. When the #complete message arrives, a #done message is sent to the consumer, indicating that the work is completed.

In this example, since the work model is so simple, we could have simply scheduled a delayed #done message for the consumer directly. Our use of a separate #complete message represents the potential to perform some kind of wrap-up action before notifying the consumer that the work is done.

## Usage Example

We will demonstrate the operation of our bounded-buffer by creating a buffer with a capacity of three, a single producer and two consumers.

CREATE buffer WITH empty_buffer_beh(3)
CREATE producer WITH producer_beh(buffer, 1)
SEND (producer, 5000) TO random
CREATE consumer_1 WITH consumer_sleep_beh(buffer)
SEND (consumer_1, #take) TO buffer
CREATE consumer_2 WITH consumer_sleep_beh(buffer)
SEND (consumer_2, #take) TO buffer


The producer is kicked off with an initial delay (of up to 5 seconds) by a message to the random service. The initial readiness of each consumer is signalled by a #take message to the buffer.

## Conclusion

The bounded-buffer is a very useful mechanism for connecting producers and consumers. Producers push work into the buffer as it arrives, or is produced. Consumers pull work from the buffer when they are ready. If too much work is produced, the excess is rejected. This signal can be used to throttle the rate of production. Our buffer implementation generates status messages (here sent to the console) indicating the level of work in the queue. As an alternative, administrative messages to query buffer status could be added to the protocol of the buffer.
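
One way the rejection signal might be used for throttling is a simple back-off policy on the producer side. The Python sketch below is hypothetical: the `Throttle` class, its default delays, and the doubling/halving rule are assumptions for illustration, and it presumes the producer somehow observes the #waiting and #rejected replies (in the code above they are delivered to the work item, not to the producer).

```python
class Throttle:
    """Adjust a producer's delay from the buffer's replies (hypothetical policy)."""

    def __init__(self, base_ms=1000, max_ms=16000):
        self.base_ms = base_ms
        self.max_ms = max_ms
        self.delay_ms = base_ms

    def on_reply(self, reply):
        """Return the delay to use before producing the next item."""
        if reply == "rejected":    # buffer full: slow down, up to a ceiling
            self.delay_ms = min(self.delay_ms * 2, self.max_ms)
        elif reply == "waiting":   # accepted: drift back toward the base rate
            self.delay_ms = max(self.delay_ms // 2, self.base_ms)
        return self.delay_ms

throttle = Throttle()
for reply in ["rejected", "rejected", "waiting"]:
    print(reply, throttle.on_reply(reply))
```

Other policies are possible, such as pausing production entirely until the buffer reports free capacity; the point is only that the buffer's limit gives the producer something to react to.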