Parallel Addition Revisited

In an earlier article I discussed aspects of parallel addition of the elements of an array. A more general problem is to sum a collection of numbers. The collection cannot be an unending stream of numbers, because the sum of such a stream could never be completed. Instead, it must be a finite sample. Furthermore, the sample cannot be empty: there must be at least one number in it, or no sum is possible.

The following example collects a sample of numbers into a queue and then uses concurrent tasks to perform pair-wise addition of the queue elements. Each result of a pair-wise addition is placed back on the queue. Eventually, given a finite number of items in the queue, the queue is left containing only one value. That value is the total of the whole set of numbers. This approach works because addition is both associative and commutative, so neither the grouping nor the order of the additions affects the result.
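Before adding any tasks, the pair-wise reduction itself can be sketched sequentially. The following is only an illustration: the procedure name Pairwise_Sketch, the package name Int_Lists, and the sample values 1 through 10 are assumptions made for this example and are not part of the program discussed in this article.

```ada
with Ada.Text_IO; use Ada.Text_IO;
with Ada.Containers; use Ada.Containers;
with Ada.Containers.Doubly_Linked_Lists;

-- Hypothetical, single-task illustration of pair-wise reduction.
procedure Pairwise_Sketch is
   package Int_Lists is new Ada.Containers.Doubly_Linked_Lists (Integer);
   Nums : Int_Lists.List;
   A, B : Integer;
begin
   -- Fill the queue with an assumed sample: 1 through 10.
   for N in 1 .. 10 loop
      Nums.Append (N);
   end loop;

   -- Repeatedly replace the two values at the head of the queue
   -- with their sum, appended at the tail.
   while Nums.Length > 1 loop
      A := Nums.First_Element;
      Nums.Delete_First;
      B := Nums.First_Element;
      Nums.Delete_First;
      Nums.Append (A + B);
   end loop;

   -- The single remaining value is the total.
   Put_Line ("Sum:" & Integer'Image (Nums.First_Element));  -- prints "Sum: 55"
end Pairwise_Sketch;
```

Each pass through the loop removes two values and adds one, so the queue shrinks by one element per addition until a single value remains. The concurrent version performs the same steps, but lets several tasks take pairs from the queue at the same time.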

Parallel Addition In Ada

The following source code implements this more general form of parallel addition.
The key to the operation of the program is the protected object Queue, which contains the list of numbers shared by all the parallel tasks.
Each task performing addition calls the entry Get_Two, which dequeues two integers from the Queue object. The task then adds the two integers and calls the procedure Enqueue to place the result on the Queue object.

Parallel_Addition.ads
------------------------------------------------------------------
-- Parallel Addition Package                                    --
------------------------------------------------------------------
with Ada.Containers.Doubly_Linked_Lists;

package Parallel_Addition is
   package Int_Queue is new Ada.Containers.Doubly_Linked_Lists(Integer);
   use Int_Queue;

   protected Queue is
      entry Get_Two(Item1, Item2 : out Integer);
      entry Dequeue(Item : out Integer);
      procedure Enqueue(Item : in Integer);
      procedure Clear;
      function Count return Integer;
   private
      Nums : Int_Queue.List;
   end Queue;

   task type Adder;

end Parallel_Addition;

Inner workings of the Parallel_Addition package

I have chosen the Doubly Linked List package provided by the Ada container library to implement my queue. The Enqueue procedure appends values to the end of the list (queue) and both the Get_Two and Dequeue entries dequeue values from the beginning of the list (queue).

Each Ada protected entry has a barrier condition. The barrier for the Get_Two entry evaluates to TRUE when the list contains more than one element. The barrier for the Dequeue entry evaluates to TRUE when the list contains at least one element. A task calling an entry whose barrier is FALSE is suspended until the barrier evaluates to TRUE. The suspended tasks are then allowed to execute the entry in FIFO order, which is the default queuing policy, so every waiting task eventually gets access to the entry. The Clear procedure empties the list. The Count function reports the number of tasks currently suspended on the Get_Two entry queue.

The Adder task type contains an infinite loop which simply calls Get_Two to dequeue two values from the Queue object, then enqueues the sum of those two values onto the Queue. Since every pair of values is replaced with a single value, each addition shrinks the Queue by one element, so N values are reduced to a single value after N - 1 additions. Once there is only one value left in the Queue, the Adder tasks all suspend waiting for a second value. The last value left on the Queue is the total of all the numbers placed on the Queue.

Parallel_Addition.adb
with Ada.Containers; use Ada.Containers;
package body Parallel_Addition is
   -----------
   -- Queue --
   -----------
   --------------------------------------------------------------------
   -- The Queue protected object treats its internal list as a queue.--
   -- All data is read from the head of the list and new data is     --
   -- appended to the end of the list. Reading data from the head    --
   -- also deletes that data from the queue.                         --
   --                                                                --
   -- The Count function reports the number of tasks waiting in the  --
   -- entry queue for the Get_Two entry.                             --
   -- The Clear procedure empties the internal list of its contents. --
   --------------------------------------------------------------------
   protected body Queue is
   -------------
   -- Get_Two --
   -------------
   entry Get_Two (Item1, Item2 : out Integer) when Nums.Length > 1 is
   begin
      Item1 := Nums.First_Element;
      Nums.Delete_First;
      Item2 := Nums.First_Element;
      Nums.Delete_First;
   end Get_Two;

   -------------
   -- Dequeue --
   -------------
   entry Dequeue (Item : out Integer) when Nums.Length > 0 is
   begin
      Item := Nums.First_Element;
      Nums.Delete_First;
   end Dequeue;

   -------------
   -- Enqueue --
   -------------
   procedure Enqueue (Item : in Integer) is
   begin
      Nums.Append(Item);
   end Enqueue;

   -----------
   -- Count --
   -----------

   function Count return Integer is
   begin
      return Get_Two'Count;
   end Count;

   -----------
   -- Clear --
   -----------
   procedure Clear is
   begin
      Nums.Clear;
   end Clear;
   end Queue;

   -----------
   -- Adder --
   -----------
   ---------------------------------------------------------------------
   -- Each Adder task simply calls Get_Two to dequeue two values from --
   -- the queue. It then enqueues the sum of those two values back    --
   -- on the queue. The queue will contain the final sum when it      --
   -- contains only a single value and every Adder task is suspended  --
   -- on the Get_Two entry.                                           --
   ---------------------------------------------------------------------
   task body Adder is
      Value1, Value2 : Integer;
   begin
      loop
         Queue.Get_Two(Value1, Value2);
         Queue.Enqueue(Value1 + Value2);
      end loop;
   end Adder;
end Parallel_Addition;

A test driver for the Parallel_Addition package

The Parallel_Addition_Main procedure is my test driver for the Parallel_Addition package.
The subtype The_Nums is created to define a range of values to add. In the source code below the range of values is specified as -600 through 601 inclusive. The values from -600 through 600 cancel each other, so the sum of this set of numbers should be 601.
An array of Adder tasks is created. The tasks start running immediately. Since there are no values in the Queue protected object, all the tasks suspend in the Get_Two entry queue waiting for the barrier condition to evaluate to TRUE.
The test driver performs the addition 20 times. This is done to reveal any race conditions that might exist in the code. For each iteration of the test the Queue is cleared and the numbers are enqueued into the Queue object. The test driver then polls the Queue.Count function to determine when all the Adder tasks are once again suspended in the Get_Two entry queue. That event indicates there is only one value left in the Queue, which is our total. That final value is dequeued and displayed.
Because each Adder task loops forever, the tasks never complete on their own. After the 20 iterations all the Adder tasks are therefore aborted, and the program ends in an orderly manner.

Parallel_Addition_Main.adb
------------------------------------------------------------------
-- Parallel Addition Main Program                               --
--                                                              --
-- This program uses a task pool contained in the array named   --
-- Workers. The task pool is reused for multiple iterations.    --
-- The tasks are aborted at the end of the program.             --
------------------------------------------------------------------
with Parallel_Addition; use Parallel_Addition;
with Ada.Text_IO; use Ada.Text_IO;
with Ada.Task_Identification; use Ada.Task_Identification;

procedure Parallel_Addition_Main is
   subtype The_Nums is Integer range -600..601;
   Workers : array(1..3) of Adder;
   Total : Integer;
begin
   for I in 1..20 loop
      Put(I'Img & " ");
      Queue.Clear;
      for Num in The_Nums'Range loop
         Queue.Enqueue(Num);
      end loop;

      -- Busy-wait until every Adder task is again suspended on Get_Two.
      loop
         exit when Queue.Count = Workers'Length;
      end loop;

      Queue.Dequeue(Total);
      Put_Line("The total of the series of numbers from " &
                The_Nums'First'Img & " to " &
                The_Nums'Last'Img & " is " &
                Total'Img);
   end loop;

   for Worker of Workers loop
      Abort_Task(Worker'Identity);
   end loop;
end Parallel_Addition_Main;


Program output
 1 The total of the series of numbers from -600 to  601 is  601
 2 The total of the series of numbers from -600 to  601 is  601
 3 The total of the series of numbers from -600 to  601 is  601
 4 The total of the series of numbers from -600 to  601 is  601
 5 The total of the series of numbers from -600 to  601 is  601
 6 The total of the series of numbers from -600 to  601 is  601
 7 The total of the series of numbers from -600 to  601 is  601
 8 The total of the series of numbers from -600 to  601 is  601
 9 The total of the series of numbers from -600 to  601 is  601
 10 The total of the series of numbers from -600 to  601 is  601
 11 The total of the series of numbers from -600 to  601 is  601
 12 The total of the series of numbers from -600 to  601 is  601
 13 The total of the series of numbers from -600 to  601 is  601
 14 The total of the series of numbers from -600 to  601 is  601
 15 The total of the series of numbers from -600 to  601 is  601
 16 The total of the series of numbers from -600 to  601 is  601
 17 The total of the series of numbers from -600 to  601 is  601
 18 The total of the series of numbers from -600 to  601 is  601
 19 The total of the series of numbers from -600 to  601 is  601
 20 The total of the series of numbers from -600 to  601 is  601
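
The test driver above detects completion by polling Queue.Count in a busy loop. One possible variation, shown here only as a sketch, is to let the main task block on an extra entry whose barrier tests the same condition. The entry name Get_Total, the constant Pool_Size, and the procedure name Blocking_Total_Sketch are assumptions introduced for this illustration. They are not part of the Parallel_Addition package above, and the sketch relies on the barrier being re-evaluated whenever a call is added to the Get_Two entry queue.

```ada
with Ada.Text_IO; use Ada.Text_IO;
with Ada.Containers; use Ada.Containers;
with Ada.Containers.Doubly_Linked_Lists;

-- Hypothetical variation: block on an entry instead of polling.
procedure Blocking_Total_Sketch is
   package Int_Queue is new Ada.Containers.Doubly_Linked_Lists (Integer);

   Pool_Size : constant := 3;  -- assumed number of Adder tasks

   protected Queue is
      entry Get_Two (Item1, Item2 : out Integer);
      entry Get_Total (Item : out Integer);  -- assumed addition
      procedure Enqueue (Item : in Integer);
   private
      Nums : Int_Queue.List;
   end Queue;

   protected body Queue is
      entry Get_Two (Item1, Item2 : out Integer) when Nums.Length > 1 is
      begin
         Item1 := Nums.First_Element;
         Nums.Delete_First;
         Item2 := Nums.First_Element;
         Nums.Delete_First;
      end Get_Two;

      -- Open only when every Adder is suspended on Get_Two and a
      -- single value remains, that is, when the reduction is finished.
      entry Get_Total (Item : out Integer)
        when Get_Two'Count = Pool_Size and Nums.Length = 1 is
      begin
         Item := Nums.First_Element;
         Nums.Delete_First;
      end Get_Total;

      procedure Enqueue (Item : in Integer) is
      begin
         Nums.Append (Item);
      end Enqueue;
   end Queue;

   task type Adder;

   task body Adder is
      Value1, Value2 : Integer;
   begin
      loop
         Queue.Get_Two (Value1, Value2);
         Queue.Enqueue (Value1 + Value2);
      end loop;
   end Adder;

   Workers : array (1 .. Pool_Size) of Adder;
   Total   : Integer;
begin
   for Num in -600 .. 601 loop
      Queue.Enqueue (Num);
   end loop;

   -- Must be called only after all values have been enqueued.
   Queue.Get_Total (Total);
   Put_Line ("Total:" & Integer'Image (Total));

   for Worker of Workers loop
      abort Worker;
   end loop;
end Blocking_Total_Sketch;
```

The trade-off in this sketch is that the protected object must know the size of the task pool, which couples Queue to the number of Adder tasks. The polling version keeps that knowledge in the test driver.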

