Advanced Multi-Physics (AMP)
On-Line Documentation
ThreadPoolQueue.h
Go to the documentation of this file.
1// This file contains the template functions for the thread pool queues
2#ifndef included_AMP_ThreadPoolQueue
3#define included_AMP_ThreadPoolQueue
4
5
6#include "AMP/utils/threadpool/AtomicList.h"
7#include "AMP/utils/threadpool/ThreadPoolId.h"
8
9#include <algorithm>
10
11
12namespace AMP {
13
14
17{
18public:
21
23 explicit ThreadPoolListQueue( size_t N ) : d_queue_list( N ) {}
24
27
30
32 inline bool empty() const { return d_queue_list.empty(); }
33
35 inline size_t capacity() const { return d_queue_list.capacity(); }
36
38 inline size_t size() const { return d_queue_list.size(); }
39
42 {
43 return d_queue_list.remove( []( const ThreadPoolID &id ) { return id.ready(); } );
44 }
45
47 inline void insert( size_t N, const ThreadPoolID *ids )
48 {
49 // Add the work items in reverse order for efficiency, queue will maintain order)
50 for ( int64_t i = N - 1; i >= 0; i-- )
51 d_queue_list.insert( ids[i] );
52 }
53
55 inline void changePriorities( const std::vector<std::pair<uint64_t, int8_t>> &list )
56 {
57 auto compare = []( const ThreadPoolID &a, const uint64_t &b ) {
58 return a.getLocalID() == b;
59 };
60 for ( auto item : list ) {
61 // Remove and add the id back with a higher priority
62 auto tmp = d_queue_list.remove( compare, item.first );
63 if ( tmp.isNull() )
64 continue;
65 tmp.setPriority( std::max( item.second, tmp.getPriority() ) );
66 d_queue_list.insert( tmp );
67 }
68 }
69
70private:
72 queue_type d_queue_list; // The work queue
73};
74
75
78{
79public:
82
84 explicit ThreadPoolHeapQueue( size_t N )
85 : d_lock( 0 ), d_Nc( N ), d_Nh( 0 ), d_Nb( 0 ), d_ids( new ThreadPoolID[N] )
86 {
87 }
88
91
94
96 inline size_t capacity() const { return d_Nc; }
97
99 inline size_t size() const { return d_Nh + d_Nb; }
100
102 inline bool empty() const { return size() == 0; }
103
106 {
107 lock();
108 checkBlocked();
109 ThreadPoolID id;
110 if ( d_Nh > 0 ) {
111 std::pop_heap( const_cast<ThreadPoolID *>( d_ids ),
112 const_cast<ThreadPoolID *>( d_ids + d_Nh ) );
113 std::swap( id, const_cast<ThreadPoolID &>( d_ids[--const_cast<size_t &>( d_Nh )] ) );
114 }
115 unlock();
116 return id;
117 }
118
120 inline void insert( size_t N, const ThreadPoolID *ids )
121 {
122 lock();
123 auto Nh2 = d_Nh;
124 for ( size_t i = 0; i < N; i++ ) {
125 if ( ids[i].ready() ) {
126 d_ids[Nh2++] = ids[i];
127 } else {
128 const_cast<size_t &>( d_Nb )++;
129 d_ids[d_Nc - d_Nb] = ids[i];
130 if ( d_ids[d_Nc - d_Nb] > d_ids[d_Nc - 1] )
131 std::swap( d_ids[d_Nc - d_Nb], d_ids[d_Nc - 1] );
132 }
133 }
134 if ( Nh2 - d_Nh > 3 ) {
135 std::make_heap( d_ids, d_ids + d_Nh );
136 } else {
137 for ( size_t i = d_Nh + 1; i <= Nh2; i++ )
138 std::push_heap( d_ids, d_ids + i );
139 }
140 d_Nh = Nh2;
141 unlock();
142 }
143
145 inline void changePriorities( const std::vector<std::pair<uint64_t, int8_t>> &list )
146 {
147 if ( list.empty() )
148 return;
149 lock();
150 // Search the items in the heap
151 bool changed = false;
152 for ( size_t i = 0; i < d_Nh; i++ ) {
153 auto id = const_cast<ThreadPoolID &>( d_ids[i] );
154 auto id2 = id.getLocalID();
155 for ( size_t j = 0; j < list.size(); j++ ) {
156 if ( list[j].first == id2 ) {
157 id.setPriority( std::max( list[j].second, id.getPriority() ) );
158 changed = true;
159 }
160 }
161 }
162 if ( changed )
163 std::make_heap( d_ids, d_ids + d_Nh );
164 // Search the items in the blocked list
165 for ( int64_t i = d_Nc - 1; i >= static_cast<int64_t>( d_Nc - d_Nb ); i-- ) {
166 auto id = const_cast<ThreadPoolID &>( d_ids[i] );
167 auto id2 = id.getLocalID();
168 for ( size_t j = 0; j < list.size(); j++ ) {
169 if ( list[j].first == id2 ) {
170 id.setPriority( std::max( list[j].second, id.getPriority() ) );
171 if ( d_ids[i] > d_ids[d_Nc - 1] )
172 std::swap( d_ids[i], d_ids[d_Nc - 1] );
173 }
174 }
175 }
176 unlock();
177 }
178
179private:
180 inline void lock()
181 {
182 int expected = 0;
183 while ( !d_lock.compare_exchange_weak( expected, 1 ) ) {
184 expected = 0;
185 }
186 }
187 inline void unlock()
188 {
189 int expected = 1;
190 d_lock.compare_exchange_weak( expected, 0 );
191 }
192 inline void checkBlocked()
193 {
194 if ( d_Nb == 0 )
195 return;
196 bool test = d_Nh == 0 || d_ids[d_Nc - 1] > d_ids[0];
197 if ( test ) {
198 for ( size_t i = d_Nc - d_Nb; i < d_Nc; i++ ) {
199 if ( const_cast<ThreadPoolID &>( d_ids[i] ).ready() ) {
200 std::swap( d_ids[const_cast<size_t &>( d_Nh )++], d_ids[i] );
201 std::push_heap( d_ids, d_ids + d_Nh );
202 if ( i > d_Nc - d_Nb )
203 std::swap( d_ids[d_Nc - d_Nb], d_ids[i] );
204 const_cast<size_t &>( d_Nb )--;
205 }
206 }
207 }
208 }
209
210private:
211 volatile std::atomic_int32_t d_lock;
212 const size_t d_Nc;
213 volatile size_t d_Nh;
214 volatile size_t d_Nb;
216};
217
218
219} // namespace AMP
220
221#endif
Maintain a sorted list of entries.
Definition AtomicList.h:20
size_t size() const
Return the size of the list.
Definition AtomicList.h:59
TYPE remove(Compare compare, const Args &...args)
Remove an item from the list.
void insert(const TYPE &x)
Insert an item.
constexpr size_t capacity() const
Return the capacity of the list.
Definition AtomicList.h:81
bool empty() const
Check if the list is empty.
Definition AtomicList.h:65
Class to store the queue for the ThreadPool using a binary heap.
ThreadPoolHeapQueue()=delete
Empty constructor.
ThreadPoolHeapQueue(const ThreadPoolHeapQueue &)=delete
Copy constructor.
void insert(size_t N, const ThreadPoolID *ids)
Add the given items to the queue.
bool empty() const
Check if the queue is empty.
size_t capacity() const
The number of items that can be in the queue.
volatile std::atomic_int32_t d_lock
ThreadPoolHeapQueue(size_t N)
Default constructor.
void changePriorities(const std::vector< std::pair< uint64_t, int8_t > > &list)
Change the prioirties of items in the queue.
ThreadPoolID pop()
Get the next item to process.
size_t size() const
The number of items that are in the queue.
ThreadPoolHeapQueue & operator=(const ThreadPoolHeapQueue &)=delete
Asignment operator.
volatile ThreadPoolID * d_ids
This a class to hold the work item id.
uint64_t getLocalID() const
void setPriority(int8_t priority)
Class to store the queue for the ThreadPool using an thread-safe list.
ThreadPoolListQueue & operator=(const ThreadPoolListQueue &)=delete
Asignment operator.
void insert(size_t N, const ThreadPoolID *ids)
Add the given items to the queue.
ThreadPoolListQueue(size_t N)
Default constructor.
void changePriorities(const std::vector< std::pair< uint64_t, int8_t > > &list)
Change the prioirties of items in the queue.
size_t size() const
The number of items that are in the queue.
size_t capacity() const
The number of items that can be in the queue.
ThreadPoolID pop()
Get the next item to process.
AtomicList< ThreadPoolID, std::greater< ThreadPoolID > > queue_type
ThreadPoolListQueue(const ThreadPoolListQueue &)=delete
Copy constructor.
ThreadPoolListQueue()=delete
Empty constructor.
bool empty() const
Check if the queue is empty.
decltype(void(T{ std::declval< Args >()... }), std::true_type()) test(int)



Advanced Multi-Physics (AMP)
Oak Ridge National Laboratory
Idaho National Laboratory
Los Alamos National Laboratory
This page automatically produced from the
source code by doxygen
Last updated: Tue Mar 10 2026 13:06:41.
Comments on this page