Producer-Consumer Patterns
The Producer-Consumer pattern is
classically defined as two threads or tasks coordinating their
behavior through a shared fixed length buffer. The producer writes
data to the buffer. The consumer reads data from the buffer. The
producer must stop writing to the buffer while the buffer is full.
The consumer must stop reading from the buffer while the buffer is
empty.
The size of the buffer is fixed and
does not grow during the life of the program. The minimum size of the
buffer must be one element. The maximum size of the buffer is limited
only by memory constraints on the program.
The classic Producer-Consumer design
uses one producer and one consumer.
Figure: Classic Producer-Consumer Pattern
In this pattern the producer and consumer have no direct interaction.
Each interacts only with the fixed-length buffer, allowing the two tasks
to execute asynchronously until the buffer is full or empty. To avoid
race conditions, the producer must not write to the buffer while the
consumer is reading from it, and the consumer must not read from the
buffer while the producer is writing to it. With one producer and one
consumer, this mutual exclusion can be implemented with a simple binary
semaphore.
Coordination becomes more difficult when other configurations are used.
Additional configurations include:
- Single Producer with Multiple Consumers
- Multiple Producers with Single Consumer
- Multiple Producers with Multiple Consumers
Figure: Single Producer Multiple Consumer Pattern
Figure: Multiple Producer Single Consumer Pattern
Figure: Multiple Producer Multiple Consumer Pattern
The coordination problem
becomes most complex when dealing with multiple producers writing to
a single buffer and multiple consumers reading from the same single
buffer.
Producer-Consumer Implemented Using Ada
Ada tasks are similar to threads in many other languages. Tasks
are active objects. Ada protected objects are passive elements of
concurrency. Protected objects are executed by tasks when a task
calls one of the protected object's methods.
Ada Tasks
Tasks work independently unless they are suspended while waiting
for a lock or in an entry queue. Tasks communicate synchronously with
each other through task entries. They communicate asynchronously with
each other through protected objects.
Task Entries
Task entries implement the Rendezvous model of communication. A
task may declare an entry, which is a synchronization point during
which data may be passed directly between tasks. A task may call
another task's entry. The task declaring the entry can accept the
entry call at the point in its logic best suited for handling the
entry call.
A task calling the entry of another task will suspend until the
called task reaches the accept statement for that entry. At this
point the two tasks are synchronized to allow data to be passed from
one task to the other. If the called task reaches its accept
statement before another task calls the corresponding entry, the
called task will suspend, waiting for a calling task. Once the
rendezvous completes, both tasks continue executing independently of
each other.
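The rendezvous described above can be sketched with a small, self-contained program. The task name Doubler and its entry Double are hypothetical, chosen only for illustration; the point is that data flows in both directions while the two tasks are synchronized.

```ada
with Ada.Text_IO;

procedure Rendezvous_Demo is

   --  Hypothetical task used only for illustration.
   task Doubler is
      entry Double (Value : in Integer; Result : out Integer);
   end Doubler;

   task body Doubler is
   begin
      --  Doubler suspends here until another task calls Double.
      accept Double (Value : in Integer; Result : out Integer) do
         --  Both tasks are synchronized while this block executes.
         Result := 2 * Value;
      end Double;
      --  After the rendezvous the two tasks run independently again.
   end Doubler;

   Answer : Integer;

begin
   --  The main task suspends until Doubler reaches its accept statement.
   Doubler.Double (21, Answer);
   Ada.Text_IO.Put_Line ("Result:" & Integer'Image (Answer));
end Rendezvous_Demo;
```

Whichever task arrives first waits for the other, so the program prints "Result: 42" regardless of scheduling order.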
Select statements
Ada provides a method for conditionally accepting entry calls. The
select statement provides the ability to handle one of many
entries, or to poll an entry so that a rendezvous can be handled
without incurring a protracted suspension of the task. An example
from the Ada Language Reference Manual is:
    task body Server is
       Current_Work_Item : Work_Item;
    begin
       loop
          select
             accept Next_Work_Item(WI : in Work_Item) do
                Current_Work_Item := WI;
             end;
             Process_Work_Item(Current_Work_Item);
          or
             accept Shut_Down;
             exit;       -- Premature shut down requested
          or
             terminate;  -- Normal shutdown at end of scope
          end select;
       end loop;
    end Server;
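The polling form of select mentioned above uses an else part instead of further accept alternatives. The following is a minimal sketch, assuming a hypothetical Worker task with a single Stop entry; the delay in the else part stands in for useful work.

```ada
with Ada.Text_IO;

procedure Polling_Demo is

   --  Hypothetical task used only for illustration.
   task Worker is
      entry Stop;
   end Worker;

   task body Worker is
   begin
      loop
         select
            accept Stop;   --  taken only if a caller is already waiting
            exit;
         else
            delay 0.01;    --  stand-in for useful work between polls
         end select;
      end loop;
      Ada.Text_IO.Put_Line ("Worker stopped");
   end Worker;

begin
   delay 0.05;    --  let the Worker poll a few times
   Worker.Stop;   --  suspends until the Worker's next poll accepts the call
end Polling_Demo;
```

Because of the else part, the Worker never suspends on the accept statement itself. It checks for a pending Stop call, and if none is queued it carries on immediately.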
Ada Protected Objects
Ada protected objects are protected against race conditions, and their
implicit locking avoids many common causes of deadlock. Protected
objects are designed to be shared by multiple tasks. They implement a
sophisticated form of Hoare monitor. Protected objects handle their own
locking mechanisms, making their design highly Object Oriented. The
calling tasks never directly manipulate locks.
Protected Methods
There are three kinds of protected methods:
- Procedures
- Entries
- Functions
Protected Procedures
Protected Procedures are allowed to read from or write to the
protected object. Protected Procedures acquire exclusive
unconditional access to the Protected Object through a read-write
lock.
Protected Entries
Protected Entries are also allowed to read from or write to the
protected object. Like Protected Procedures, Protected Entries
acquire an exclusive read-write lock. The difference between a
protected Procedure and a Protected Entry is the conditional nature
of a Protected Entry call. Each Protected Entry is defined with a
guard condition which must evaluate to True or False. When the guard
condition is False execution of the Protected Entry is blocked and
the calling task suspends until the guard condition evaluates to
True. Each suspended task is placed in an Entry Queue so that calls
to a Protected Entry are executed in the proper order. There are two
common queuing policies: First In First Out (FIFO) and Priority. The
default queuing policy is FIFO, so the queued tasks execute the
Protected Entry in the order in which they called it.
Protected Functions
Protected functions are only allowed to read data from the
Protected Object. They are not allowed to modify the Protected Object
in any way. Protected functions acquire a shared read-only lock on
the Protected Object. This lock prevents Protected Procedures and
Protected Entries from executing while the lock is asserted, but
permits any number of simultaneous function calls to execute on the
Protected Object. Protected functions cannot execute while a Protected
Entry or Protected Procedure read-write lock is asserted.
All these locking and queuing operations are performed implicitly.
The programmer does not create any explicit lock or queue
manipulation calls.
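The three kinds of protected methods can be seen side by side in a short sketch. The protected object Stock and its operations are hypothetical names introduced for illustration, not part of the example that follows.

```ada
with Ada.Text_IO;

procedure Protected_Demo is

   --  Hypothetical protected object showing all three kinds of methods.
   protected Stock is
      procedure Add (Amount : in Positive);  --  unconditional read-write access
      entry Take;                            --  conditional read-write access
      function Level return Natural;         --  shared read-only access
   private
      Count : Natural := 0;
   end Stock;

   protected body Stock is

      procedure Add (Amount : in Positive) is
      begin
         Count := Count + Amount;
      end Add;

      --  The guard keeps callers queued while Count is zero.
      entry Take when Count > 0 is
      begin
         Count := Count - 1;
      end Take;

      function Level return Natural is
      begin
         return Count;
      end Level;

   end Stock;

begin
   Stock.Add (3);
   Stock.Take;   --  the guard is True, so the call proceeds immediately
   Ada.Text_IO.Put_Line ("Level:" & Natural'Image (Stock.Level));
end Protected_Demo;
```

This prints "Level: 2". No lock or queue appears anywhere in the source: the only synchronization the programmer writes is the guard on Take.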
Ada Producer-Consumer Example
The following example uses Ada tasks, task entries, Ada Protected
Objects and Protected Object entries.
    with Ada.Strings.Bounded;

    generic
       Buf_Size : Positive;
    package Universal_PC is
       package Label_Strings is new Ada.Strings.Bounded.Generic_Bounded_Length (30);
       use Label_Strings;

       task type Producer is
          entry Set_Id (Label : Label_Strings.Bounded_String);
          entry Done;
       end Producer;

       task type Consumer is
          entry Set_Id (Label : Label_Strings.Bounded_String);
          entry Stop;
       end Consumer;
    end Universal_PC;
The package specification instantiates a bounded string package named
Label_Strings, whose Bounded_String type holds at most 30 characters.
It also defines two task types: Producer and Consumer. The Producer
task type has two task entries: Set_Id, which passes in an instance of
Bounded_String, and Done, which passes no information. Done only
synchronizes the Producer with a calling task upon an event, in this
case the Producer finishing its work. The Consumer task type also has
two task entries: Set_Id, which passes an instance of Bounded_String to
the Consumer, and Stop, which commands the Consumer task instance to
terminate.
    with Ada.Text_IO.Bounded_IO;

    package body Universal_PC is
       package Label_IO is new Ada.Text_IO.Bounded_IO (Label_Strings);
       use Label_IO;

       ------------
       -- Buffer --
       ------------

       type Internal_Buf is array (Natural range 0 .. Buf_Size - 1) of
         Label_Strings.Bounded_String;

       protected Buffer is
          entry Read (Item : out Label_Strings.Bounded_String);
          entry Write (Item : in Label_Strings.Bounded_String);
       private
          Count       : Natural := 0;
          Buf         : Internal_Buf;
          Write_Index : Natural := 0;
          Read_Index  : Natural := 0;
       end Buffer;

       protected body Buffer is
          entry Read (Item : out Label_Strings.Bounded_String)
            when Count > 0 is
          begin
             Item       := Buf (Read_Index);
             Count      := Count - 1;
             Read_Index := (Read_Index + 1) mod Buf_Size;
          end Read;

          entry Write (Item : in Label_Strings.Bounded_String)
            when Count < Buf_Size is
          begin
             Buf (Write_Index) := Item;
             Count             := Count + 1;
             Write_Index       := (Write_Index + 1) mod Buf_Size;
          end Write;
       end Buffer;

       --------------
       -- Producer --
       --------------

       task body Producer is
          Id : Label_Strings.Bounded_String;
       begin
          accept Set_Id (Label : Label_Strings.Bounded_String) do
             Id := Label;
          end Set_Id;

          for I in 1 .. 10 loop
             Buffer.Write (Id & ' ' & To_Bounded_String (I'Image));
             delay 0.01;
          end loop;

          accept Done;
       end Producer;

       --------------
       -- Consumer --
       --------------

       task body Consumer is
          Id    : Label_Strings.Bounded_String;
          Value : Label_Strings.Bounded_String;
       begin
          accept Set_Id (Label : Label_Strings.Bounded_String) do
             Id := Label;
          end Set_Id;

          loop
             select
                accept Stop;
                exit;
             else
                select
                   Buffer.Read (Value);
                   Put_Line (Id & ':' & ' ' & Value);
                or
                   delay 0.01;
                end select;
             end select;
          end loop;
       end Consumer;
    end Universal_PC;
The package body provides the implementation of the tasks declared
in the package specification. It also provides the implementation for
the Protected Object named Buffer.
The Protected Object specification declares two Protected Entries
for Buffer. The Write entry passes Bounded_String into Buffer and the
Read entry passes a Bounded_String out of Buffer. The private part of
the Protected Object specification defines the data contained in
Buffer. Four data items are defined. Count holds the number of data
elements currently in use in the Buf array of Buffer. Buf
is an array of Bounded_Strings. Write_Index contains the index for
the next Write entry call. Read_Index contains the index for the next
Read entry call.
The Protected Body contains the implementation of the two entry
calls. The Read entry call has a condition stating that the call can
only execute when Count > 0. This condition enforces the
requirement that one cannot read from an empty Producer-Consumer
queue. The Write entry call has a condition stating that the call can
only execute when Count < Buf_Size. This condition enforces the
requirement that one cannot write to a full Producer-Consumer queue.
All the Protected Entry locking and queuing code is generated
automatically by the compiler. The programmer writes only the entry
behavior that is not associated with locking and queuing.
The package body also contains the implementation of the task
bodies for Producer and Consumer.
Each Producer starts by accepting the Set_Id task entry, allowing
the programmer to assign an ID label unique to each Producer task.
The Producer task then iterates through the numbers in the range 1
through 10. Each iteration calls Buffer.Write, passing a bounded
string containing the task ID concatenated with the iteration value.
After writing all 10 values to Buffer the task accepts the Done entry
then terminates.
Each Consumer task starts by accepting the Set_Id entry, just like
the Producer tasks. The Consumer then enters a simple loop that
iterates through a set of select calls polling the Stop task entry
then polling the Buffer.Read protected entry. The loop exits when the
Stop task entry is handled.
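The Consumer's inner select is a timed entry call: it attempts Buffer.Read and gives up if the guard is still False when the delay expires. The sketch below isolates that behavior. The one-slot Mailbox is a hypothetical stand-in for Buffer, introduced only for illustration.

```ada
with Ada.Text_IO;

procedure Timed_Call_Demo is

   --  Hypothetical one-slot mailbox standing in for Buffer.
   protected Mailbox is
      entry Get (Item : out Integer);
      procedure Put (Item : in Integer);
   private
      Value : Integer := 0;
      Full  : Boolean := False;
   end Mailbox;

   protected body Mailbox is

      entry Get (Item : out Integer) when Full is
      begin
         Item := Value;
         Full := False;
      end Get;

      procedure Put (Item : in Integer) is
      begin
         Value := Item;
         Full  := True;
      end Put;

   end Mailbox;

   Received : Integer := 0;

begin
   --  The mailbox is empty, so the guard is False and the call times out.
   select
      Mailbox.Get (Received);
      Ada.Text_IO.Put_Line ("Got" & Integer'Image (Received));
   or
      delay 0.01;
      Ada.Text_IO.Put_Line ("Timed out");
   end select;

   Mailbox.Put (7);

   --  The guard is now True, so the call completes before the delay expires.
   select
      Mailbox.Get (Received);
      Ada.Text_IO.Put_Line ("Got" & Integer'Image (Received));
   or
      delay 0.01;
      Ada.Text_IO.Put_Line ("Timed out");
   end select;
end Timed_Call_Demo;
```

The program prints "Timed out" followed by "Got 7". In the Consumer, the same timeout is what lets the task return to polling its Stop entry instead of waiting indefinitely on an empty Buffer.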
    with Universal_PC;

    procedure Universal_pc_test is
       package PC is new Universal_PC (10);
       use PC;

       P1 : Producer;
       P2 : Producer;
       C1 : Consumer;
       C2 : Consumer;
    begin
       P1.Set_Id (Label_Strings.To_Bounded_String ("Producer 1"));
       P2.Set_Id (Label_Strings.To_Bounded_String ("Producer 2"));
       C1.Set_Id (Label_Strings.To_Bounded_String ("Consumer 1"));
       C2.Set_Id (Label_Strings.To_Bounded_String ("Consumer 2"));
       P1.Done;
       P2.Done;
       C1.Stop;
       C2.Stop;
    end Universal_pc_test;
This procedure is the “main” procedure or program entry point
for the program. Every program entry point is also implicitly a task.
The generic package Universal_PC is instantiated with a Buf_Size
of 10. Two producers, P1 and P2, are declared. Two Consumers, C1 and
C2, are declared. The four tasks start as soon as execution of the
program reaches the “begin” in the procedure Universal_pc_test.
Initially all four tasks are suspended at their Set_Id task accept
calls. The four task entries are called, passing appropriate labels
to the four tasks.
P1.Done and P2.Done are called immediately. The calling task
(Universal_pc_test) is suspended until P1 and then P2 accept their
Done entries. At that point both producers have completed. C1.Stop
and C2.Stop are then called to terminate the two Consumer tasks.
The output of one run of this program is shown below. The interleaving
of the lines can vary from run to run.
    Consumer 1: Producer 1 1
    Consumer 1: Producer 2 1
    Consumer 1: Producer 2 2
    Consumer 2: Producer 1 2
    Consumer 1: Producer 1 3
    Consumer 2: Producer 2 3
    Consumer 1: Producer 2 4
    Consumer 2: Producer 1 4
    Consumer 1: Producer 2 5
    Consumer 2: Producer 1 5
    Consumer 1: Producer 2 6
    Consumer 2: Producer 1 6
    Consumer 2: Producer 2 7
    Consumer 1: Producer 1 7
    Consumer 2: Producer 1 8
    Consumer 1: Producer 2 8
    Consumer 2: Producer 2 9
    Consumer 1: Producer 1 9
    Consumer 2: Producer 1 10
    Consumer 1: Producer 2 10
This solution will work with any Buf_Size greater than 0 and any
number of producers and consumers.