Producer-Consumer Patterns
Producer-Consumer
Patterns
The Producer-Consumer pattern is
classically defined as two threads or tasks coordinating their
behavior through a shared fixed length buffer. The producer writes
data to the buffer. The consumer reads data from the buffer. The
producer must stop writing to the buffer while the buffer is full.
The consumer must stop reading from the buffer while the buffer is
empty.
The size of the buffer is fixed and
does not grow during the life of the program. The minimum size of the
buffer must be one element. The maximum size of the buffer is limited
only by memory constraints on the program.
The classic Producer-Consumer design
uses one producer and one consumer.
Classic Producer-Consumer Pattern |
In this pattern the producer and
consumer have no direct interaction. Each interacts with the fixed
length buffer, allowing the two tasks to execute asynchronously until
the buffer is full or empty. It is important that the producer cannot
write to the buffer while the consumer is reading from the buffer to
avoid race conditions. Similarly, the consumer cannot read from the
buffer while the producer is writing to the buffer. Coordination of
writing and reading can be implemented through a simple binary
semaphore when there is one producer and one consumer.
Coordination becomes more difficult
when other configurations are used. Additional configurations include
- Single Producer with Multiple Consumers
- Multiple Producers with Single Consumer
- Multiple Producers with Multiple Consumers
Single Producer Multiple Consumer Pattern |
The coordination problem
becomes most complex when dealing with multiple producers writing to
a single buffer and multiple consumers reading from the same single
buffer.
Producer-Consumer Implemented Using Ada
Ada tasks are similar to threads in many other languages. Tasks are active objects. Ada protected objects are passive elements of concurrency. Protected objects are executed by tasks when a task calls one of the protected object's methods.Ada Tasks
Tasks work independently unless they are suspended while waiting for a lock or in an entry queue. Tasks communicate synchronously with each other through task entries. They communicate asynchronously with each other through protected objects.Task Entries
Task entries implement the Rendezvous model of communication. A task may declare an entry, which is a synchronization point during which data may be passed directly between tasks. A task may call another task's entry. The task declaring the entry can accept the entry call at the point in its logic best suited for handling the entry call.A task calling the entry of another task will suspend until the called task reaches the accept statement for that entry. At this point the two tasks are synchronized to allow data to be passed from one task to the other. If the called task reaches its accept statement before another task calls the corresponding entry the called task will suspend, waiting for a calling task. Following completion of the task entry both tasks will continue executing independently of each other.
Select statements
Ada provides a method for conditionally accepting entry calls. The select clause provides the ability to handle one of many entries, or to poll an entry so that a rendezvous can be handled without incurring a protracted suspension of the task. An example from the Ada Language Reference Manual is
task
body Server is
Current_Work_Item :
Work_Item;
begin
loop
select
accept
Next_Work_Item(WI : in Work_Item) do
Current_Work_Item :=
WI;
end;
Process_Work_Item(Current_Work_Item);
or
accept Shut_Down;
exit;
-- Premature shut down requested
or
terminate; --
Normal shutdown at end of scope
end
select;
end
loop;
end
Server;
Ada Protected Objects
Ada protected objects are protected against race conditions and
deadlock. Protected objects are designed to be shared by multiple
tasks. Protected objects implement sophisticated versions of Hoare
monitors. Protected objects handle their own locking mechanisms,
making their design highly Object Oriented. The calling tasks never
directly manipulate locks.Protected Methods
There are three kinds of protected methods:- Procedures
- Entries
- Functions
Protected Procedures
Protected Procedures are allowed to read from or write to the protected object. Protected Procedures acquire exclusive unconditional access to the Protected Object through a read-write lock.Protected Entries
Protected Entries are also allowed to read from or write to the protected object. Like Protected Procedures, Protected Entries acquire an exclusive read-write lock. The difference between a protected Procedure and a Protected Entry is the conditional nature of a Protected Entry call. Each Protected Entry is defined with a guard condition which must evaluate to True or False. When the guard condition is False execution of the Protected Entry is blocked and the calling task suspends until the guard condition evaluates to True. Each suspended task is placed in an Entry Queue so that calls to a Protected Entry are executed in the proper order. There are two common queuing policies; First In First Out (FIFO) and Priority. The default queuing policy is FIFO, so that the queued tasks execute the Protected Entry in the temporal order in which the Protected Entry was called.Protected Functions
Protected functions are only allowed to read data from the Protected Object. They are not allowed to modify the Protected Object in any way. Protected functions acquire a shared read-only lock on the Protected Object. This lock prevents Protected Procedures and Protected Entries from executing while the lock is asserted, but permit any number of simultaneous function calls to execute on the Protected Object. Protected objects cannot execute while a Protected Entry or Protected Procedure read-write lock is asserted.All these locking and queuing operations are performed implicitly. The programmer does not create any explicit lock or queue manipulation calls.
Ada Producer-Consumer Example
The following example uses Ada tasks, task entries, Ada Protected Objects and Protected Object entries.with Ada.Strings.Bounded;
generic
Buf_Size : Positive;
package Universal_PC is
package Label_Strings is new Ada.Strings.Bounded.Generic_Bounded_Length(30);
use Label_Strings;
task type Producer is
entry Set_Id(Label : Label_Strings.Bounded_String);
entry Done;
end Producer;
task type Consumer is
entry Set_Id(Label : Label_Strings.Bounded_String);
entry Stop;
end Consumer;
end Universal_PC;
with
Ada.Text_IO.Bounded_IO;
package
body Universal_PC is
package
Label_IO is new Ada.Text_IO.Bounded_IO (Label_Strings);
use
Label_IO;
------------
--
Buffer --
------------
type
Internal_Buf is array
(Natural range 0 .. Buf_Size - 1) of
Label_Strings.Bounded_String;
protected
Buffer is
entry
Read (Item : out Label_Strings.Bounded_String);
entry
Write (Item : in Label_Strings.Bounded_String);
private
Count : Natural := 0;
Buf : Internal_Buf;
Write_Index
: Natural := 0;
Read_Index : Natural := 0;
end
Buffer;
protected
body Buffer is
entry
Read (Item : out Label_Strings.Bounded_String)
when
Count > 0 is
begin
Item := Buf (Read_Index);
Count := Count - 1;
Read_Index
:= (Read_Index + 1) mod Buf_Size;
end
Read;
entry
Write (Item : in Label_Strings.Bounded_String)
when
Count < Buf_Size is
begin
Buf
(Write_Index) := Item;
Count := Count + 1;
Write_Index := (Write_Index + 1) mod Buf_Size;
end
Write;
end
Buffer;
--------------
--
Producer --
--------------
task
body Producer is
Id
: Label_Strings.Bounded_String;
begin
accept
Set_Id (Label : Label_Strings.Bounded_String) do
Id
:= Label;
end
Set_Id;
for
I in 1 .. 10 loop
Buffer.Write
(Id & ' ' & To_Bounded_String (I'Image));
delay
0.01;
end
loop;
accept
Done;
end
Producer;
--------------
--
Consumer --
--------------
task
body Consumer is
Id
: Label_Strings.Bounded_String;
Value
: Label_Strings.Bounded_String;
begin
accept
Set_Id (Label : Label_Strings.Bounded_String) do
Id
:= Label;
end
Set_Id;
loop
select
accept
Stop;
exit;
else
select
Buffer.Read
(Value);
Put_Line
(Id & ':' & ' ' & Value);
or
delay
0.01;
end
select;
end
select;
end
loop;
end
Consumer;
end
Universal_PC;
The Protected Object specification declares two Protected Entries for Buffer. The Write entry passes Bounded_String into Buffer and the Read entry passes a Bounded_String out of Buffer. The private part of the Protected Object specification defines the data contained in Buffer. Four data items are defined. Count maintains a count of the number of data elements currently in use in Buf array of Buffer. Buf is an array of Bounded_Strings. Write_Index contains the index for the next Write entry call. Read_Index contains the index for the next Read entry call.
The Protected Body contains the implementation of the two entry calls. The Read entry call has a condition stating that the call can only execute when Count > 0. This condition enforces the requirement that one cannot read from an empty Producer-Consumer queue. The Write entry call has a condition stating that the call can only execute when Count < Buf_Size. This condition enforces the requirement that one cannot write to a full Producer-Consumer queue. All the Protected Entry locking and queuing is written automatically by the compiler. The programmer is only concerned with writing the entry behaviors not associated with locking and queuing.
The package body also contains the implementation of the task bodies for Producer and Consumer.
Each Producer starts by accepting the Set_Id task entry, allowing the programmer to assign an ID label unique to each Producer task. The Producer task then iterates through the numbers in the range 1 through 10. Each iteration calls Buffer.Write, passing a bounded string containing the task ID concatenated with the iteration value. After writing all 10 values to Buffer the task accepts the Done entry then terminates.
Each Consumer task starts by accepting the Set_Id entry, just like the Producer tasks. The Consumer then enters a simple loop that iterates through a set of select calls polling the Stop task entry then polling the Buffer.Read protected entry. The loop exits when the Stop task entry is handled.
with
universal_pc;
procedure
Universal_pc_test is
package
Pc is new Universal_Pc(10);
use
PC;
P1
: Producer;
P2
: Producer;
C1
: Consumer;
C2
: Consumer;
begin
P1.Set_Id(Label_Strings.to_Bounded_String("Producer
1"));
P2.Set_Id(Label_Strings.To_Bounded_String("Producer
2"));
C1.Set_Id(Label_Strings.To_Bounded_String("Consumer
1"));
C2.Set_Id(Label_Strings.To_Bounded_String("Consumer
2"));
P1.Done;
P2.Done;
C1.Stop;
C2.Stop;
end
Universal_pc_test;
The generic package Universal_PC is instantiated with a Buf_Size of 10. Two producers, P1 and P2, are declared. Two Consumers, C1 and C2, are declared. The four tasks start as soon as execution of the program reaches the “begin” in the procedure Universal_pc_test. Initially all four tasks are suspended at their Set_Id task accept calls. The four task entries are called, passing appropriate labels to the four tasks.
P1.Done and P2.Done are called immediately. The calling task (Universal_pc_test) is suspended untils P1 and then P2 accept their Done entries. At that point both producers have completed. C1.Stop and C2.Stop are then called to terminate the two Consumer tasks.
The output of this program is
Consumer
1: Producer 1 1
Consumer
1: Producer 2 1
Consumer
1: Producer 2 2
Consumer
2: Producer 1 2
Consumer
1: Producer 1 3
Consumer
2: Producer 2 3
Consumer
1: Producer 2 4
Consumer
2: Producer 1 4
Consumer
1: Producer 2 5
Consumer
2: Producer 1 5
Consumer
1: Producer 2 6
Consumer
2: Producer 1 6
Consumer
2: Producer 2 7
Consumer
1: Producer 1 7
Consumer
2: Producer 1 8
Consumer
1: Producer 2 8
Consumer
2: Producer 2 9
Consumer
1: Producer 1 9
Consumer
2: Producer 1 10
Consumer
1: Producer 2 10
This solution will work with any Buf_Size greater than 0 and any number of producers and consumers.
You really need to sort out the code formatting, it's really narrow and all indentation has been removed.
ReplyDeleteI forgot how the blog editing tool removes formatting. Thanks for noticing.
DeleteThanks Jim for your great work. I have noticed that Producer tasks wait at accept Done after producing products. I thought tasks are supposed to run continuously unless block by PO. They can produce products while checking completion signal. I would suggest replace part of Producer task
ReplyDeletefor I in 1 .. 10 loop
Buffer.Write (Id & ' ' & To_Bounded_String (I'Image));
delay 0.01;
end loop;
accept Done;
by
loop
for I in 1 .. 10 loop
Buffer.Write (Id & " " & To_Bounded_String(I'Image));
delay 0.01;
end loop;
select
accept Done;
exit;
else
delay 0.01;
end select;
end loop;
Then in main procedure Universal_Pc_Test, insert 'delay 10.0' before P1.Done allowing the test to run for 10 seconds. The duration can be changed to one's dire to test to his heart content. By the way, instead of producing 10 values every time, I changed it to random between 1 to 10. The codes worked very well, also.
Producer-consumer patterns need not be continuous, although that may be common. I chose to have the producer tasks produce a small amount of data for the purpose of illustration. I could have added a Stop entry to the Producer task type, but that would have complicated the example. I was trying to show how the example provided correct task-buffer-task coordination no matter how many producers and consumers were used. Using a timer to shut things down would have complicated the ability to both process all data produced and cleanly shut down the consumers.
Delete