4#ifndef included_AMP_ThreadPool
5#define included_AMP_ThreadPool
8#include <condition_variable>
18#include "AMP/utils/threadpool/ThreadPoolId.h"
19#include "AMP/utils/threadpool/ThreadPoolQueue.h"
20#include "AMP/utils/threadpool/ThreadPoolWorkItem.h"
69 template <
typename return_type>
// Run the work item. Declared pure virtual (= 0), so a derived class must
// supply the implementation.
74 virtual void run()
override = 0;
// Report whether the routine returns a result: true for every return_type
// except void. Marked final, so derived classes cannot override it.
76 virtual bool has_result() const override final {
return !std::is_same<return_type,void>::value; }
103 ThreadPool(
const int N = 0,
const std::string &affinity =
"none",
104 const std::vector<int> &procs = std::vector<int>(),
int queueSize = 4096 );
180 const std::vector<int> &procs = std::vector<int>() );
224 template <
class return_type>
234 template<
class Ret,
class... Args1,
class... Args2>
235 static inline WorkItem*
createWork( std::function<Ret(Args1...)> routine, std::tuple<Args2...> &&args );
244 template<
class Ret,
class... Args1,
class... Args2>
254 template<
class Ret,
class... Args1,
class... Args2>
264 template <
class Ret,
class... Args1,
class... Args2>
// Add multiple work items; returns one ThreadPoolID vector for the submission.
//   work     - pointers to the work items to add
//   priority - optional priorities (defaults to an empty vector);
//              presumably one entry per work item - TODO confirm against the implementation
289 inline std::vector<ThreadPoolID>
add_work(
const std::vector<ThreadPool::WorkItem *> &work,
290 const std::vector<int> &priority = std::vector<int>() );
// Wait until any of the given work items has finished its work.
//   ids - ids of the work items to wait on
// NOTE(review): the size_t return value is presumably the index (within ids)
// of a finished item - confirm against the implementation.
311 inline size_t wait_any(
const std::vector<ThreadPoolID> &ids )
const;
// Wait until all of the given work items have finished their work.
//   ids - ids of the work items to wait on
321 inline void wait_all(
const std::vector<ThreadPoolID> &ids )
const;
// Wait until some of the given work items have finished their work.
//   N_wait   - presumably the number of items that must finish before
//              returning - TODO confirm
//   ids      - ids of the work items to wait on
//   max_wait - wait limit, defaults to 10000000 (units are not visible
//              here - TODO confirm)
// NOTE(review): the returned vector<int> presumably identifies the finished
// items - confirm against the implementation.
335 inline std::vector<int>
wait_some(
int N_wait,
const std::vector<ThreadPoolID> &ids,
int max_wait = 10000000 )
const;
// Static overload: add multiple work items through an explicit pool pointer.
//   tpool    - thread pool to add the work to
//   work     - pointers to the work items to add
//   priority - optional priorities (defaults to an empty vector)
// NOTE(review): behavior when tpool is null is not visible here - confirm.
415 static inline std::vector<ThreadPoolID>
add_work(
ThreadPool* tpool,
const std::vector<ThreadPool::WorkItem *> &work,
416 const std::vector<int> &priority = std::vector<int>() );
// Block the calling thread on d_cv until the condition variable is signalled.
// d_mutex is locked through a std::unique_lock for the duration of the call
// and released when the lock goes out of scope.
// NOTE(review): no predicate is passed to d_cv.wait(), so the call may also
// return on a spurious wakeup; presumably callers re-check their own
// condition after this returns - confirm.
449 inline void wait()
const { std::unique_lock<std::mutex> lock(
d_mutex);
d_cv.wait(lock); }
452 std::unique_lock<std::mutex> lock(
d_mutex);
453 if ( seconds < 4e-6 )
454 d_cv.wait_for(lock,std::chrono::nanoseconds(
static_cast<int>(1e9*seconds)));
455 else if ( seconds < 4e-3 )
456 d_cv.wait_for(lock,std::chrono::microseconds(
static_cast<int>(1e6*seconds)));
457 else if ( seconds < 4 )
458 d_cv.wait_for(lock,std::chrono::milliseconds(
static_cast<int>(1e3*seconds)));
460 d_cv.wait_for(lock,std::chrono::seconds(
static_cast<int>(seconds)));
465 mutable std::condition_variable
d_cv;
483 return id.finished() ?
id.getWork():
nullptr;
543#include "AMP/utils/threadpool/ThreadPool.hpp"
This is a class to hold the work item id.
Class to store the queue for the ThreadPool using a thread-safe list.
size_t size() const
The number of items that are in the queue.
Base class for the work item (users should derive from ThreadPool::WorkItemRet)
Class to define a work item returning a variable.
virtual bool has_result() const override final
Will the routine return a result.
WorkItemRet(const WorkItemRet &)
virtual void run() override=0
Run the work item.
return_type get_results() const
Return the results.
WorkItemRet & operator=(const WorkItemRet &)
virtual ~WorkItemRet()
Virtual destructor.
void wait_for(double seconds) const
std::condition_variable d_cv
This is a concrete class that provides for a basic thread pool.
int N_queued() const
Return the number of items queued.
static void wait_all(const ThreadPool *tpool, const std::vector< ThreadPoolID > &ids)
Function to wait until all of the given work items have finished their work.
volatile bool d_signal_empty
void setNumThreads(const int N, const std::string &affinity="none", const std::vector< int > &procs=std::vector< int >())
Function to set the number of threads in the thread pool.
static std::vector< int > getThreadAffinity()
Function to return the affinity of the current thread.
static void wait_pool_finished(const ThreadPool *tpool)
Function to wait until all work items in the thread pool have finished their work.
static return_type getFunctionRet(const ThreadPoolID &id)
Function to get the returned function value.
bool isFinished(const ThreadPoolID &id) const
Function to check if the work item has finished processing.
static ThreadPoolID add_work(ThreadPool *tpool, ThreadPool::WorkItem *work, int priority=0)
Function to add a work item.
static int getNumberOfProcessors()
Function to return the number of processors available.
volatile std::atomic_uint32_t vint32_t
void wait(const ThreadPoolID &id) const
Function to wait until a specific work item has finished.
vint64_t d_cancel[MAX_THREADS/64]
void setThreadAffinity(int thread, const std::vector< int > &procs) const
static void create_new_thread(ThreadPool *tpool, int id)
static std::vector< int > getProcessAffinity()
Function to return the affinity of the current process.
void add_work(size_t N, ThreadPool::WorkItem *work[], const int *priority, ThreadPoolID *id)
void tpool_thread(int id)
static WorkItem * createWork(std::function< Ret(Args1...)> routine, Args2... args)
Function to create a work item.
ThreadPool(const int N=0, const std::string &affinity="none", const std::vector< int > &procs=std::vector< int >(), int queueSize=4096)
static void setThreadAffinity(const std::vector< int > &procs)
ThreadPool(const ThreadPool &)=delete
Copy constructor.
static WorkItem * createWork(Ret(*routine)(Args1...), Args2... args)
Function to create a work item.
static std::vector< ThreadPoolID > add_work(ThreadPool *tpool, const std::vector< ThreadPool::WorkItem * > &work, const std::vector< int > &priority=std::vector< int >())
Function to add multiple work items.
static constexpr uint16_t MAX_THREADS
ThreadPoolWorkItem WorkItem
Base class for the work item (users should derive from WorkItemRet)
size_t wait_any(const std::vector< ThreadPoolID > &ids) const
Function to wait until any of the given work items have finished their work.
void wait_N(int N, double time) const
ThreadPoolListQueue d_queue
static int getCurrentProcessor()
Function to return the processor number that the current thread is running on.
void wait_all(const std::vector< ThreadPoolID > &ids) const
Function to wait until all of the given work items have finished their work.
std::vector< int > wait_some(int N_wait, const std::vector< ThreadPoolID > &ids, int max_wait=10000000) const
Function to wait until some of the given work items have finished their work.
bool isMemberThread() const
condition_variable cond_t
ThreadPool & operator=(const ThreadPool &)=delete
Assignment operator.
static void setProcessAffinity(const std::vector< int > &procs)
Function to set the affinity of the current process.
volatile std::atomic_uint64_t vint64_t
std::vector< ThreadPoolID > add_work(const std::vector< ThreadPool::WorkItem * > &work, const std::vector< int > &priority=std::vector< int >())
Function to add multiple work items.
ThreadPoolID add_work(ThreadPool::WorkItem *work, int priority=0)
Function to add a work item.
int getThreadNumber() const
Function to return the current thread number.
void setMaxWaitTimeDebug(const int time)
Function to set the maximum wait time.
vint64_t d_active[MAX_THREADS/64]
void print_wait_warning() const
std::vector< bool > wait_some(size_t N_work, const ThreadPoolID *ids, size_t N_wait, int max_wait) const
static WorkItem * createWork(std::function< Ret(Args1...)> routine, std::tuple< Args2... > &&args)
Function to create a work item.
bool isValid(const ThreadPoolID &id) const
Function to check if the work item is valid.
std::vector< int > getThreadAffinity(int thread) const
static bool is_valid(const ThreadPool *tpool)
Function to check if the thread pool is valid.
static WorkItem * createWork(Ret(*routine)(Args1...), std::tuple< Args2... > &&args)
Function to create a work item.
void wait_pool_finished() const
Function to wait until all work items in the thread pool have finished their work.
void setErrorHandler(std::function< void(const std::string &)> fun)
Set the error handler for threads.
static void set_OS_warnings(int behavior=0)
Function to enable/disable OS warning messages.
std::function< void(const std::string &)> d_errorHandler
int getNumThreads() const
Function to return the number of threads in the thread pool.
static WorkItem * getFinishedWorkItem(const ThreadPoolID &id)
static int numThreads(const ThreadPool *tpool)
Function to return the number of work threads.