/************************************************************************

MODULE: BasicThreadPool

SUMMARY:

A simple thread pool class BasicThreadPool, as well as some higher-level macros
which facilitate simple parallel for loops.

***************************************************************************/


// ********************** Simple parallel for loops **************************
//
// We begin with a description of the higher-level macros for writing simple
// parallel for loops. These facilities are activated only when NTL is
// configured with NTL_THREAD_BOOST=on (which implies NTL_THREADS=on).
// However, code that uses these facilities should still compile and run
// correctly even when NTL_THREAD_BOOST=off, or even when NTL_THREADS=off, so
// this is the simplest way to write parallel for loops across a range of
// compile-time and run-time environments. Note that if NTL_THREADS=on, C++11
// features are required, but when NTL_THREADS=off, these features are not
// required, so the code should compile on older C++ compilers.
//
// Here is a simple recipe for writing a parallel for loop.
//
// At the start of program execution, your program should execute

   SetNumThreads(nt);

// You can choose nt to be any positive integer, but for best results, it
// should correspond to the number of available cores on your machine.
// [NOTE: if NTL_THREAD_BOOST=off, this function is still defined, but does
// nothing.]
//
// Now consider the following routine:

   void mul(ZZ *x, const ZZ *a, const ZZ *b, long n)
   {
      for (long i = 0; i < n; i++)
         mul(x[i], a[i], b[i]);
   }

// We can parallelize it as follows:

   void mul(ZZ *x, const ZZ *a, const ZZ *b, long n)
   {
      NTL_EXEC_RANGE(n, first, last)

         for (long i = first; i < last; i++)
            mul(x[i], a[i], b[i]);

      NTL_EXEC_RANGE_END
   }

// NTL_EXEC_RANGE and NTL_EXEC_RANGE_END are macros that just "do the right
// thing".
// If there are nt threads available, the interval [0..n) will be
// partitioned into (up to) nt subintervals, and a different thread will be
// used to process each subinterval. You still have to write the for loop
// yourself: the macro just declares and initializes variables "first" and
// "last" (or whatever you want to call them) of type long that represent the
// subinterval [first..last) to be processed by one thread.
//
// Note that the current thread participates as one of the nt available
// threads, and that the current thread will wait for all other participating
// threads to finish their task before proceeding. The current thread can be
// identified as the one with first == 0.
//
// Within the "body" of this construct, you can freely reference any variables
// that are visible at this point. This is implemented using the C++ lambda
// feature (capturing all variables by reference).
//
// This construct will still work even if threads are disabled, in which case
// it runs single-threaded with first=0 and last=n.
//
// Note that the code within the EXEC_RANGE body could call other routines that
// themselves attempt to execute an EXEC_RANGE: if this happens, the latter
// EXEC_RANGE will detect this and run single-threaded.
//
// You may wish to do other things within the EXEC_RANGE body than just execute
// a loop. One thing you may want to do is to declare variables. Another
// thing you may want to do is set up a local context for a ZZ_p modulus (or
// other type of modulus). Here is an example of doing this:

   void mul(ZZ_p *x, const ZZ_p *a, const ZZ_p *b, long n)
   {
      ZZ_pContext context;
      context.save();

      NTL_EXEC_RANGE(n, first, last)

         context.restore();   // install the caller's modulus in this thread

         for (long i = first; i < last; i++)
            mul(x[i], a[i], b[i]);

      NTL_EXEC_RANGE_END
   }

// Another useful function is AvailableThreads(), which will return the number
// of available threads. If threads or thread boosting is not enabled, this
// will return 1.
// Even if thread boosting is enabled, this may return 1 if for
// whatever reason the thread pool is not available for use (for example,
// SetNumThreads was never called, or the thread pool is already active).
//
// A lower-level set of tools is available, which allows you to simply run a
// specified number of threads. Assuming nt <= AvailableThreads(), the code

   NTL_EXEC_INDEX(nt, index)

      ... code ...

   NTL_EXEC_INDEX_END

// will execute the body on nt different threads, each with a unique index in
// the range [0..nt). A variable named "index" (or whatever name you specify)
// of type long will hold the given index. Just as with EXEC_RANGE, the current
// thread will participate as one of the nt threads, and will always be
// assigned an index of 0.
//
// This tool is useful if you need to manage memory a bit more carefully. For
// example, the following code will compute an inner product using all
// available threads:

   ZZ InnerProd(const ZZ *a, const ZZ *b, long n)
   {
      PartitionInfo pinfo(n);

      long cnt = pinfo.NumIntervals();

      Vec<ZZ> acc;
      acc.SetLength(cnt);

      NTL_EXEC_INDEX(cnt, index)

         long first, last;
         pinfo.interval(first, last, index);

         ZZ& sum = acc[index];
         sum = 0;
         for (long i = first; i < last; i++)
            MulAddTo(sum, a[i], b[i]);

      NTL_EXEC_INDEX_END

      ZZ sum;
      sum = 0;
      for (long i = 0; i < cnt; i++)
         sum += acc[i];

      return sum;
   }

// This example also illustrates the class PartitionInfo, which is useful for
// partitioning a large interval into smaller intervals (it is used internally
// by EXEC_RANGE). The constructor takes the size of the interval (in this
// example n), plus an optional maximum number of subintervals that defaults
// to AvailableThreads(), and computes a partition of [0..n) into nearly
// equally sized subintervals. The method NumIntervals() returns the number of
// subintervals, and the method interval(first, last, index) sets first and
// last according to the endpoints of the subinterval [first..last) with the
// given index.
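//
// To make the per-index accumulation idea concrete without requiring NTL, the
// following sketch mimics the shape of the InnerProd example using only the
// C++11 standard library. The names exec_index_sketch and inner_prod_sketch
// are hypothetical. Unlike NTL's pool, this sketch spawns fresh std::threads
// on every call, sums plain long values instead of ZZ, uses the simple
// partition index*n/cnt (an assumption, not necessarily PartitionInfo's), and
// omits exception handling.

```cpp
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical stand-in for NTL_EXEC_INDEX: runs fct(index) for
// index = 0..cnt-1, with index 0 on the calling thread and the others on
// newly created std::threads (NTL reuses pooled threads instead).
template <class Fct>
void exec_index_sketch(long cnt, const Fct& fct)
{
   assert(cnt >= 1);
   std::vector<std::thread> workers;
   for (long index = 1; index < cnt; index++)
      workers.emplace_back([&fct, index] { fct(index); });
   fct(0);                              // current thread takes index 0
   for (auto& t : workers) t.join();    // wait for all other indices
}

// Inner product of long values: each index owns one slot of acc, so no
// locking is needed; the slots are summed once all threads have finished.
long inner_prod_sketch(const long *a, const long *b, long n, long cnt)
{
   std::vector<long> acc(cnt, 0);

   exec_index_sketch(cnt, [&](long index) {
      long first = index * n / cnt;     // nearly equal subintervals
      long last = (index + 1) * n / cnt;
      long& sum = acc[index];
      for (long i = first; i < last; i++)
         sum += a[i] * b[i];
   });

   long sum = 0;
   for (long i = 0; i < cnt; i++)
      sum += acc[i];
   return sum;
}
```

// Giving each index its own slot trades a little memory for the absence of
// any synchronization inside the loop; the final serial sum touches only cnt
// values.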
//
// So in the InnerProd example above, cnt threads will run, each accumulating
// a sum into a corresponding element of the vector acc, and afterwards, these
// elements are summed.
//
// Note that if threads are not enabled or otherwise unavailable, the above
// code will compile and run correctly (just using one thread).
//
// Finally, there is a "guarded" version of NTL_EXEC_RANGE called
// NTL_GEXEC_RANGE. This allows one to dynamically "guard" against parallel
// execution. For example, on very small problems the runtime overhead of a
// parallel for loop may not be worthwhile, or in other situations parallel
// execution could cause incorrect behavior. See below for details.


// ********************** Other useful patterns *************************
//
// You can use these tools together with some other elements of the
// C++11 standard library to implement some other useful patterns.
//
// ****** Accumulation:
//
// Consider again the example above of computing an inner product.
// This can be implemented more easily as follows:

   ZZ InnerProd(const ZZ *a, const ZZ *b, long n)
   {
      ZZ sum;
      sum = 0;

      std::mutex sum_mutex;

      NTL_EXEC_RANGE(n, first, last)

         ZZ acc;
         acc = 0;
         for (long i = first; i < last; i++)
            MulAddTo(acc, a[i], b[i]);

         std::lock_guard<std::mutex> guard(sum_mutex);
         sum += acc;

      NTL_EXEC_RANGE_END

      return sum;
   }

// There is some extra run-time overhead, but in many cases this is negligible.
// Also, unlike the code above for computing an inner product, this code will
// require C++11 (and the <mutex> header).
//
// ****** Dynamic scheduling:
//
// Suppose you need to perform parallel tasks i = 0..n-1, but the tasks may
// vary greatly in their run time. A more efficient approach than a simple
// NTL_EXEC_RANGE is as follows (this requires the <atomic> header):

   long nt = AvailableThreads();
   std::atomic<long> counter(n);

   NTL_EXEC_INDEX(nt, index)

      long i;
      while ((i = --counter) >= 0) {
         // perform task i
         ...
      }

   NTL_EXEC_INDEX_END

// Each time the body of the loop is executed, a different task i = 0..n-1 is
// performed. Since --counter is a single atomic operation, no two threads can
// obtain the same value of i, and a thread that finishes a short task simply
// claims the next one, so the load balances itself.
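//
// The two patterns above can be combined and tried out without NTL. In the
// following sketch, plain std::threads stand in for NTL_EXEC_INDEX, each
// thread claims tasks from a shared std::atomic<long> counter, and each thread
// adds its partial result to a mutex-protected total exactly once. The name
// dynamic_sum_sketch and its signature are hypothetical, chosen only for
// illustration.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical sketch: dynamic scheduling plus mutex-protected accumulation.
// task(i) is called exactly once for each i in [0..n), and the returned
// values are summed. std::threads stand in for NTL_EXEC_INDEX.
template <class Task>
long dynamic_sum_sketch(long n, long nt, const Task& task)
{
   assert(nt >= 1);
   std::atomic<long> counter(n);
   long total = 0;
   std::mutex total_mutex;

   auto body = [&]() {
      long local = 0;
      long i;
      // --counter is one atomic step, so every thread gets a distinct i;
      // tasks are handed out in the order n-1, n-2, ..., 0
      while ((i = --counter) >= 0)
         local += task(i);

      std::lock_guard<std::mutex> guard(total_mutex);
      total += local;               // a single locked update per thread
   };

   std::vector<std::thread> workers;
   for (long index = 1; index < nt; index++)
      workers.emplace_back(body);   // threads playing indices 1..nt-1
   body();                          // the current thread acts as index 0
   for (auto& t : workers) t.join();
   return total;
}
```

// Accumulating into a thread-local variable first keeps the mutex out of the
// inner loop, which is what keeps the locking overhead negligible.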


// ********************** Simple parallel divide and conquer ************
//
// Some tools are provided for parallelizing simple divide and conquer
// algorithms. The interface for this set of tools is still experimental
// and subject to change (but hopefully not).
//
// Suppose you have a recursive divide and conquer algorithm:

   void rec_alg(...)
   {
      ...

      // two recursive invocations that can run in parallel
      rec_alg(...call 0...);
      rec_alg(...call 1...);

      ...
   }

// and this algorithm is invoked initially as

   void alg(...)
   {
      rec_alg(...initial call...);
   }

// Then the following recipe will parallelize it:

   void rec_alg(..., RecursiveThreadPool *pool)
   {
      ...

      NTL_EXEC_DIVIDE(seq, pool, helper, load,
         rec_alg(...call 0..., helper.subpool(0)),
         rec_alg(...call 1..., helper.subpool(1)) )

      ...
   }

   void alg(...)
   {
      rec_alg(...initial call..., NTL_INIT_DIVIDE);
   }

// Here, seq is a boolean "guard" value: if true, the two recursive calls will
// run sequentially, as usual. Also, load is a floating point number, which
// represents the fraction of threads that should be assigned to call 0.
// If the load should be equally balanced, then load=0.5 is a good choice.
// The name helper is the name of an auxiliary object, which will be in the
// scope of the two recursive calls. It supplies a method helper.concurrent(),
// which returns true if the two calls are being run concurrently, and false
// otherwise.
//
// If thread-boosting is not enabled, these macros will revert to the normal
// sequential execution, with no overhead. If thread-boosting is enabled, then
// the two calls may or may not run concurrently, depending on a number of
// factors. If they do run concurrently, call 0 will run on the current
// thread, while call 1 will run on another thread; in addition, when the
// current thread finishes execution of call 0, it will wait for the execution
// of call 1 on the other thread to finish before continuing any further.
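//
// For a concrete feel for this control flow, here is a self-contained merge
// sort with the same shape, written with std::thread rather than NTL's
// RecursiveThreadPool. The integer thread "budget" (standing in for the
// pool), the 1024-element cutoff used as the seq guard, and the names
// rec_sort and parallel_sort are all assumptions for illustration; NTL's
// macros decide concurrency according to their own criteria.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical sketch of the NTL_EXEC_DIVIDE pattern. "budget" is the number
// of threads this call may use; it is split between the two recursive calls
// according to load. (Exception handling is omitted.)
void rec_sort(std::vector<long>& v, std::size_t lo, std::size_t hi, long budget)
{
   assert(lo <= hi && hi <= v.size());
   if (hi - lo < 2) return;
   std::size_t mid = lo + (hi - lo) / 2;
   auto at = [&v](std::size_t k) {
      return v.begin() + static_cast<std::ptrdiff_t>(k);
   };

   const double load = 0.5;                  // share of threads for call 0
   bool seq = budget < 2 || hi - lo < 1024;  // guard: too small or no threads

   if (seq) {
      rec_sort(v, lo, mid, 1);               // call 0
      rec_sort(v, mid, hi, 1);               // call 1
   }
   else {
      long budget0 = std::max(1L, static_cast<long>(load * budget));
      long budget1 = std::max(1L, budget - budget0);
      // call 1 on another thread; the halves are disjoint, so no locking
      std::thread other([&v, mid, hi, budget1] { rec_sort(v, mid, hi, budget1); });
      rec_sort(v, lo, mid, budget0);         // call 0 on the current thread
      other.join();                          // wait for call 1 to finish
   }
   std::inplace_merge(at(lo), at(mid), at(hi));
}

// Initial call, analogous to passing NTL_INIT_DIVIDE.
void parallel_sort(std::vector<long>& v, long nthreads)
{
   rec_sort(v, 0, v.size(), nthreads);
}
```

// The join before the merge mirrors the rule above that the current thread
// waits for call 1 before continuing.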
//
// As one can see, the use cases here are fairly limited to those in which you
// have a recursive algorithm that calls itself exactly twice. One example of
// this is QuickSort. Another example is a recursive FFT.


// ************************** Thread Pools ******************************
//
// The above facilities are built on top of a more general thread pool class,
// which you may use for your own purposes.
//
// You create a thread pool by constructing a BasicThreadPool object. For
// example:

   long nthreads = 4;
   BasicThreadPool pool(nthreads);

// creates a thread pool of 4 threads (counting the current thread; see the
// notes below). These threads will exist until the destructor for pool is
// called.
//
// The simplest way to use a thread pool is as follows. Suppose you have a
// task that consists of sz subtasks, indexed 0..sz-1. Then you can write:

   pool.exec_range(sz,
      [&](long first, long last) {
         for (long i = first; i < last; i++) {
            ... code to process subtask i ...
         }
      }
   );

// The second argument to exec_range is a C++11 "lambda". The "[&]" indicates
// that all local variables in the calling context are captured by reference,
// so the lambda body can reference all visible local variables directly.
// C++11 provides other methods for capturing local variables. The interval
// [0..sz) is partitioned into subintervals of the form [first..last), which
// are processed by the code in the supplied lambda.
//
// A lower-level interface is also provided. One can write:

   pool.exec_index(cnt,
      [&](long index) {
         ... code to process the given index ...
      }
   );

// This will activate exactly cnt threads with indices 0..cnt-1, and execute
// the given code on each index. The parameter cnt must not exceed nthreads,
// otherwise an error is raised.


// ====================================================================
//
// NOTES:
//
// When one activates a thread pool with nthreads threads, the *current* thread
// (the one activating the pool) will also participate in the computation.
// This means that the thread pool only contains nthreads-1 other threads.
//
// If, during an activation, any thread throws an exception, it will be caught
// and rethrown in the activating thread when all the threads complete. If
// more than one thread throws an exception, the first one that is caught is
// the one that is rethrown.
//
// Methods are also provided for adding, deleting, and moving threads in and
// among thread pools.
//
// If NTL_THREADS=off, the corresponding header file may be included, but the
// BasicThreadPool class is not defined.
//
// Unlike most classes in NTL, the BasicThreadPool is not relocatable and hence
// cannot be used in a Vec. One should first wrap it in a pointer class, such
// as UniquePtr.



// class BasicThreadPool: provides basic functionality for thread pools

class BasicThreadPool {
private:

  BasicThreadPool(const BasicThreadPool&); // disabled
  void operator=(const BasicThreadPool&); // disabled

public:

  explicit
  BasicThreadPool(long nthreads);
  // creates a pool with nthreads threads, including the current thread
  // (so nthreads-1 other threads get created)

  template<class Fct>
  void exec_range(long sz, const Fct& fct);
  // activate by range (see example usage above)

  template<class Fct>
  void exec_index(long cnt, const Fct& fct);
  // activate by index (see example usage above)

  void add(long n = 1);
  // add n threads to the pool

  long NumThreads() const;
  // return number of threads (including current thread)

  void remove(long n = 1);
  // remove n threads from the pool

  void move(BasicThreadPool& other, long n = 1);
  // move n threads from other pool to this pool

  bool active() const;
  // indicates an activation is in process: invoking any of the methods
  // exec_index, exec_range, add, remove, move, or the destructor
  // while active will raise an error

  template<class Fct>
  static void relaxed_exec_range(BasicThreadPool *pool, long sz, const Fct& fct);
  // similar to pool->exec_range(sz, fct), but will still work even
  // if !pool or pool->active(), using just the current thread

  template<class Fct>
  static void relaxed_exec_index(BasicThreadPool *pool, long cnt, const Fct& fct);
  // similar to pool->exec_index(cnt, fct), but will still work even
  // if !pool or pool->active(), provided cnt <= 1, using just the current thread

};



// THREAD BOOSTING FEATURES:

void SetNumThreads(long nt);
// convenience routine to set NTL's thread pool.
// If called more than once, the old thread pool is destroyed and
// replaced by a new one.
// If NTL_THREAD_BOOST=off, then this is still defined, but does nothing.

long AvailableThreads();
// Number of threads currently available to use in NTL's thread pool. This is
// always at least 1 (for the current thread).
// If NTL_THREAD_BOOST=off, then this is still defined, and always returns 1.

BasicThreadPool *GetThreadPool();
void ResetThreadPool(BasicThreadPool *pool = 0);
BasicThreadPool *ReleaseThreadPool();
// Routines to get and set NTL's thread pool. The interfaces parallel NTL's
// UniquePtr class, and indeed, behind the scenes, NTL's thread pool is stored
// as a UniquePtr<BasicThreadPool>.
// These are only declared when NTL_THREAD_BOOST=on.


#define NTL_EXEC_RANGE(sz, first, last) ...
#define NTL_EXEC_RANGE_END ...
#define NTL_EXEC_INDEX(cnt, index) ...
#define NTL_EXEC_INDEX_END ...
// convenience macros to implement "parallel for loops" using NTL's thread
// pool. See examples above for usage. If NTL_THREAD_BOOST=off, then these
// are still defined, and code will run on a single thread.

#define NTL_GEXEC_RANGE(seq, sz, first, last) ...
#define NTL_GEXEC_RANGE_END ...
// "guarded" version of NTL_EXEC_RANGE: if seq evaluates to true, the code runs
// on a single thread. This is useful in avoiding situations where the
// overhead of a parallel loop is too high. If seq evaluates to the constant
// true, a good compiler will optimize code to run on a single thread, with no
// overhead.
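//
// As a rough, NTL-free model of a guarded range, including the exception rule
// from the notes on BasicThreadPool (the first caught exception is rethrown in
// the activating thread after all threads finish), consider the following
// sketch built on std::thread and std::exception_ptr. The name
// guarded_exec_range_sketch and the partition k*sz/nt are assumptions; NTL
// runs the body on its persistent pool rather than spawning threads, and its
// own partition may differ.

```cpp
#include <cassert>
#include <exception>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical guarded exec_range: if seq is true (or only one piece is
// possible), fct(0, sz) runs on the calling thread. Otherwise [0..sz) is
// split into up to nt nearly equal pieces, the calling thread processes
// piece 0, and the first exception captured is rethrown after all finish.
template <class Fct>
void guarded_exec_range_sketch(bool seq, long sz, long nt, const Fct& fct)
{
   assert(nt >= 1);
   if (sz <= 0) return;                 // empty range: nothing to do
   if (nt > sz) nt = sz;                // avoid empty pieces
   if (seq || nt == 1) { fct(0L, sz); return; }

   std::exception_ptr first_error;
   std::mutex error_mutex;

   auto run = [&](long k) {
      try {
         fct(k * sz / nt, (k + 1) * sz / nt);
      }
      catch (...) {
         std::lock_guard<std::mutex> guard(error_mutex);
         if (!first_error) first_error = std::current_exception();
      }
   };

   std::vector<std::thread> workers;
   for (long k = 1; k < nt; k++)
      workers.emplace_back(run, k);
   run(0);                              // current thread takes piece 0
   for (auto& t : workers) t.join();    // wait for every piece
   if (first_error) std::rethrow_exception(first_error);
}
```

// Catching inside each thread matters: an exception escaping a std::thread's
// function would call std::terminate instead of reaching the caller.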
#define NTL_IMPORT(x)
// To be used in conjunction with NTL_EXEC_RANGE and friends. When
// NTL_THREAD_BOOST=on, this will copy the variable named x from the enclosing
// scope to a local copy. This should only be used for types with cheap
// copies, such as scalars and pointers. In some situations, this allows the
// compiler to optimize a bit more aggressively (since the body would
// otherwise access x through a reference captured by the lambda). One or
// more of these may be placed right after an NTL_EXEC_RANGE.
// When NTL_THREAD_BOOST=off, this is still defined, and does nothing.


// class PartitionInfo: A helper class to facilitate partitioning an interval
// into subintervals. NOTE: this class is available, even when
// NTL_THREAD_BOOST=off.

class PartitionInfo {
public:

   explicit
   PartitionInfo(long sz, long nt = AvailableThreads());
   // partitions [0..sz) into at most nt subintervals. sz may be 0 or
   // negative, in which case the number of subintervals is 0.

   long NumIntervals() const;
   // return the number of subintervals

   void interval(long& first, long& last, long i) const;
   // [first..last) is the ith interval, where i in [0..NumIntervals()). No
   // range checking is performed.

};
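//
// The PartitionInfo interface above can be modeled in a few lines. The
// following is a hypothetical re-implementation for illustration only, not
// NTL's source: the name PartitionSketch is made up, nt is required rather
// than defaulting to AvailableThreads(), and placing the longer subintervals
// first is an assumption (NTL's own ordering may differ).

```cpp
#include <cassert>

// Hypothetical model of PartitionInfo: [0..sz) is split into min(sz, nt)
// subintervals whose lengths differ by at most one; the first (sz % count)
// of them are one element longer.
class PartitionSketch {
public:
   PartitionSketch(long sz, long nt)
   {
      assert(nt >= 1);
      if (sz <= 0) { count = 0; q = 0; r = 0; return; }  // no subintervals
      count = (nt < sz) ? nt : sz;
      q = sz / count;          // base length of each subinterval
      r = sz % count;          // number of subintervals of length q+1
   }

   long NumIntervals() const { return count; }

   // [first..last) is the i-th subinterval, 0 <= i < NumIntervals();
   // as in the declaration above, no range checking is performed
   void interval(long& first, long& last, long i) const
   {
      first = i * q + (i < r ? i : r);
      last = first + q + (i < r ? 1 : 0);
   }

private:
   long count, q, r;
};
```

// For example, PartitionSketch(10, 4) yields [0..3), [3..6), [6..8), [8..10).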