Producer-Consumer Patterns


Producer-Consumer Patterns
The Producer-Consumer pattern is classically defined as two threads or tasks coordinating their behavior through a shared fixed length buffer. The producer writes data to the buffer. The consumer reads data from the buffer. The producer must stop writing to the buffer while the buffer is full. The consumer must stop reading from the buffer while the buffer is empty.

The size of the buffer is fixed and does not grow during the life of the program. The minimum size of the buffer must be one element. The maximum size of the buffer is limited only by memory constraints on the program.

The classic Producer-Consumer design uses one producer and one consumer.

Classic Producer-Consumer Pattern


In this pattern the producer and consumer have no direct interaction. Each interacts with the fixed length buffer, allowing the two tasks to execute asynchronously until the buffer is full or empty. It is important that the producer cannot write to the buffer while the consumer is reading from the buffer to avoid race conditions. Similarly, the consumer cannot read from the buffer while the producer is writing to the buffer. Coordination of writing and reading can be implemented through a simple binary semaphore when there is one producer and one consumer.

Coordination becomes more difficult when other configurations are used. Additional configurations include
  • Single Producer with Multiple Consumers
  • Multiple Producers with Single Consumer
  • Multiple Producers with Multiple Consumers




Single Producer Multiple Consumer Pattern


Multiple Consumer Single Producer Pattern





Multiple Producer Multiple Consumer Pattern



The coordination problem becomes most complex when dealing with multiple producers writing to a single buffer and multiple consumers reading from the same single buffer.

Producer-Consumer Implemented Using Ada

Ada tasks are similar to threads in many other languages. Tasks are active objects. Ada protected objects are passive elements of concurrency. Protected objects are executed by tasks when a task calls one of the protected object's methods.



Ada Tasks

Tasks work independently unless they are suspended while waiting for a lock or in an entry queue. Tasks communicate synchronously with each other through task entries. They communicate asynchronously with each other through protected objects.

Task Entries

Task entries implement the Rendezvous model of communication. A task may declare an entry, which is a synchronization point during which data may be passed directly between tasks. A task may call another task's entry. The task declaring the entry can accept the entry call at the point in its logic best suited for handling the entry call.

A task calling the entry of another task will suspend until the called task reaches the accept statement for that entry. At this point the two tasks are synchronized to allow data to be passed from one task to the other. If the called task reaches its accept statement before another task calls the corresponding entry the called task will suspend, waiting for a calling task. Following completion of the task entry both tasks will continue executing independently of each other.

Select statements

Ada provides a method for conditionally accepting entry calls. The select clause provides the ability to handle one of many entries, or to poll an entry so that a rendezvous can be handled without incurring a protracted suspension of the task. An example from the Ada Language Reference Manual is



task body Server is
   Current_Work_Item : Work_Item;
begin
   loop
      select
         accept Next_Work_Item(WI : in Work_Item) do
            Current_Work_Item := WI;
         end;
         Process_Work_Item(Current_Work_Item);
      or
         accept Shut_Down;
         exit; -- Premature shut down requested
      or
         terminate; -- Normal shutdown at end of scope
      end select;
   end loop;
end Server;

Ada Protected Objects
Ada protected objects are protected against race conditions and deadlock. Protected objects are designed to be shared by multiple tasks. Protected objects implement sophisticated versions of Hoare monitors. Protected objects handle their own locking mechanisms, making their design highly Object Oriented. The calling tasks never directly manipulate locks.

Protected Methods

There are three kinds of protected methods:
  • Procedures
  • Entries
  • Functions

Protected Procedures

Protected Procedures are allowed to read from or write to the protected object. Protected Procedures acquire exclusive unconditional access to the Protected Object through a read-write lock.

Protected Entries

Protected Entries are also allowed to read from or write to the protected object. Like Protected Procedures, Protected Entries acquire an exclusive read-write lock. The difference between a protected Procedure and a Protected Entry is the conditional nature of a Protected Entry call. Each Protected Entry is defined with a guard condition which must evaluate to True or False. When the guard condition is False execution of the Protected Entry is blocked and the calling task suspends until the guard condition evaluates to True. Each suspended task is placed in an Entry Queue so that calls to a Protected Entry are executed in the proper order. There are two common queuing policies; First In First Out (FIFO) and Priority. The default queuing policy is FIFO, so that the queued tasks execute the Protected Entry in the temporal order in which the Protected Entry was called.

Protected Functions

Protected functions are only allowed to read data from the Protected Object. They are not allowed to modify the Protected Object in any way. Protected functions acquire a shared read-only lock on the Protected Object. This lock prevents Protected Procedures and Protected Entries from executing while the lock is asserted, but permit any number of simultaneous function calls to execute on the Protected Object. Protected objects cannot execute while a Protected Entry or Protected Procedure read-write lock is asserted.

All these locking and queuing operations are performed implicitly. The programmer does not create any explicit lock or queue manipulation calls.

Ada Producer-Consumer Example

The following example uses Ada tasks, task entries, Ada Protected Objects and Protected Object entries.



with Ada.Strings.Bounded;
generic
   Buf_Size : Positive;
package Universal_PC is
   package Label_Strings is new Ada.Strings.Bounded.Generic_Bounded_Length(30);
   use Label_Strings;

   task type Producer is
      entry Set_Id(Label : Label_Strings.Bounded_String);
      entry Done;
   end Producer;


   task type Consumer is
      entry Set_Id(Label : Label_Strings.Bounded_String);
      entry Stop;
   end Consumer;


end Universal_PC;


The package specification defines a bounded string type named Label_Strings with a maximum length of 30 characters. It also defines two task types; Producer and Consumer. The Producer task type has two task entries; Set_Id, which passes in an instance of Bounded_String and Done which passes no information. Done only acts to synchronize the Producer and a calling task upon an event. In this case the event is when the Producer is done. The Consumer task type has two task entries; Set_Id, which passes an instance of Bounded_String to Consumer, and Stop, which commands the Consumer task instance to terminate.



with Ada.Text_IO.Bounded_IO;

package body Universal_PC is
   package Label_IO is new Ada.Text_IO.Bounded_IO (Label_Strings);
   use Label_IO;

   ------------
   -- Buffer --
   ------------

   type Internal_Buf is array (Natural range 0 .. Buf_Size - 1) of
       Label_Strings.Bounded_String;

   protected Buffer is
      entry Read (Item : out Label_Strings.Bounded_String);
      entry Write (Item : in Label_Strings.Bounded_String);
   private
      Count       : Natural := 0;
      Buf         : Internal_Buf;
      Write_Index : Natural := 0;
      Read_Index  : Natural := 0;
   end Buffer;

   protected body Buffer is
      entry Read (Item : out Label_Strings.Bounded_String)
         when Count > 0 is
      begin
         Item       := Buf (Read_Index);
         Count      := Count - 1;
         Read_Index := (Read_Index + 1) mod Buf_Size;
      end Read;

      entry Write (Item : in Label_Strings.Bounded_String)
         when Count < Buf_Size is
      begin
         Buf (Write_Index) := Item;
         Count             := Count + 1;
         Write_Index       := (Write_Index + 1) mod Buf_Size;
      end Write;

   end Buffer;

   --------------
   -- Producer --
   --------------

   task body Producer is
      Id : Label_Strings.Bounded_String;
   begin
      accept Set_Id (Label : Label_Strings.Bounded_String) do
         Id := Label;
      end Set_Id;
      for I in 1 .. 10 loop
         Buffer.Write (Id & ' ' & To_Bounded_String (I'Image));
         delay 0.01;
      end loop;
      accept Done;
   end Producer;

   --------------
   -- Consumer --
   --------------

   task body Consumer is
      Id : Label_Strings.Bounded_String;
      Value : Label_Strings.Bounded_String;
   begin
      accept Set_Id (Label : Label_Strings.Bounded_String) do
         Id := Label;
      end Set_Id;
      loop
         select
            accept Stop;
            exit;
         else
            select
               Buffer.Read (Value);
               Put_Line (Id & ':' & ' ' & Value);
            or
               delay 0.01;
            end select;
         end select;
      end loop;
   end Consumer;

end Universal_PC;


The package body provides the implementation of the tasks declared in the package specification. It also provides the implementation for the Protected Object named Buffer.
The Protected Object specification declares two Protected Entries for Buffer. The Write entry passes Bounded_String into Buffer and the Read entry passes a Bounded_String out of Buffer. The private part of the Protected Object specification defines the data contained in Buffer. Four data items are defined. Count maintains a count of the number of data elements currently in use in Buf array of Buffer. Buf is an array of Bounded_Strings. Write_Index contains the index for the next Write entry call. Read_Index contains the index for the next Read entry call.

The Protected Body contains the implementation of the two entry calls. The Read entry call has a condition stating that the call can only execute when Count > 0. This condition enforces the requirement that one cannot read from an empty Producer-Consumer queue. The Write entry call has a condition stating that the call can only execute when Count < Buf_Size. This condition enforces the requirement that one cannot write to a full Producer-Consumer queue. All the Protected Entry locking and queuing is written automatically by the compiler. The programmer is only concerned with writing the entry behaviors not associated with locking and queuing.

The package body also contains the implementation of the task bodies for Producer and Consumer.
Each Producer starts by accepting the Set_Id task entry, allowing the programmer to assign an ID label unique to each Producer task. The Producer task then iterates through the numbers in the range 1 through 10. Each iteration calls Buffer.Write, passing a bounded string containing the task ID concatenated with the iteration value. After writing all 10 values to Buffer the task accepts the Done entry then terminates.

Each Consumer task starts by accepting the Set_Id entry, just like the Producer tasks. The Consumer then enters a simple loop that iterates through a set of select calls polling the Stop task entry then polling the Buffer.Read protected entry. The loop exits when the Stop task entry is handled.



with universal_pc;

procedure Universal_pc_test is
   package Pc is new Universal_Pc(10);
   use PC;

   P1 : Producer;
   P2 : Producer;
   C1 : Consumer;
   C2 : Consumer;
begin
   P1.Set_Id(Label_Strings.to_Bounded_String("Producer 1"));
   P2.Set_Id(Label_Strings.To_Bounded_String("Producer 2"));
   C1.Set_Id(Label_Strings.To_Bounded_String("Consumer 1"));
   C2.Set_Id(Label_Strings.To_Bounded_String("Consumer 2"));
   P1.Done;
   P2.Done;
   C1.Stop;
   C2.Stop;
end Universal_pc_test;


This procedure is the “main” procedure or program entry point for the program. Every program entry point is also implicitly a task.

The generic package Universal_PC is instantiated with a Buf_Size of 10. Two producers, P1 and P2, are declared. Two Consumers, C1 and C2, are declared. The four tasks start as soon as execution of the program reaches the “begin” in the procedure Universal_pc_test. Initially all four tasks are suspended at their Set_Id task accept calls. The four task entries are called, passing appropriate labels to the four tasks.

P1.Done and P2.Done are called immediately. The calling task (Universal_pc_test) is suspended untils P1 and then P2 accept their Done entries. At that point both producers have completed. C1.Stop and C2.Stop are then called to terminate the two Consumer tasks.

The output of this program is

Consumer 1: Producer 1 1
Consumer 1: Producer 2 1
Consumer 1: Producer 2 2
Consumer 2: Producer 1 2
Consumer 1: Producer 1 3
Consumer 2: Producer 2 3
Consumer 1: Producer 2 4
Consumer 2: Producer 1 4
Consumer 1: Producer 2 5
Consumer 2: Producer 1 5
Consumer 1: Producer 2 6
Consumer 2: Producer 1 6
Consumer 2: Producer 2 7
Consumer 1: Producer 1 7
Consumer 2: Producer 1 8
Consumer 1: Producer 2 8
Consumer 2: Producer 2 9
Consumer 1: Producer 1 9
Consumer 2: Producer 1 10
Consumer 1: Producer 2 10



This solution will work with any Buf_Size greater than 0 and any number of producers and consumers.

Comments

  1. You really need to sort out the code formatting, it's really narrow and all indentation has been removed.

    ReplyDelete
    Replies
    1. I forgot how the blog editing tool removes formatting. Thanks for noticing.

      Delete
  2. Thanks Jim for your great work. I have noticed that Producer tasks wait at accept Done after producing products. I thought tasks are supposed to run continuously unless block by PO. They can produce products while checking completion signal. I would suggest replace part of Producer task

    for I in 1 .. 10 loop
    Buffer.Write (Id & ' ' & To_Bounded_String (I'Image));
    delay 0.01;
    end loop;
    accept Done;

    by

    loop
    for I in 1 .. 10 loop
    Buffer.Write (Id & " " & To_Bounded_String(I'Image));
    delay 0.01;
    end loop;
    select
    accept Done;
    exit;
    else
    delay 0.01;
    end select;
    end loop;

    Then in main procedure Universal_Pc_Test, insert 'delay 10.0' before P1.Done allowing the test to run for 10 seconds. The duration can be changed to one's dire to test to his heart content. By the way, instead of producing 10 values every time, I changed it to random between 1 to 10. The codes worked very well, also.

    ReplyDelete
    Replies
    1. Producer-consumer patterns need not be continuous, although that may be common. I chose to have the producer tasks produce a small amount of data for the purpose of illustration. I could have added a Stop entry to the Producer task type, but that would have complicated the example. I was trying to show how the example provided correct task-buffer-task coordination no matter how many producers and consumers were used. Using a timer to shut things down would have complicated the ability to both process all data produced and cleanly shut down the consumers.

      Delete

Post a Comment

Popular posts from this blog

Threads of Confusion

Comparing Ada and High Integrity C++

Ada vs C++ Bit-fields