forpy  2
ctpl.h
Go to the documentation of this file.
1 // This is an adapted version of the excellent CTPL library
2 // (https://raw.githubusercontent.com/vit-vit/CTPL/master/ctpl_stl.h). The
3 // original copyright notice from Vitaliy Vitsentiy can be found below. There
4 // have been minor changes to the file for the use with forpy.
5 
6 /*********************************************************
7  *
8  * Copyright (C) 2014 by Vitaliy Vitsentiy
9  *
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  *
14  * http://www.apache.org/licenses/LICENSE-2.0
15  *
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  *
22  *********************************************************/
23 
24 #ifndef FORPY_UTIL_THREADING_CTPL_H_
25 #define FORPY_UTIL_THREADING_CTPL_H_
26 
27 #include "../../global.h"
28 
29 #include <atomic>
30 #include <exception>
31 #include <functional>
32 #include <future>
33 #include <memory>
34 #include <mutex>
35 #include <queue>
36 #include <thread>
37 #include <vector>
38 
39 #include "../../types.h"
40 #include "../desk.h"
41 
42 namespace forpy {
43 namespace threading {
44 
45 typedef forpy::Desk *INFOT;
46 
47 namespace detail {
48 template <typename T>
49 class Queue {
50  public:
51  bool push(T const &value) {
52  std::unique_lock<std::mutex> lock(this->mutex);
53  this->q.push(value);
54  return true;
55  }
56  // deletes the retrieved element, do not use for non integral types
57  bool pop(T &v) {
58  std::unique_lock<std::mutex> lock(this->mutex);
59  if (this->q.empty()) return false;
60  v = this->q.front();
61  this->q.pop();
62  return true;
63  }
64  bool empty() {
65  std::unique_lock<std::mutex> lock(this->mutex);
66  return this->q.empty();
67  }
68 
69  private:
70  std::queue<T> q;
71  std::mutex mutex;
72 };
73 } // namespace detail
74 
75 class thread_pool {
76  public:
77  thread_pool() { this->init(); }
78  thread_pool(int nThreads) {
79  this->init();
80  this->resize(nThreads);
81  }
82 
83  // the destructor waits for all the functions in the queue to be finished
84  ~thread_pool() { this->stop(true); }
85 
86  // get the number of running threads in the pool
87  int size() { return static_cast<int>(this->threads.size()); }
88 
89  // number of idle threads
90  int n_idle() { return this->nWaiting; }
91  std::thread &get_thread(int i) { return *this->threads[i]; }
92 
93  // change the number of threads in the pool
94  // should be called from one thread, otherwise be careful to not interleave,
95  // also with this->stop()
96  // nThreads must be >= 0
97  void resize(int nThreads) {
98  if (!this->isStop && !this->isDone) {
99  int oldNThreads = static_cast<int>(this->threads.size());
100  if (oldNThreads <= nThreads) { // if the number of threads is increased
101  this->threads.resize(nThreads);
102  this->flags.resize(nThreads);
103 
104  for (int i = oldNThreads; i < nThreads; ++i) {
105  this->flags[i] = std::make_shared<std::atomic<bool>>(false);
106  this->set_thread(i);
107  }
108  } else { // the number of threads is decreased
109  for (int i = oldNThreads - 1; i >= nThreads; --i) {
110  *this->flags[i] = true; // this thread will finish
111  this->threads[i]->detach();
112  }
113  {
114  // stop the detached threads that were waiting
115  std::unique_lock<std::mutex> lock(this->mutex);
116  this->cv.notify_all();
117  }
118  this->threads.resize(
119  nThreads); // safe to delete because the threads are detached
120  this->flags.resize(nThreads); // safe to delete because the threads
121  // have copies of shared_ptr of the
122  // flags, not originals
123  }
124  }
125  }
126 
127  // empty the queue
128  void clear_queue() {
129  std::function<void(INFOT)> *_f;
130  while (this->q.pop(_f)) delete _f; // empty the queue
131  }
132 
133  // pops a functional wrapper to the original function
134  std::function<void(INFOT)> pop() {
135  std::function<void(INFOT)> *_f = nullptr;
136  this->q.pop(_f);
137  std::unique_ptr<std::function<void(INFOT)>> func(
138  _f); // at return, delete the function even if an exception occurred
139  std::function<void(INFOT)> f;
140  if (_f) f = *_f;
141  return f;
142  }
143 
144  // wait for all computing threads to finish and stop all threads
145  // may be called asynchronously to not pause the calling thread while waiting
146  // if isWait == true, all the functions in the queue are run, otherwise the
147  // queue is cleared without running the functions
148  void stop(bool isWait = false) {
149  if (!isWait) {
150  if (this->isStop) return;
151  this->isStop = true;
152  for (int i = 0, n = this->size(); i < n; ++i) {
153  *this->flags[i] = true; // command the threads to stop
154  }
155  this->clear_queue(); // empty the queue
156  } else {
157  if (this->isDone || this->isStop) return;
158  this->isDone = true; // give the waiting threads a command to finish
159  }
160  {
161  std::unique_lock<std::mutex> lock(this->mutex);
162  this->cv.notify_all(); // stop all waiting threads
163  }
164  for (int i = 0; i < static_cast<int>(this->threads.size());
165  ++i) { // wait for the computing threads to finish
166  if (this->threads[i]->joinable()) this->threads[i]->join();
167  }
168  // if there were no threads in the pool but some functors in the queue, the
169  // functors are not deleted by the threads
170  // therefore delete them here
171  this->clear_queue();
172  this->threads.clear();
173  this->flags.clear();
174  }
175 
177  template <typename C, typename F, typename M, typename... Rest>
178  auto push_move(F &&f, C *c, M &&movable, Rest &&... rest)
179  -> std::future<decltype((c->*f)(new forpy::Desk(0), movable, rest...))> {
180  auto pck = std::make_shared<std::packaged_task<decltype(
181  (c->*f)(new forpy::Desk(0), movable, rest...))(INFOT)>>(
182  std::bind(std::forward<F>(f), std::forward<C *>(c),
183  std::placeholders::_1, std::move(movable),
184  std::forward<Rest>(rest)...));
185  auto _f = new std::function<void(INFOT)>([pck](INFOT s) { (*pck)(s); });
186  this->q.push(_f);
187  std::unique_lock<std::mutex> lock(this->mutex);
188  this->cv.notify_one();
189  return pck->get_future();
190  }
191 
193  template <typename C, typename F, typename... Rest>
194  auto push(F &&f, C *c, Rest &&... rest)
195  -> std::future<decltype((c->*f)(new forpy::Desk(0), rest...))> {
196  auto pck = std::make_shared<std::packaged_task<decltype(
197  (c->*f)(new forpy::Desk(0), rest...))(INFOT)>>(
198  std::bind(std::forward<F>(f), std::forward<C *>(c),
199  std::placeholders::_1, std::forward<Rest>(rest)...));
200  auto _f = new std::function<void(INFOT)>([pck](INFOT s) { (*pck)(s); });
201  this->q.push(_f);
202  std::unique_lock<std::mutex> lock(this->mutex);
203  this->cv.notify_one();
204  return pck->get_future();
205  }
206 
208  template <typename F, typename... Rest>
209  auto push(F &&f, Rest &&... rest)
210  -> std::future<decltype(f(new forpy::Desk(0), rest...))> {
211  auto pck = std::make_shared<
212  std::packaged_task<decltype(f(new forpy::Desk(0), rest...))(INFOT)>>(
213  std::bind(std::forward<F>(f), std::placeholders::_1,
214  std::forward<Rest>(rest)...));
215  auto _f = new std::function<void(INFOT)>([pck](INFOT s) { (*pck)(s); });
216  this->q.push(_f);
217  std::unique_lock<std::mutex> lock(this->mutex);
218  this->cv.notify_one();
219  return pck->get_future();
220  }
221 
223  template <typename F>
224  auto push(F &&f) -> std::future<decltype(f(new forpy::Desk(0)))> {
225  auto pck = std::make_shared<
226  std::packaged_task<decltype(f(new forpy::Desk(0)))(INFOT)>>(
227  std::forward<F>(f));
228  auto _f = new std::function<void(INFOT)>([pck](INFOT s) { (*pck)(s); });
229  this->q.push(_f);
230  std::unique_lock<std::mutex> lock(this->mutex);
231  this->cv.notify_one();
232  return pck->get_future();
233  }
234 
235  void init() {
236  this->nWaiting = 0;
237  this->isStop = false;
238  this->isDone = false;
239  }
240 
241  private:
242  // deleted
243  thread_pool(const thread_pool &) = delete;
244  thread_pool(thread_pool &&) = delete;
245  thread_pool &operator=(const thread_pool &) = delete;
246  thread_pool &operator=(thread_pool &&) = delete;
247 
248  void set_thread(int i) {
249  std::shared_ptr<std::atomic<bool>> flag(
250  this->flags[i]); // a copy of the shared ptr to the flag
251  auto f = [this, i, flag /* a copy of the shared ptr to the flag */]() {
252  VLOG(3) << "Starting up thread " << i << " (id "
253  << std::this_thread::get_id() << ").";
254  forpy::Desk d(i);
255  std::atomic<bool> &_flag = *flag;
256  std::function<void(INFOT)> *_f;
257  bool isPop = this->q.pop(_f);
258  while (true) {
259  while (isPop) { // if there is anything in the queue
260  std::unique_ptr<std::function<void(INFOT)>> func(
261  _f); // at return, delete the function even if an exception
262  // occurred
263  (*_f)(&d);
264  d.reset();
265  if (_flag)
266  return; // the thread is wanted to stop, return even if the queue
267  // is not empty yet
268  else
269  isPop = this->q.pop(_f);
270  }
271  // the queue is empty here, wait for the next command
272  std::unique_lock<std::mutex> lock(this->mutex);
273  ++this->nWaiting;
274  this->cv.wait(lock, [this, &_f, &isPop, &_flag]() {
275  isPop = this->q.pop(_f);
276  return isPop || this->isDone || _flag;
277  });
278  --this->nWaiting;
279  if (!isPop)
280  return; // if the queue is empty and this->isDone == true or *flag
281  // then return
282  }
283  };
284  this->threads[i].reset(
285  new std::thread(f)); // compiler may not support std::make_unique()
286  }
287 
288  std::vector<std::unique_ptr<std::thread>> threads;
289  std::vector<std::shared_ptr<std::atomic<bool>>> flags;
291  std::atomic<bool> isDone;
292  std::atomic<bool> isStop;
293  std::atomic<int> nWaiting; // how many threads are waiting
294 
295  std::mutex mutex;
296  std::condition_variable cv;
297 };
298 } // namespace threading
299 
301  private:
302  inline ThreadControl() : ttp() {
303  VLOG(1) << "Creating thread control (main thread id: "
304  << std::this_thread::get_id() << ").";
305  }
307  std::unique_ptr<threading::thread_pool> ttp;
308 
309  public:
310  inline static ThreadControl &getInstance() {
311  static ThreadControl instance;
312  return instance;
313  }
314 
315  inline void set_num(size_t n) {
316  if (n == 0) n = std::thread::hardware_concurrency();
317  VLOG(1) << "Setting thread pool size to " << n << ".";
318  if (ttp == nullptr) {
319  VLOG(1) << "Initializing thread pool from scratch.";
320  ttp = std::make_unique<threading::thread_pool>(n);
321  } else {
322  if (get_num() != n) {
323  VLOG(1) << "Resizing thread pool.";
324  ttp->resize(n);
325  }
326  }
327  }
328 
329  inline size_t get_num() {
330  if (ttp == nullptr)
331  return 0;
332  else
333  return ttp->size();
334  }
335 
336  inline size_t get_idle() {
337  if (ttp == nullptr)
338  return 0;
339  else
340  return ttp->n_idle();
341  }
342 
344  template <typename C, typename F, typename M, typename... Rest>
345  auto push_move(F &&f, C *c, M &&movable, Rest &&... rest)
346  -> std::future<decltype((c->*f)(new forpy::Desk(0), movable, rest...))> {
347  return ttp->push_move(f, c, movable, rest...);
348  }
349 
350  template <typename C, typename F, typename... Rest>
351  auto push(F &&f, C *c, Rest &&... rest)
352  -> std::future<decltype((c->*f)(new forpy::Desk(0), rest...))> {
353  return ttp->push(f, c, rest...);
354  }
355 
356  template <typename F, typename... Rest>
357  auto push(F &&f, Rest &&... rest)
358  -> std::future<decltype(f(new Desk(0), rest...))> {
359  return ttp->push(f, rest...);
360  }
361 
362  template <typename F>
363  auto push(F &&f) -> std::future<decltype(f(new Desk(0)))> {
364  return ttp->push(f);
365  }
366 
367  inline void stop(const bool &wait = false) {
368  ttp->stop(wait);
369  ttp->init();
370  }
371 
372  inline ~ThreadControl() {
373  VLOG(1) << "Destroying thread control...";
374  ttp.reset();
375  VLOG(1) << "Done.";
376  }
377 };
378 
379 } // namespace forpy
380 
381 #endif // FORPY_UTIL_THREADING_CTPL_H_
void set_thread(int i)
Definition: ctpl.h:248
detail::Queue< std::function< void(INFOT)> * > q
Definition: ctpl.h:290
void set_num(size_t n)
Definition: ctpl.h:315
thread_pool & operator=(const thread_pool &)=delete
auto push_move(F &&f, C *c, M &&movable, Rest &&... rest) -> std::future< decltype((c-> *f)(new forpy::Desk(0), movable, rest...))>
For member functions (with parameters).
Definition: ctpl.h:178
auto push(F &&f, C *c, Rest &&... rest) -> std::future< decltype((c-> *f)(new forpy::Desk(0), rest...))>
Definition: ctpl.h:351
DISALLOW_COPY_AND_ASSIGN(ThreadControl)
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(new Desk(0), rest...))>
Definition: ctpl.h:357
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(new forpy::Desk(0), rest...))>
For functions with parameters.
Definition: ctpl.h:209
std::function< void(INFOT)> pop()
Definition: ctpl.h:134
std::atomic< bool > isStop
Definition: ctpl.h:292
void resize(int nThreads)
Definition: ctpl.h:97
auto push(F &&f, C *c, Rest &&... rest) -> std::future< decltype((c-> *f)(new forpy::Desk(0), rest...))>
For member functions (with parameters).
Definition: ctpl.h:194
void stop(const bool &wait=false)
Definition: ctpl.h:367
thread_pool(int nThreads)
Definition: ctpl.h:78
void stop(bool isWait=false)
Definition: ctpl.h:148
size_t get_num()
Definition: ctpl.h:329
std::condition_variable cv
Definition: ctpl.h:296
std::atomic< int > nWaiting
Definition: ctpl.h:293
auto push(F &&f) -> std::future< decltype(f(new forpy::Desk(0)))>
For functions without parameters.
Definition: ctpl.h:224
forpy::Desk * INFOT
Definition: ctpl.h:45
std::vector< std::shared_ptr< std::atomic< bool > > > flags
Definition: ctpl.h:289
std::atomic< bool > isDone
Definition: ctpl.h:291
auto push(F &&f) -> std::future< decltype(f(new Desk(0)))>
Definition: ctpl.h:363
size_t get_idle()
Definition: ctpl.h:336
std::queue< T > q
Definition: ctpl.h:70
std::unique_ptr< threading::thread_pool > ttp
Definition: ctpl.h:307
Main thread desk object.
Definition: desk.h:201
std::thread & get_thread(int i)
Definition: ctpl.h:91
void reset()
Definition: desk.h:222
static ThreadControl & getInstance()
Definition: ctpl.h:310
auto push_move(F &&f, C *c, M &&movable, Rest &&... rest) -> std::future< decltype((c-> *f)(new forpy::Desk(0), movable, rest...))>
For member functions (with parameters).
Definition: ctpl.h:345
bool push(T const &value)
Definition: ctpl.h:51
std::vector< std::unique_ptr< std::thread > > threads
Definition: ctpl.h:288