Parallel Iterator - Quick Start


Here you will find background on the Parallel Iterator's motivation, as well as some examples that illustrate the numerous features it provides.

1. Motivation

1.1 Why Parallel computing?

Parallel computing has arrived in mainstream desktop systems in the form of multi-core processors, because of the difficulties in maintaining improvements in uni-processor clock speed. Unfortunately, users will not witness any performance improvement unless their applications are parallelized. Parallel programming is, however, notoriously difficult, especially in terms of program correctness and high performance.

1.2 Object-oriented programming and (sequential) Iterators

An important aspect of the Parallel Iterator is that it focuses on object-oriented (OO) programming, since the most popular languages are OO [TIOBE], especially for general purpose desktop applications. Iterative computations usually carry the lion's share of computational load, and in OO languages this is often implemented with iterators. For example, consider the following Java application code that traverses a collection of files and processes each one:
	Collection<File> elements = ...
	Iterator<File> it = elements.iterator();
	while ( it.hasNext() ) {
	    File file = it.next();
	    processFile(file);
	}
Unfortunately, this code runs in a single thread, so only one processor core is employed to execute the loop while the other cores remain idle! What are software developers left to do? They must first create multiple threads, and then explicitly distribute the elements amongst those threads. The following problems arise:
  • how are the elements distributed amongst the threads? (i.e. the scheduling policy)
  • how to implement such a scheme?
  • what about the correctness of the manually-implemented distribution scheme?
  • what about the performance of this scheme?
In most cases, the programmer must manually implement such a scheduling policy. Unfortunately, the simplest schemes to implement tend to be the least efficient, while the most efficient schemes tend to be too complicated and error prone.
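To make these problems concrete, below is a sketch of what a programmer must hand-roll without the Parallel Iterator: a simple static block distribution. The class and method names here are illustrative only (not part of any library), and even this simplest of schemes already shows the pitfalls listed above: if the blocks take unequal time, some threads sit idle while others are overloaded.

```java
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative sketch: manually distributing elements amongst threads
// using a hand-implemented static block scheme.
public class ManualDistribution {

    public static Collection<String> process(List<String> elements, int threadCount) {
        ConcurrentLinkedQueue<String> processed = new ConcurrentLinkedQueue<>();
        Thread[] threads = new Thread[threadCount];
        int blockSize = (elements.size() + threadCount - 1) / threadCount;

        for (int i = 0; i < threadCount; i++) {
            final int from = i * blockSize;
            final int to = Math.min(from + blockSize, elements.size());
            threads[i] = new Thread(() -> {
                // each thread processes its own contiguous block
                for (int j = from; j < to; j++)
                    processed.add(elements.get(j)); // stands in for processFile()
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // wait for every block to finish
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(process(files, 2).size()); // all 5 elements processed
    }
}
```

Notice how the scheduling policy, the thread management and the final join are all the programmer's responsibility; this is exactly the boilerplate the Parallel Iterator removes.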

1.3 Parallel Iterator

This is where the Parallel Iterator comes in. It is a thread-safe Iterator that is shared amongst the threads, without the programmer worrying about the underlying scheduling policy (or the tedious parallelization concerns). Anyone familiar with the standard sequential Iterator will find the Parallel Iterator very intuitive. Below is the same example from above, but using the Parallel Iterator:
	Collection<File> elements = ...
	ParIterator<File> it = ParIteratorFactory.createParIterator(elements, threadCount);

	// each thread does this
	while ( it.hasNext() ) {
	    File file = it.next();
	    processFile(file);
	} // implicit synchronization barrier
What is the difference compared to the sequential Iterator?
The difference is that Parallel Iterator is thread-safe, unlike the standard sequential Iterator. This allows all the threads to share it, while the distribution of the elements is handled internally. Other than that, notice how Parallel Iterator has the same interface as the standard sequential Iterator. In fact, the Parallel Iterator interface even extends the sequential Iterator's interface! 

What is the implicit synchronization barrier for?
Since multiple threads are sharing the same Parallel Iterator, it is important that none of the threads continue executing past the loop until all the other threads have also finished. This is because code following the loop might rely on results of the completed loop. Therefore, the call to hasNext() will actually block a thread until all the other threads finish executing their respective iterations. After this block, false is returned to all threads and they therefore break out of the loop together.

Wait a minute, is the combination of hasNext()/next() atomic?!?!
Absolutely! At first, it might seem that the respective calls of hasNext() and next() are not atomic, since another thread might also invoke hasNext() in between! This is the main reason why the standard sequential Iterator cannot be used, since only one element might remain in the list (and 'true' is returned to more than one thread!).
    However, the Parallel Iterator is totally thread safe! If a particular thread receives 'true' from calling hasNext(), then it is guaranteed that an element is reserved for it. Other threads will receive 'false' (after blocking at the barrier mentioned above).
    For this reason, threads must continue to iterate until they receive 'false' from the Parallel Iterator. Otherwise, the Parallel Iterator will keep waiting for that thread to come back and collect the elements reserved for it. Early loop termination is still possible, using one of the Parallel Iterator's break mechanisms (see section 2.4).
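To see how a hasNext()/next() pair can be made atomic, consider the following minimal sketch: hasNext() atomically *reserves* an index for the calling thread, so a later next() cannot be "stolen" by another thread. This is an illustration of the concept only, not the library's actual implementation (it omits the synchronization barrier, scheduling policies and break support of the real Parallel Iterator).

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a reservation-based thread-safe iterator.
public class ReservingIterator<E> {

    private final List<E> elements;
    private final AtomicInteger cursor = new AtomicInteger(0);
    // each thread remembers the index it has reserved
    private final ThreadLocal<Integer> reserved = new ThreadLocal<>();

    public ReservingIterator(List<E> elements) {
        this.elements = elements;
    }

    public boolean hasNext() {
        int index = cursor.getAndIncrement(); // atomic reservation
        if (index < elements.size()) {
            reserved.set(index); // this element now belongs to the calling thread
            return true;
        }
        return false; // the real Parallel Iterator would block at the barrier here
    }

    public E next() {
        return elements.get(reserved.get());
    }
}
```

Because the reservation happens inside hasNext(), a thread that receives 'true' is guaranteed its own element, which is exactly the property described above.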


2.  Parallel Iterator examples

2.1  Hello, World!

No tutorial would be complete without the notorious Hello, World! example! Below is the complete code to run our first application using the Parallel Iterator:

MainApp.java
import pu.pi.ParIterator;
import pu.pi.ParIteratorFactory;
import java.util.*;

public class MainApp {

    public static Collection<String> getElements() {
        List<String> list = new ArrayList<String>();
        list.add("one");
        list.add("two");
        list.add("three");
        list.add("four");
        list.add("five");
        list.add("six");
        list.add("seven");
        list.add("eight");
        return list;
    }

    public static void main(String[] args) {

        int threadCount = 2;

        // Get a Parallel Iterator for the collection of elements
        Collection<String> elements = getElements();
        ParIterator<String> pi = ParIteratorFactory.createParIterator(elements, threadCount);

        // Create and start a pool of worker threads
        Thread[] threadPool = new WorkerThread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threadPool[i] = new WorkerThread(i, pi);
            threadPool[i].start();
        }

        // ... Main thread may compute other (independent) tasks

        // Main thread waits for worker threads to complete
        for (int i = 0; i < threadCount; i++) {
            try {
                threadPool[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("All worker threads have completed.");
    }
}

WorkerThread.java
import pu.pi.ParIterator;

public class WorkerThread extends Thread {

    private ParIterator<String> pi = null;
    private int id = -1;

    public WorkerThread(int id, ParIterator<String> pi) {
        this.id = id;
        this.pi = pi;
    }

    public void run() {
        while (pi.hasNext()) {
            String element = pi.next();
            System.out.println("Thread " + id + " got element: " + element);

            // slow down the threads (to illustrate the scheduling)
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Thread " + id + " has finished.");
    }
}
A possible output from the above program might look like:
Thread 0 got element: one
Thread 1 got element: two
Thread 0 got element: three
Thread 1 got element: four
Thread 0 got element: five
Thread 1 got element: six
Thread 1 got element: seven
Thread 0 got element: eight
Thread 0 has finished.
Thread 1 has finished.
All worker threads have completed.

The Parallel Iterator in this example made use of the default scheduling policy (dynamic scheduling with chunk size of one). In this policy, elements are distributed one at a time to threads as they are requested. This works well for most applications, but some situations might require other scheduling policies (see section 2.2).

2.2  Scheduling policies

What is a scheduling policy?
A scheduling policy determines how the iteration space is divided amongst threads into smaller chunks. The chunk size allows for further fine-tuning, by determining how many iterations at a time are reserved for a particular thread. The Parallel Iterator has been implemented to support the following major scheduling policies:
  • Static: all iterations are assigned to threads before the execution of the loop. This may either be block or cyclic:
    • Block: each thread is assigned (at most) one large chunk.
    • Cyclic: the iterations are grouped into smaller chunks, and threads are assigned chunks in a round-robin fashion.
  • Dynamic: each thread requests a chunk of iterations to process at runtime. 
  • Guided: similar to dynamic, except the size of each chunk decreases as iterations are distributed.
[Four diagrams: (a) Static block, (b) Static cyclic, (c) Dynamic, (d) Guided]
Figure 1: The major scheduling policies supported by the Parallel Iterator. These examples show some of the possible ways a collection of 9 elements might be allocated amongst 3 threads.
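The static assignments sketched in Figure 1 can be computed directly, since they are fixed before the loop starts (dynamic and guided cannot be tabulated this way, as their assignments depend on run-time requests). The following is an illustrative sketch only, with hypothetical helper names that are not part of the library:

```java
// Sketch: which thread owns element `index` under the static policies,
// for n elements distributed amongst t threads.
public class StaticPartitions {

    // static block: each thread gets (at most) one contiguous chunk
    public static int blockOwner(int index, int n, int t) {
        int blockSize = (n + t - 1) / t; // ceiling division
        return index / blockSize;
    }

    // static cyclic: chunks of `chunkSize` are dealt out round-robin
    public static int cyclicOwner(int index, int chunkSize, int t) {
        return (index / chunkSize) % t;
    }

    public static void main(String[] args) {
        int n = 9, t = 3; // matches Figure 1: 9 elements, 3 threads
        for (int i = 0; i < n; i++)
            System.out.print(blockOwner(i, n, t));
        System.out.println(); // prints 000111222
        for (int i = 0; i < n; i++)
            System.out.print(cyclicOwner(i, 1, t));
        System.out.println(); // prints 012012012
    }
}
```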

Why would I want to change the scheduling policy?
For many applications, the default scheduling policy will be sufficient. However, some cases might require a slightly different policy. For example:
  • if iterations are very fine-grained, then a dynamic scheduling policy with a chunk size of 1 is the worst policy to use, due to the high run-time synchronization overhead required. In such a scenario, it is best to increase the chunk size.
  • if iterations are known to require equal amounts of computing, then static scheduling is best, since this minimizes the run-time distribution overhead. However, such a policy is not good for iterations that take variable amounts of computing time, since too many iterations may be reserved for one thread while other threads remain idle.
  • the guided schedule is a compromise between the dynamic and static block scheduling schemes: chunks are reserved for each thread in proportion to the number of threads and the number of iterations remaining. Therefore, this scheme starts off looking like a static block scheduling policy, but eventually converges to a dynamic schedule.

How are these scheduling policies used?

Using the above scheduling policies is extremely easy! The first Hello, World! example above used the default scheduling policy, which is dynamic with chunk size 1. To use any of the other scheduling schemes, simply specify this when creating the Parallel Iterator.

To best illustrate the different scheduling schemes, we will modify the run() method of the WorkerThread as follows (only the sleep logic has been modified):
	public void run() {
	    while (pi.hasNext()) {
	        String element = pi.next();
	        System.out.println("Thread " + id + " got element: " + element);

	        // slow down the threads (to illustrate the scheduling)
	        try {
	            if (id == 0)
	                Thread.sleep(50);
	            if (id == 1)
	                Thread.sleep(1000);
	        } catch (InterruptedException e) {
	            e.printStackTrace();
	        }
	    }
	    System.out.println("Thread " + id + " has finished.");
	}
Iterations of thread 0 will take only 50ms, while iterations of thread 1 will take 1000ms. Below are the various outputs as we change the scheduling policy. Notice that the only modification is the set of parameters used to create the Parallel Iterator; other than this, the rest of the code remains the same!

ParIteratorFactory.createParIterator(elements, threadCount);

This example uses the default scheduling policy: dynamic with chunk size of 1:
Thread 0 got element: one
Thread 1 got element: two
Thread 0 got element: three
Thread 0 got element: four
Thread 0 got element: five
Thread 0 got element: six
Thread 0 got element: seven
Thread 0 got element: eight
Thread 1 has finished.
Thread 0 has finished.
All worker threads have completed.
Notice how thread 1 only managed to complete 1 of the 8 iterations, since thread 0 had much smaller iterations to compute.

ParIteratorFactory.createParIterator(elements, threadCount, ParIterator.Schedule.DYNAMIC, 3);

This example uses dynamic scheduling with chunk size of 3:
Thread 0 got element: one
Thread 1 got element: four
Thread 0 got element: two
Thread 0 got element: three
Thread 0 got element: seven
Thread 0 got element: eight
Thread 1 got element: five
Thread 1 got element: six
Thread 1 has finished.
Thread 0 has finished.
All worker threads have completed.
The above scheduling policy changed the chunk size to 3. This means that 3 iterations are reserved at a time for a thread. In this particular run, thread 0 was first to the Parallel Iterator, so it was reserved elements 'one', 'two' and 'three'. Thread 1 was then allocated iterations 'four', 'five' and 'six'.
    Since thread 1 is much slower, thread 0 quickly completed its current chunk (consisting of 'one', 'two' and 'three'). It then went back to the Parallel Iterator for the next chunk: only elements 'seven' and 'eight' remained unreserved, so these were assigned to thread 0. By this time, thread 1 had not even completed its first iteration! Thread 0 is then forced to wait until thread 1 completes its elements.
    Note that in this particular example, increasing the chunk size was a bad idea. But for other fine-grained iterations, increasing the chunk size is beneficial. This is why the Parallel Iterator allows the scheduling to be customised, since the best choice is application specific.

ParIteratorFactory.createParIterator(elements, threadCount, ParIterator.Schedule.STATIC);

This example uses static block scheduling (static scheduling defaults to block when no chunk size is specified):
Thread 0 got element: one
Thread 1 got element: five
Thread 0 got element: two
Thread 0 got element: three
Thread 0 got element: four
Thread 1 got element: six
Thread 1 got element: seven
Thread 1 got element: eight
Thread 1 has finished.
Thread 0 has finished.
All worker threads have completed.
In static scheduling, the results above will look the same regardless of how many times we run the program or even if we change the sleep amount for each thread. This is because the iterations are assigned before the loop begins executing. Since we have 2 threads and 8 elements, thread 0 gets the first half of the elements while thread 1 gets the second half. Again, thread 0 waits for thread 1 to finish. In some situations, where the iterations are known to be equal sized and fine-grained, static block scheduling works very well.

ParIteratorFactory.createParIterator(elements, threadCount, ParIterator.Schedule.STATIC, 2);

This example uses static scheduling with a chunk size of 2:
Thread 0 got element: one
Thread 1 got element: three
Thread 0 got element: two
Thread 0 got element: five
Thread 0 got element: six
Thread 1 got element: four
Thread 1 got element: seven
Thread 1 got element: eight
Thread 1 has finished.
Thread 0 has finished.
All worker threads have completed.
Again, in this static cyclic scheduling example, iterations are assigned before the loop begins executing. However, the assignment is broken into smaller chunks by specifying a chunk size. Therefore, the 8 elements are broken into 4 chunks of size 2:
  • thread 0 will always get the 1st chunk containing iterations 'one', 'two'
  • thread 1 will always get the 2nd chunk containing iterations 'three', 'four'
  • thread 0 will always get the 3rd chunk containing iterations 'five', 'six'
  • thread 1 will always get the 4th chunk containing iterations 'seven', 'eight'
ParIteratorFactory.createParIterator(elements, threadCount, ParIterator.Schedule.GUIDED, 1);

This example uses guided scheduling with a chunk size of 1:
Thread 0 got element: one
Thread 1 got element: five
Thread 0 got element: two
Thread 0 got element: three
Thread 0 got element: four
Thread 0 got element: seven
Thread 0 got element: eight
Thread 1 got element: six
Thread 1 has finished.
Thread 0 has finished.
All worker threads have completed.
As mentioned earlier, the guided schedule is a combination of static and dynamic. The static part comes from the fact that the number of chunks (and their respective sizes) is fixed. However, the dynamic component of guided is that these chunks are only assigned as threads request work. Therefore, in our example, the 8 elements are broken into the following 4 chunks (initially unassigned to the threads). Note that sizes are determined by dividing the number of elements left by the number of threads:
  • The 1st chunk contains iterations 'one', 'two', 'three', 'four' (8/2 = 4)
  • The 2nd chunk contains iterations 'five', 'six' (4/2 = 2)
  • The 3rd chunk contains iteration 'seven' (2/2 = 1)
  • The 4th chunk contains iteration 'eight' (1/2 = 1)
Since thread 1 is much slower, it only managed to get the 2nd chunk, while thread 0 executed all the other chunks. Note that the chunk size in the context of the guided schedule represents the minimum number of elements in a chunk (unless fewer elements remain). Therefore, if the chunk size in this case were increased to 2, then only 3 chunks would exist (since the 3rd chunk would contain the last two remaining elements).
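The guided chunk-size arithmetic above can be expressed directly. The following sketch (a hypothetical helper, not a library API) reproduces the rule: each chunk takes remaining/threads elements, but never fewer than the minimum chunk size, and never more than what remains:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: compute the sequence of guided chunk sizes.
public class GuidedChunks {

    public static List<Integer> chunkSizes(int elements, int threads, int minChunk) {
        List<Integer> sizes = new ArrayList<>();
        int remaining = elements;
        while (remaining > 0) {
            int size = Math.max(remaining / threads, minChunk); // proportional, but bounded below
            size = Math.min(size, remaining);                   // never exceed what is left
            sizes.add(size);
            remaining -= size;
        }
        return sizes;
    }

    public static void main(String[] args) {
        // 8 elements, 2 threads, minimum chunk size 1 -> [4, 2, 1, 1]
        System.out.println(chunkSizes(8, 2, 1));
        // raising the minimum chunk size to 2 merges the tail -> [4, 2, 2]
        System.out.println(chunkSizes(8, 2, 2));
    }
}
```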

2.3  Reductions

For programs that share variables, programmers must provide mutual exclusion to ensure correct results. In a threading library, this is typically solved using a mutex. Unfortunately, a mutex makes programs with fine-grained parallelism suffer heavily in performance; consequently, a reduction is the standard parallelization technique for such cases. PARCutils offers RedLib as a reduction library that provides an extensive set of ready-to-use reduction operations. Below is a simple example to illustrate the necessary steps in performing a reduction with the Parallel Iterator:
// initialise Parallel Iterator and Reducible
Collection<Integer> elements = ...
ParIterator<Integer> it = ParIteratorFactory.createParIterator(elements);
Reducible<Integer> localMin = new Reducible<Integer>(Integer.MAX_VALUE);

// each thread does this
while (it.hasNext()) {
    int v = it.next();
    if (v < localMin.get())
        localMin.set(v);
}

// final code, executed by any thread
int finalMin = localMin.reduce(new IntegerMinimum());

The Reducible object is essentially a thread-local object. In fact, the get() and set() methods are used exactly like Java's ThreadLocal class! The only difference is the reduce() method at the end: it performs the specified reduction across all the internal thread-local values, and returns the final value.
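To make this thread-local behaviour concrete, here is a minimal sketch of how a Reducible-like class could be built. This is an illustration of the concept only, not the library's actual implementation (the real Reducible integrates with RedLib Reduction objects, among other things):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BinaryOperator;

// Sketch: a thread-local value per thread, plus a final reduce() step.
public class SimpleReducible<E> {

    // one slot per thread; the map lets reduce() see every thread's value
    private final Map<Thread, E> values = new ConcurrentHashMap<>();
    private final E initialValue;

    public SimpleReducible(E initialValue) {
        this.initialValue = initialValue;
    }

    public E get() {
        return values.getOrDefault(Thread.currentThread(), initialValue);
    }

    public void set(E value) {
        values.put(Thread.currentThread(), value);
    }

    // fold all thread-local values into one result; the operator must be
    // associative and commutative, as the tutorial requires
    public E reduce(BinaryOperator<E> op) {
        E result = initialValue;
        for (E v : values.values())
            result = op.apply(result, v);
        return result;
    }
}
```

Used like the minimum example above: each thread calls get()/set() on its own slot, and at the end any thread folds the per-thread minima with, say, Math::min.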

What if I want a custom (user-defined) reduction?
As mentioned above, RedLib already provides an extensive set of reduction operations, and users can compose new ones by nesting the existing operations.
  However, no library can provide every conceivable operation, so one might still need to develop a custom reduction.
  As a matter of fact, the Reducible class is quite flexible in this respect: you can specify your own customised reductions using any object type! The only requirements are that the custom reduction must be:
  • associative: the order of evaluating the reduction makes no difference, and 
  • commutative: the order of the thread-local values makes no difference.
In order to define such a reduction, simply create a class that implements the Reduction interface of RedLib. Following is a use-case for which the reduction is already provided by RedLib, but for the sake of this tutorial let's assume that a user needs to implement their own operation. Consider the following application, where the WorkerThreads count word occurrences in a collection of Strings (or a collection of Files!):
	
public void run() {
  HashMap<String, Integer> myMap = new HashMap<String, Integer>();
  localMap.set(myMap);
  while (pi.hasNext()) {
    String element = pi.next();
    System.out.println("Thread "+id+" got element: "+element);
    if (myMap.containsKey(element)) {
      int oldCount = myMap.get(element);
      myMap.put(element, oldCount+1);
    } else {
      myMap.put(element, 1);
    }
  }
  System.out.println("Thread "+id+" has finished.");
}
When each WorkerThread completes its iterations, it will have only a partial result for the files that it processed. Therefore, the HashMaps need to be reduced into a single result set. The main code now looks as follows:

// elements contains 1 x "one", 2 x "two", 3 x "three" and 4 x "four"
Collection<String> elements = getElements();
ParIterator<String> pi = ParIteratorFactory.createParIterator(elements,
        threadCount, ParIterator.Schedule.STATIC);

// The Reducible object acts like a thread-local, except thread-local values are
// reduced at the end.
Reducible<HashMap<String,Integer>> localMap = new Reducible<HashMap<String,Integer>>();

// create and start a pool of worker threads
...
for (...) {
    threadPool[i] = new WorkerThread(i, pi, localMap);
}

// Main thread waits for worker threads to complete
...
System.out.println("All worker threads have completed.");

// define a custom reduction in the form of 2 elements into 1
Reduction<HashMap<String,Integer>> mapReduction = new Reduction<HashMap<String,Integer>>() {
    public HashMap<String,Integer> reduce(HashMap<String,Integer> first,
            HashMap<String,Integer> second) {
        for (String word : second.keySet()) {
            if (first.containsKey(word))
                first.put(word, first.get(word) + second.get(word));
            else
                first.put(word, second.get(word));
        }
        return first;
    }
};

HashMap<String,Integer> finalMap = localMap.reduce(mapReduction);
for (String word : finalMap.keySet()) {
    System.out.println(word + " occurred " + finalMap.get(word) + " times.");
}

What happens here is that the Reducible object is shared amongst the threads, just like the Parallel Iterator. At the end, the Reducible contains many partial results, each of which is local to a particular thread (just like a standard Java ThreadLocal). Therefore, this application requires a customised reduction that will accumulate the partial results into one final result: this is what the mapReduction instance does. And just like our first reduction example above, we perform the desired reduction by calling the reduce() method on the Reducible localMap.
    Not only did this example illustrate how easy it is to perform any reduction on any data type (now that's real OOP!), but it also shows how easy it is to "plug in" new reductions without modifying the rest of the code (since reductions are defined in a separate class). The output from this example looks like the following:
Thread 1 got element: three
Thread 0 got element: three
Thread 1 got element: four
Thread 0 got element: two
Thread 1 got element: two
Thread 0 got element: one
Thread 1 got element: three
Thread 0 got element: four
Thread 0 got element: four
Thread 1 got element: four
Thread 1 has finished.
Thread 0 has finished.
All worker threads have completed.
two occurred 2 times.
one occurred 1 times.
three occurred 3 times.
four occurred 4 times.

2.4  Early loop termination

An important concept in iterative computation is the break statement. However, in a parallel environment, this break statement must be avoided for the following reasons:
  1. First, from a semantics point of view, what does this mean in a parallel environment? Does it mean that only the current thread wants to cancel, or does it mean that all threads should cancel?
  2. Second, if a thread exits the loop while other threads are still executing iterations, this potentially breaks the program correctness since code following the loop will be executed while some iterations are still being computed! 
  3. Third, if a thread exits the loop without informing the Parallel Iterator, the other threads will be waiting indefinitely at the synchronization barrier since the Parallel Iterator is blocking those threads until that thread finishes (but it has already left the loop without saying goodbye!) 
Therefore, the Parallel Iterator provides a solution that addresses all the above problems. This comes in the form of the localBreak() and globalBreak() methods.

Global break
A global break is the situation where all threads should stop processing the loop. Some examples include when an item has been found in a parallel search, or the user presses the cancel button. This is achieved by one thread calling globalBreak() on the Parallel Iterator. All the threads will then receive false the next time they call hasNext(), so they all return from this last hasNext() call in a synchronised fashion. Therefore, each thread breaks out of its loop in a controlled manner at an iteration boundary:
// each thread does this
boolean itemFound = false;
while ( pi.hasNext() ) {
    itemFound = searchDocument( pi.next(), searchQuery );
    if ( itemFound )
        pi.globalBreak();
}
In this example, each thread searches the current iteration (a document) for the search query. If the query has been found, then itemFound is updated to true for the current thread. As a result, the globalBreak() is called and all the other threads will be told to stop. 

Local break
The other type of break is when one of the threads wants to stop, but all the other iterations of the loop should still be executed (by the other threads). An example of this might be to reduce disk contention, for instance if too many threads are accessing the disk at the same time. However, even though the number of threads is (potentially) reduced, it is important that the remaining iterations are still executed. Consider the following example:
while ( pi.hasNext() ) {
    processDocument( pi.next() );
    if ( tooManyThreads )
        pi.localBreak();
}
Clearly, we wish all the documents to be processed. Therefore, when a thread calls localBreak(), it releases all the elements it had reserved for itself, so that these elements may be processed by the other threads. You are probably wondering: what happens if all the threads call localBreak()? Won't all the elements be released and never executed? No: localBreak() guarantees that at least one thread is left to process the remaining elements (even if all threads called localBreak()). This one thread (the last to attempt localBreak()) keeps processing elements until they are all complete. In other words, an attempt to localBreak() only succeeds if the Parallel Iterator can guarantee that the released elements will be processed by another thread; otherwise the attempt fails (localBreak() returns a boolean denoting whether the thread succeeded in breaking or not).
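The "at least one thread remains" guarantee can be sketched with a simple atomic counter. This is an illustration only (with hypothetical names); the real localBreak() must additionally release the breaking thread's reserved elements back to the iterator:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: localBreak() fails for the last active thread, so one thread
// always remains to drain the remaining elements.
public class LocalBreakGuard {

    private final AtomicInteger activeThreads;

    public LocalBreakGuard(int threadCount) {
        activeThreads = new AtomicInteger(threadCount);
    }

    // returns true if the calling thread may leave the loop
    public boolean tryLocalBreak() {
        while (true) {
            int active = activeThreads.get();
            if (active <= 1)
                return false; // the last thread must keep iterating
            if (activeThreads.compareAndSet(active, active - 1))
                return true;  // successfully deregistered this thread
        }
    }
}
```

The compare-and-set loop makes the decrement atomic, so even if several threads attempt to break simultaneously, exactly one thread is always left active.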

2.5  Exception handling

Exception handling is very important in object-oriented languages, especially Java. Therefore, the Parallel Iterator provides a helper function to support debugging exceptions that occur in a parallel loop. This is especially tricky in a parallel loop since multiple threads might each come across an exception; unlike a sequential loop, a parallel loop might therefore result in multiple exceptions! However, similar to the arguments given in the break section, it is important that a thread never prematurely leaves the parallel loop without informing the Parallel Iterator.
    Therefore, threads must always catch exceptions within the scope of the Parallel Iterator (otherwise, the Parallel Iterator will force all the other threads to wait, as discussed above). When a thread catches an exception, one of the following behaviours is possible:
  • Do nothing: the thread catches an exception but decides to ignore it, so it continues to call hasNext() and next() as usual
  • Stop locally: the thread catches an exception and determines only it should stop, so it calls localBreak()  
  • Stop globally: the thread catches an exception and determines that all threads should stop, so it calls globalBreak()  
The register() helper function
In order to handle exceptions in parallel, the programmer would need to manually implement logic to record the exception, the iteration in which it occurred, and the thread that encountered it, and then notify the other threads. Managing all this manually is difficult for the programmer. The Parallel Iterator provides a helper function to conveniently record this information. The programmer is only required to catch exceptions (standard procedure, as if using a sequential iterator). When an exception is caught, register(Exception) is invoked, which records the following information (storing it within a ParIteratorException):
  • The Exception encountered
  • The Thread that encountered the exception (for potential debugging purposes)
  • The iteration in which the exception occurred (determined using hasNext() as iteration boundary)
Below is example code that shows how the different behaviours may be achieved:
// each thread does this
while ( pi.hasNext() ) {
    try {
        ...
        // Exception thrown
        ...
    } catch (FileNotFoundException fe) {
        // no thread will stop, just record exception
        pi.register(fe);
    } catch (TooManyThreadsException te) {
        pi.register(te);  // disk contention, reduce thread count
        pi.localBreak();  // current thread will stop
    } catch (DiskFullException de) {
        pi.register(de);   // full disk, might need to clean up
        pi.globalBreak();  // all other iterations should stop
    }
}

// finally, when all threads complete
ParIteratorException[] piExceptions = pi.getExceptions();

for (int i = 0; i < piExceptions.length; i++) {
    ParIteratorException pie = piExceptions[i];

    // object for iteration in which exception was encountered
    Object iteration = pie.getIteration();

    // thread executing that iteration
    Thread thread = pie.getThread();

    // actual exception thrown
    Exception e = pie.getException();

    ...

    // print exact location of exception
    e.printStackTrace();

    ...
}
This example shows the different behaviours possible, depending on the exception encountered. In some cases, we only wish to record the exception and then continue processing the other elements; in other cases, we may wish to cancel either the current thread or all threads. Even if the programmer does not wish to use the register() method provided, they must at least catch the exceptions so that the thread does not terminate before it has a chance to inform the Parallel Iterator (otherwise the other threads will continue waiting for it to complete).