Advanced Multi-Physics (AMP)
On-Line Documentation
ThreadPool.h
Go to the documentation of this file.
1// Copyright 2004 Mark Berrill. All Rights Reserved. This work is distributed with permission,
2// but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
3// PARTICULAR PURPOSE.
4#ifndef included_AMP_ThreadPool
5#define included_AMP_ThreadPool
6
7#include <atomic>
8#include <condition_variable>
9#include <cstring>
10#include <functional>
11#include <mutex>
12#include <stdexcept>
13#include <string>
14#include <thread>
15#include <typeinfo>
16#include <vector>
17
18#include "AMP/utils/threadpool/ThreadPoolId.h"
19#include "AMP/utils/threadpool/ThreadPoolQueue.h"
20#include "AMP/utils/threadpool/ThreadPoolWorkItem.h"
21
22
23namespace AMP {
24// clang-format off
25
26
48class alignas(16) ThreadPool final // Thread pool: work items are queued with add_work() and executed by the pool's worker threads
49{
50public:
52 constexpr static uint16_t MAX_THREADS = 128; // The maximum number of threads (must be a multiple of 64, since d_active/d_cancel hold MAX_THREADS/64 64-bit words)
53
54public:
56
57
60
61
69 template <typename return_type> // WorkItemRet: base for work items that produce a value of type return_type
71 {
72 public:
74 virtual void run() override = 0; // Implemented by the derived class; NOTE(review): presumably stores its result in d_result - confirm
76 virtual bool has_result() const override final { return !std::is_same<return_type,void>::value; } // true unless return_type is void
78 return_type get_results() const { return d_result; } // Returns a copy of d_result
80 virtual ~WorkItemRet() {} // Virtual destructor
81 protected:
82 return_type d_result; // The stored result (value-initialized by the constructor)
83 protected:
84 inline WorkItemRet() : d_result( return_type() ) { } // Protected: only derived classes can construct; d_result is value-initialized
85 private:
86 WorkItemRet( const WorkItemRet & ); // Private copy constructor (prevents copying; NOTE(review): '= delete' would express this directly)
87 WorkItemRet &operator=( const WorkItemRet & ); // Private assignment operator (prevents assignment; NOTE(review): could be '= delete')
88 };
89
90
91public:
93
103 ThreadPool( const int N = 0, const std::string &affinity = "none",
104 const std::vector<int> &procs = std::vector<int>(), int queueSize = 4096 );
105
106
108 ThreadPool( const ThreadPool & ) = delete;
109
110
112 ThreadPool &operator=( const ThreadPool & ) = delete;
113
114
117
118
121
122
125
126
128 static std::vector<int> getProcessAffinity();
129
130
132 static void setProcessAffinity( const std::vector<int>& procs );
133
134
136 static std::vector<int> getThreadAffinity();
137
138
143 std::vector<int> getThreadAffinity( int thread ) const;
144
145
150 static void setThreadAffinity( const std::vector<int>& procs );
151
152
158 void setThreadAffinity( int thread, const std::vector<int>& procs ) const;
159
160
162 inline int getNumThreads() const { return d_N_threads; }
163
164
179 void setNumThreads( const int N, const std::string &affinity = "none",
180 const std::vector<int> &procs = std::vector<int>() );
181
182
190 inline void setMaxWaitTimeDebug( const int time ) { d_max_wait_time = time; }
191
192
198 int getThreadNumber() const;
199
200
202
207 inline bool isValid( const ThreadPoolID &id ) const;
208
209
215 inline bool isFinished( const ThreadPoolID& id ) const { return id.finished(); }
216
217
224 template <class return_type>
225 static inline return_type getFunctionRet( const ThreadPoolID &id );
226
227
234 template<class Ret, class... Args1, class... Args2>
235 static inline WorkItem* createWork( std::function<Ret(Args1...)> routine, std::tuple<Args2...> &&args );
236
237
244 template<class Ret, class... Args1, class... Args2>
245 static inline WorkItem* createWork( Ret( *routine )( Args1... ), std::tuple<Args2...> &&args );
246
247
254 template<class Ret, class... Args1, class... Args2>
255 static inline WorkItem* createWork( std::function<Ret(Args1...)> routine, Args2... args );
256
257
264 template <class Ret, class... Args1, class... Args2>
265 static inline WorkItem* createWork( Ret( *routine )( Args1... ), Args2... args );
266
267
277 inline ThreadPoolID add_work( ThreadPool::WorkItem *work, int priority = 0 );
278
279
289 inline std::vector<ThreadPoolID> add_work( const std::vector<ThreadPool::WorkItem *> &work,
290 const std::vector<int> &priority = std::vector<int>() );
291
292
300 inline void wait( const ThreadPoolID &id ) const;
301
302
311 inline size_t wait_any( const std::vector<ThreadPoolID> &ids ) const;
312
313
321 inline void wait_all( const std::vector<ThreadPoolID> &ids ) const;
322
323
335 inline std::vector<int> wait_some( int N_wait, const std::vector<ThreadPoolID> &ids, int max_wait = 10000000 ) const;
336
337
344 void wait_pool_finished() const;
345
346
356 static bool is_valid( const ThreadPool *tpool );
357
358
373 static void set_OS_warnings( int behavior = 0 );
374
375
377 int N_queued( ) const { return d_queue.size(); } // NOTE(review): d_queue.size() returns size_t, which is narrowed to int here
378
379
381 void setErrorHandler( std::function<void(const std::string&)> fun );
382
383
384public: // Static interface
385
392 static inline int numThreads( const ThreadPool* tpool ) { return tpool ? tpool->getNumThreads() : 0; } // Returns 0 when tpool is null
393
403 static inline ThreadPoolID add_work( ThreadPool* tpool, ThreadPool::WorkItem *work, int priority = 0 );
404
405
415 static inline std::vector<ThreadPoolID> add_work( ThreadPool* tpool, const std::vector<ThreadPool::WorkItem *> &work,
416 const std::vector<int> &priority = std::vector<int>() );
417
418
427 static inline void wait_all( const ThreadPool* tpool, const std::vector<ThreadPoolID> &ids );
428
429
437 static inline void wait_pool_finished( const ThreadPool* tpool ) { if ( tpool ) { tpool->wait_pool_finished(); } } // No-op when tpool is null
438
439
440private:
442
443 // Implementation of condition_variable which does not require the caller to hold a lock: wait()/wait_for() lock d_mutex internally and notify_one()/notify_all() do not lock it, so a notification sent while no thread is waiting is not remembered
445 {
446 public:
449 inline void wait() const { std::unique_lock<std::mutex> lock(d_mutex); d_cv.wait(lock); } // May also return on a spurious wakeup (no predicate is checked)
450 inline void wait_for( double seconds ) const // Waits at most 'seconds' (or until notified / a spurious wakeup); the time is truncated toward zero in the chosen unit
451 {
452 std::unique_lock<std::mutex> lock(d_mutex); // NOTE(review): std::chrono below relies on <chrono> arriving transitively; this header does not include it directly
453 if ( seconds < 4e-6 ) // Pick the unit by magnitude: ns below 4 us, us below 4 ms, ms below 4 s, otherwise whole s
454 d_cv.wait_for(lock,std::chrono::nanoseconds(static_cast<int>(1e9*seconds)));
455 else if ( seconds < 4e-3 )
456 d_cv.wait_for(lock,std::chrono::microseconds(static_cast<int>(1e6*seconds)));
457 else if ( seconds < 4 )
458 d_cv.wait_for(lock,std::chrono::milliseconds(static_cast<int>(1e3*seconds)));
459 else
460 d_cv.wait_for(lock,std::chrono::seconds(static_cast<int>(seconds))); // NOTE(review): fractional seconds are dropped, and values beyond INT_MAX overflow the cast (undefined behavior)
461 }
462 inline void notify_one() const { d_cv.notify_one(); } // Does not lock d_mutex
463 inline void notify_all() const { d_cv.notify_all(); } // Does not lock d_mutex
464 private:
465 mutable std::condition_variable d_cv;
466 mutable std::mutex d_mutex;
467 };
468
469
470private:
472
473 // Function to check the startup
475
476 // Function to add an array of work items
478 size_t N, ThreadPool::WorkItem *work[], const int *priority, ThreadPoolID *id );
479
480 // Function to get a work item that has finished (returns nullptr if it has not finished yet)
481 static inline WorkItem *getFinishedWorkItem( const ThreadPoolID& id )
482 {
483 return id.finished() ? id.getWork():nullptr;
484 }
485
486 // Static wrapper (needed for the threads): forwards to tpool->tpool_thread( id )
487 static inline void create_new_thread( ThreadPool *tpool, int id )
488 {
489 tpool->tpool_thread( id );
490 }
491
492 /* This is the function that controls the individual thread and allows it to do work.
493 * Note: this version uses a last in - first out work scheduling.
494 * param id - Thread index forwarded by create_new_thread (presumably this thread's number within the pool) */
495 void tpool_thread( int id );
496
497 // Function to check if the current thread is a member of the thread pool (i.e. getThreadNumber() >= 0)
498 inline bool isMemberThread() const { return getThreadNumber()>=0; }
499
500 // Function to wait for some work items to finish
501 std::vector<bool> wait_some( size_t N_work, const ThreadPoolID *ids, size_t N_wait, int max_wait ) const;
502
503 // Function to wait for N work items to finish (may return early)
504 void wait_N( int N, double time ) const;
505
506 // Check if we are waiting too long and print debug info
507 void print_wait_warning( ) const;
508
509
510private:
512
513 // Typedefs
514 typedef volatile std::atomic_uint32_t vint32_t; // volatile atomic int (NOTE(review): volatile adds nothing to std::atomic for inter-thread synchronization)
515 typedef volatile std::atomic_uint64_t vint64_t; // volatile atomic int64 (NOTE(review): volatile adds nothing to std::atomic for inter-thread synchronization)
516 typedef condition_variable cond_t; // condition variable
517
518 // Internal data
519 uint32_t d_NULL_HEAD; // Null data buffer to check memory bounds
520 volatile mutable bool d_signal_empty; // Do we want to send a signal when the queue is empty (NOTE(review): a volatile bool is not atomic; confirm every cross-thread access is otherwise synchronized)
521 uint16_t d_N_threads; // Number of threads
522 int d_max_wait_time; // The maximum time waiting before printing a warning message
523 mutable vint32_t d_signal_count; // Signal count
524 vint32_t d_num_active; // Number of threads that are currently active
525 vint64_t d_id_assign; // An internal variable used to store the current id to assign
526 vint64_t d_active[MAX_THREADS/64]; // Which threads are currently active (presumably one bit per thread, hence MAX_THREADS/64 words)
527 vint64_t d_cancel[MAX_THREADS/64]; // Which threads should be deleted (presumably one bit per thread, hence MAX_THREADS/64 words)
528 vint64_t d_N_added; // Number of items added to the work queue
529 vint64_t d_N_started; // Number of items started
530 vint64_t d_N_finished; // Number of items finished
531 mutable cond_t d_wait_finished; // Condition variable to signal when work is finished
532 mutable cond_t d_wait_work; // Condition variable to signal when there is new work
533 ThreadPoolListQueue d_queue; // The work queue
534 std::function<void(const std::string&)> d_errorHandler; // Error handler
535 std::thread *d_thread; // Handles to the threads
536 uint32_t d_NULL_TAIL; // Null data buffer to check memory bounds
537};
538
539
540} // AMP namespace
541
542
543#include "AMP/utils/threadpool/ThreadPool.hpp"
544
545
546// clang-format on
547#endif
This is a class to hold the work item ID.
Class to store the queue for the ThreadPool using a thread-safe list.
size_t size() const
The number of items that are in the queue.
Base class for the work item (users should derive from ThreadPool::WorkItemRet)
Class to define a work item returning a variable.
Definition ThreadPool.h:71
virtual bool has_result() const override final
Whether the routine returns a result.
Definition ThreadPool.h:76
WorkItemRet(const WorkItemRet &)
virtual void run() override=0
Run the work item.
return_type get_results() const
Return the results.
Definition ThreadPool.h:78
WorkItemRet & operator=(const WorkItemRet &)
virtual ~WorkItemRet()
Virtual destructor.
Definition ThreadPool.h:80
void wait_for(double seconds) const
Definition ThreadPool.h:450
std::condition_variable d_cv
Definition ThreadPool.h:465
This is a concrete class that provides for a basic thread pool.
Definition ThreadPool.h:49
int N_queued() const
Return the number of items queued.
Definition ThreadPool.h:377
static void wait_all(const ThreadPool *tpool, const std::vector< ThreadPoolID > &ids)
Function to wait until all of the given work items have finished their work.
volatile bool d_signal_empty
Definition ThreadPool.h:520
void setNumThreads(const int N, const std::string &affinity="none", const std::vector< int > &procs=std::vector< int >())
Function to set the number of threads in the thread pool.
static std::vector< int > getThreadAffinity()
Function to return the affinity of the current thread.
vint64_t d_N_started
Definition ThreadPool.h:529
static void wait_pool_finished(const ThreadPool *tpool)
Function to wait until all work items in the thread pool have finished their work.
Definition ThreadPool.h:437
static return_type getFunctionRet(const ThreadPoolID &id)
Function to get the returned function value.
bool isFinished(const ThreadPoolID &id) const
Function to check if the work item has finished processing.
Definition ThreadPool.h:215
static ThreadPoolID add_work(ThreadPool *tpool, ThreadPool::WorkItem *work, int priority=0)
Function to add a work item.
uint16_t d_N_threads
Definition ThreadPool.h:521
static int getNumberOfProcessors()
Function to return the number of processors available.
volatile std::atomic_uint32_t vint32_t
Definition ThreadPool.h:514
vint64_t d_N_added
Definition ThreadPool.h:528
std::thread * d_thread
Definition ThreadPool.h:535
void wait(const ThreadPoolID &id) const
Function to wait until a specific work item has finished.
vint64_t d_cancel[MAX_THREADS/64]
Definition ThreadPool.h:527
void setThreadAffinity(int thread, const std::vector< int > &procs) const
static void create_new_thread(ThreadPool *tpool, int id)
Definition ThreadPool.h:487
static std::vector< int > getProcessAffinity()
Function to return the affinity of the current process.
void add_work(size_t N, ThreadPool::WorkItem *work[], const int *priority, ThreadPoolID *id)
void tpool_thread(int id)
uint32_t d_NULL_HEAD
Definition ThreadPool.h:519
static WorkItem * createWork(std::function< Ret(Args1...)> routine, Args2... args)
Function to create a work item.
ThreadPool(const int N=0, const std::string &affinity="none", const std::vector< int > &procs=std::vector< int >(), int queueSize=4096)
~ThreadPool()
Destructor.
static void setThreadAffinity(const std::vector< int > &procs)
vint32_t d_num_active
Definition ThreadPool.h:524
ThreadPool(const ThreadPool &)=delete
Copy constructor (deleted).
static WorkItem * createWork(Ret(*routine)(Args1...), Args2... args)
Function to create a work item.
static std::vector< ThreadPoolID > add_work(ThreadPool *tpool, const std::vector< ThreadPool::WorkItem * > &work, const std::vector< int > &priority=std::vector< int >())
Function to add multiple work items.
static constexpr uint16_t MAX_THREADS
Definition ThreadPool.h:52
uint32_t d_NULL_TAIL
Definition ThreadPool.h:536
ThreadPoolWorkItem WorkItem
Base class for the work item (users should derive from WorkItemRet)
Definition ThreadPool.h:59
size_t wait_any(const std::vector< ThreadPoolID > &ids) const
Function to wait until any of the given work items have finished their work.
void wait_N(int N, double time) const
ThreadPoolListQueue d_queue
Definition ThreadPool.h:533
static int getCurrentProcessor()
Function to return the processor number that the current thread is running on.
void wait_all(const std::vector< ThreadPoolID > &ids) const
Function to wait until all of the given work items have finished their work.
std::vector< int > wait_some(int N_wait, const std::vector< ThreadPoolID > &ids, int max_wait=10000000) const
Function to wait until some of the given work items have finished their work.
bool isMemberThread() const
Definition ThreadPool.h:498
vint64_t d_N_finished
Definition ThreadPool.h:530
condition_variable cond_t
Definition ThreadPool.h:516
ThreadPool & operator=(const ThreadPool &)=delete
Assignment operator (deleted).
static void setProcessAffinity(const std::vector< int > &procs)
Function to set the affinity of the current process.
volatile std::atomic_uint64_t vint64_t
Definition ThreadPool.h:515
std::vector< ThreadPoolID > add_work(const std::vector< ThreadPool::WorkItem * > &work, const std::vector< int > &priority=std::vector< int >())
Function to add multiple work items.
ThreadPoolID add_work(ThreadPool::WorkItem *work, int priority=0)
Function to add a work item.
int getThreadNumber() const
Function to return the current thread number.
void setMaxWaitTimeDebug(const int time)
Function to set the maximum wait time.
Definition ThreadPool.h:190
vint64_t d_active[MAX_THREADS/64]
Definition ThreadPool.h:526
cond_t d_wait_finished
Definition ThreadPool.h:531
vint32_t d_signal_count
Definition ThreadPool.h:523
void print_wait_warning() const
std::vector< bool > wait_some(size_t N_work, const ThreadPoolID *ids, size_t N_wait, int max_wait) const
vint64_t d_id_assign
Definition ThreadPool.h:525
void check_startup()
static WorkItem * createWork(std::function< Ret(Args1...)> routine, std::tuple< Args2... > &&args)
Function to create a work item.
bool isValid(const ThreadPoolID &id) const
Function to check if the work item is valid.
std::vector< int > getThreadAffinity(int thread) const
static bool is_valid(const ThreadPool *tpool)
Function to check if the thread pool is valid.
static WorkItem * createWork(Ret(*routine)(Args1...), std::tuple< Args2... > &&args)
Function to create a work item.
void wait_pool_finished() const
Function to wait until all work items in the thread pool have finished their work.
void setErrorHandler(std::function< void(const std::string &)> fun)
Set the error handler for threads.
static void set_OS_warnings(int behavior=0)
Function to enable/disable OS warning messages.
std::function< void(const std::string &)> d_errorHandler
Definition ThreadPool.h:534
int getNumThreads() const
Function to return the number of threads in the thread pool.
Definition ThreadPool.h:162
static WorkItem * getFinishedWorkItem(const ThreadPoolID &id)
Definition ThreadPool.h:481
static int numThreads(const ThreadPool *tpool)
Function to return the number of work threads.
Definition ThreadPool.h:392



Advanced Multi-Physics (AMP)
Oak Ridge National Laboratory
Idaho National Laboratory
Los Alamos National Laboratory
This page was automatically produced from the
source code by doxygen
Last updated: Tue Mar 10 2026 13:06:41.
Comments on this page