An Ada Publish-Subscribe Producer-Consumer Exercise

There are many ways to express the classic producer-consumer problem in Ada. The following problem illustrates some of the interesting features of Ada protected objects.

Problem Statement

This producer-consumer problem has several requirements:
·         There shall be one producer and many consumers.
·         The number of consumers shall be established at run-time through user input.
·         The producer shall produce a series of random floating point numbers.
·         All consumers shall read each number produced by the producer exactly once.
·         The producer shall not know the number of consumers.
The requirement that the producer shall not know the number of consumers prohibits the use of the Ada Rendezvous mechanism for direct communication between the producer and all the consumers. An alternative compliant with this requirement is to use a protected object as a shared buffer between the producer and all the consumers. In fact, the solution below uses a protected object that implements a publish-subscribe mechanism for communication between the producer and all the consumers.
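To make the conflict with the rendezvous concrete, here is a minimal sketch of what a rendezvous-based design could look like. It is not part of the solution: the names Rendezvous_Sketch and Take, and the fixed count of three consumers, are assumptions made only for illustration. The main procedure plays the producer role and must loop over every consumer, so it necessarily knows how many there are.

```ada
with Ada.Text_IO; use Ada.Text_IO;

procedure Rendezvous_Sketch is

   --  Hypothetical consumer that receives each value by rendezvous.
   task type Consumer is
      entry Take (Value : in Float);
      entry Stop;
   end Consumer;

   task body Consumer is
      My_Value : Float := 0.0;
   begin
      loop
         select
            accept Take (Value : in Float) do
               My_Value := Value;
            end Take;
            Put_Line ("Consumer received" & Float'Image (My_Value));
         or
            accept Stop;
            exit;
         end select;
      end loop;
   end Consumer;

   --  The count is fixed here only to keep the sketch short.
   Consumers : array (1 .. 3) of Consumer;

begin
   --  The main procedure acts as the producer. It calls Take on every
   --  consumer in turn, which requires knowing the full set of consumers.
   for Sample in 1 .. 2 loop
      for C of Consumers loop
         C.Take (Float (Sample));
      end loop;
   end loop;

   for C of Consumers loop
      C.Stop;
   end loop;
end Rendezvous_Sketch;
```

Every consumer does receive each value exactly once, but only because the producer iterates over the whole array of consumers, which is exactly what the last requirement forbids.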

A Publish-Subscribe Solution

The example below handles the publish-subscribe details through the protected object. Each consumer must register with the protected object before it can read data from the protected object. The protected object only allows the producer to publish data when all the registered consumers are ready to read the data. The registered consumers are only allowed to read each published value once.
The package specification for the Ada tasks and protected object is:



---------------------------------------------------------------------
-- This package implements a consumer task type and its associated
-- protected object.
-- All instances of the consumer task type consume data from the
-- protected object once each time it is written by the producer.
-- The producer cannot write a new value until all consumers are
-- ready to read the next value.
-- The producer does not know how many consumers there are, but must
-- not write any values until there is at least one consumer.
---------------------------------------------------------------------
with Ada.Task_Identification; use Ada.Task_Identification;

package Batch_Consumers is
   task type Consumer is
      entry Stop;
   end Consumer;
  
   task Producer is
      entry Stop;
   end Producer;

   protected Buffer is
      procedure Register;
      entry Read  (Value : out Float);
      entry Write (Value : in Float);
   private
      The_Value       : Float   := Float'First;
      Done            : Boolean := False;
      Trigger         : Natural := 0;
   end Buffer;

end Batch_Consumers;
The task type Consumer allows multiple consumers to be defined, while the task object Producer is unique, as is the protected object Buffer.
Both tasks have a Stop entry to facilitate shutting down the program in an orderly manner.
The instances of the task type Consumer will Register once before reading data from Buffer. The Consumers will then read data from Buffer until they are told to Stop.
The task Producer will write random floating point numbers to Buffer until Producer is told to Stop.
The devil is in the details, and the details are expressed in the package body.



with Ada.Task_Identification; use Ada.Task_Identification;
with Ada.Text_IO; use Ada.Text_IO;
with Ada.Numerics.Float_Random; use Ada.Numerics.Float_Random;

package body Batch_Consumers is

   --------------
   -- Consumer --
   --------------

   task body Consumer is
      My_Value : Float := Float'First;
   begin
      Buffer.Register;
      loop
         select
            accept Stop;
            exit;
         else
            select
               Buffer.Read(My_Value);
               Put_Line("Task " & Image(Current_Task) & " read the value: " &
                          Float'Image(My_Value));
            or
               delay 0.01;
            end select;
         end select;
      end loop;
   end Consumer;
  
   --------------
   -- Producer --
   --------------
  
   task body Producer is
      Num  : Float;
      Seed : Generator;
   begin
      Reset(Seed);
      loop
         select
            accept Stop;
            exit;
         else
            Num := Random(Seed);
            Put_Line("********************** Writing " & Float'Image(Num));
            Buffer.Write(Num);
         end select;
      end loop;
   end Producer;
  
   ------------
   -- Buffer --
   ------------

   protected body Buffer is

      --------------
      -- Register --
      --------------

      procedure Register is
      begin
         Trigger := Trigger + 1;
      end Register;

      ----------
      -- Read --
      ----------

      entry Read (Value : out Float) when Done is
      begin
         Value := The_Value;
         if Read'Count = 0 then
            Done := False;
         end if;
      end Read;

      -----------
      -- Write --
      -----------

      entry Write (Value : in Float) when Trigger > 0 and then
        Read'Count >= Trigger and then not Done is
      begin
         The_Value := Value;
         Done      := True;
      end Write;

   end Buffer;

end Batch_Consumers;

All the interesting control elements are embedded into the guard conditions for the protected entries Read and Write.
Every Ada entry maintains its own entry queue. A calling task is enqueued and suspended while the guard condition is false, and queued tasks are dequeued and serviced once the guard condition becomes true. Protected entry queues follow the Internal Progress First Rule, which means that once an entry guard is true, all tasks already queued on that entry will be serviced before any new calls from outside the protected object are accepted.
This rule is used to make the Read entry work as we want. When Done evaluates to true all tasks in the Read entry queue are serviced before any more calls on Read are accepted. Thus, within the body of the Read entry the value Done is set to False when the entry queue is empty. This closes the door to any more reads until the Write entry executes.
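The door-closing behavior described above can be seen in isolation in the following minimal sketch. It is a stripped-down illustration rather than the solution itself: Gate_Sketch, Gate, Waiter, Open_Gate, and the fixed count of three waiting tasks are all assumed names and values. The main procedure polls until every task is queued, opens the gate once, and the last queued caller closes it again.

```ada
with Ada.Text_IO; use Ada.Text_IO;

procedure Gate_Sketch is

   Num_Waiters : constant := 3;  --  assumed count for this sketch

   protected Gate is
      entry Wait;
      procedure Open_Gate;
      function Waiting return Natural;
      function Released return Natural;
   private
      Is_Open       : Boolean := False;
      Release_Count : Natural := 0;
   end Gate;

   protected body Gate is

      entry Wait when Is_Open is
      begin
         Release_Count := Release_Count + 1;
         --  The last queued caller closes the gate behind the batch.
         if Wait'Count = 0 then
            Is_Open := False;
         end if;
      end Wait;

      procedure Open_Gate is
      begin
         Is_Open := True;
      end Open_Gate;

      function Waiting return Natural is
      begin
         return Wait'Count;
      end Waiting;

      function Released return Natural is
      begin
         return Release_Count;
      end Released;

   end Gate;

   task type Waiter;

   task body Waiter is
   begin
      Gate.Wait;
   end Waiter;

begin
   declare
      Waiters : array (1 .. Num_Waiters) of Waiter;
   begin
      --  Poll until every task is queued on Wait, then open the gate once.
      while Gate.Waiting < Num_Waiters loop
         delay 0.01;
      end loop;
      Gate.Open_Gate;
   end;  --  the block exits only after all Waiter tasks have terminated

   Put_Line ("Released:" & Natural'Image (Gate.Released));
end Gate_Sketch;
```

Because the queued calls are serviced as part of the protected action that opened the gate, all three waiting tasks pass through before the gate closes, and the program should print a release count of 3.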
The Write entry will only execute under the following conditions:
·         There is at least one consumer registered and
·         The number of tasks waiting in the Read queue is at least as large as the number of registered consumers and
·         Done is False.
You can see here that the Buffer is aware of the number of consumers, but the Producer is not.
The main procedure used to test this package is:
with Batch_Consumers;     use Batch_Consumers;
with Ada.Text_IO;         use Ada.Text_IO;
with Ada.Integer_Text_IO; use Ada.Integer_Text_IO;

procedure Batch_Test is
   Num_Consumers : Positive;
begin
   Put ("Enter the number of consumer tasks: ");
   Get (Num_Consumers);
   declare
      Consumers : array (1 .. Num_Consumers) of Consumer;
   begin
      delay 2.0; -- wait for 2 seconds
      Producer.Stop;
      for C of Consumers loop
         C.Stop;
      end loop;
   end;
end Batch_Test;


The main procedure prompts the user for the number of consumer tasks, reads the response, and then, in an inner block, declares an array of Consumer tasks whose length is the number the user entered. It then delays for 2.0 seconds before stopping Producer and each Consumer.
