24 #ifndef FORPY_UTIL_THREADING_CTPL_H_ 25 #define FORPY_UTIL_THREADING_CTPL_H_ 27 #include "../../global.h" 39 #include "../../types.h" 51 bool push(T
const &value) {
52 std::unique_lock<std::mutex> lock(this->
mutex);
58 std::unique_lock<std::mutex> lock(this->
mutex);
59 if (this->
q.empty())
return false;
65 std::unique_lock<std::mutex> lock(this->
mutex);
66 return this->
q.empty();
99 int oldNThreads =
static_cast<int>(this->
threads.size());
100 if (oldNThreads <= nThreads) {
101 this->
threads.resize(nThreads);
102 this->
flags.resize(nThreads);
104 for (
int i = oldNThreads; i < nThreads; ++i) {
105 this->
flags[i] = std::make_shared<std::atomic<bool>>(
false);
109 for (
int i = oldNThreads - 1; i >= nThreads; --i) {
110 *this->
flags[i] =
true;
115 std::unique_lock<std::mutex> lock(this->
mutex);
116 this->
cv.notify_all();
120 this->
flags.resize(nThreads);
129 std::function<void(INFOT)> *_f;
130 while (this->
q.pop(_f))
delete _f;
134 std::function<void(INFOT)>
pop() {
135 std::function<void(INFOT)> *_f =
nullptr;
137 std::unique_ptr<std::function<void(INFOT)>> func(
139 std::function<void(INFOT)> f;
148 void stop(
bool isWait =
false) {
152 for (
int i = 0, n = this->
size(); i < n; ++i) {
153 *this->
flags[i] =
true;
161 std::unique_lock<std::mutex> lock(this->
mutex);
162 this->
cv.notify_all();
164 for (
int i = 0; i < static_cast<int>(this->
threads.size());
177 template <
typename C,
typename F,
typename M,
typename... Rest>
178 auto push_move(F &&f, C *c, M &&movable, Rest &&... rest)
179 -> std::future<decltype((c->*f)(
new forpy::Desk(0), movable, rest...))> {
180 auto pck = std::make_shared<std::packaged_task<decltype(
182 std::bind(std::forward<F>(f), std::forward<C *>(c),
183 std::placeholders::_1, std::move(movable),
184 std::forward<Rest>(rest)...));
185 auto _f =
new std::function<void(INFOT)>([pck](
INFOT s) { (*pck)(s); });
187 std::unique_lock<std::mutex> lock(this->
mutex);
188 this->
cv.notify_one();
189 return pck->get_future();
193 template <
typename C,
typename F,
typename... Rest>
194 auto push(F &&f, C *c, Rest &&... rest)
195 -> std::future<decltype((c->*f)(
new forpy::Desk(0), rest...))> {
196 auto pck = std::make_shared<std::packaged_task<decltype(
198 std::bind(std::forward<F>(f), std::forward<C *>(c),
199 std::placeholders::_1, std::forward<Rest>(rest)...));
200 auto _f =
new std::function<void(INFOT)>([pck](
INFOT s) { (*pck)(s); });
202 std::unique_lock<std::mutex> lock(this->
mutex);
203 this->
cv.notify_one();
204 return pck->get_future();
208 template <
typename F,
typename... Rest>
209 auto push(F &&f, Rest &&... rest)
210 -> std::future<decltype(f(
new forpy::Desk(0), rest...))> {
211 auto pck = std::make_shared<
213 std::bind(std::forward<F>(f), std::placeholders::_1,
214 std::forward<Rest>(rest)...));
215 auto _f =
new std::function<void(INFOT)>([pck](
INFOT s) { (*pck)(s); });
217 std::unique_lock<std::mutex> lock(this->
mutex);
218 this->
cv.notify_one();
219 return pck->get_future();
223 template <
typename F>
224 auto push(F &&f) -> std::future<decltype(f(new forpy::Desk(0)))> {
225 auto pck = std::make_shared<
226 std::packaged_task<decltype(f(new forpy::Desk(0)))(INFOT)>>(
228 auto _f =
new std::function<void(INFOT)>([pck](
INFOT s) { (*pck)(s); });
230 std::unique_lock<std::mutex> lock(this->
mutex);
231 this->
cv.notify_one();
232 return pck->get_future();
249 std::shared_ptr<std::atomic<bool>> flag(
251 auto f = [
this, i, flag ]() {
252 VLOG(3) <<
"Starting up thread " << i <<
" (id " 253 << std::this_thread::get_id() <<
").";
255 std::atomic<bool> &_flag = *flag;
256 std::function<void(INFOT)> *_f;
257 bool isPop = this->
q.pop(_f);
260 std::unique_ptr<std::function<void(INFOT)>> func(
269 isPop = this->
q.pop(_f);
272 std::unique_lock<std::mutex> lock(this->
mutex);
274 this->
cv.wait(lock, [
this, &_f, &isPop, &_flag]() {
275 isPop = this->
q.pop(_f);
276 return isPop || this->
isDone || _flag;
288 std::vector<std::unique_ptr<std::thread>>
threads;
289 std::vector<std::shared_ptr<std::atomic<bool>>>
flags;
296 std::condition_variable
cv;
303 VLOG(1) <<
"Creating thread control (main thread id: " 304 << std::this_thread::get_id() <<
").";
307 std::unique_ptr<threading::thread_pool>
ttp;
316 if (n == 0) n = std::thread::hardware_concurrency();
317 VLOG(1) <<
"Setting thread pool size to " << n <<
".";
318 if (
ttp ==
nullptr) {
319 VLOG(1) <<
"Initializing thread pool from scratch.";
320 ttp = std::make_unique<threading::thread_pool>(n);
323 VLOG(1) <<
"Resizing thread pool.";
340 return ttp->n_idle();
344 template <
typename C,
typename F,
typename M,
typename... Rest>
345 auto push_move(F &&f, C *c, M &&movable, Rest &&... rest)
346 -> std::future<decltype((c->*f)(
new forpy::Desk(0), movable, rest...))> {
347 return ttp->push_move(f, c, movable, rest...);
350 template <
typename C,
typename F,
typename... Rest>
351 auto push(F &&f, C *c, Rest &&... rest)
352 -> std::future<decltype((c->*f)(
new forpy::Desk(0), rest...))> {
353 return ttp->push(f, c, rest...);
356 template <
typename F,
typename... Rest>
357 auto push(F &&f, Rest &&... rest)
358 -> std::future<decltype(f(
new Desk(0), rest...))> {
359 return ttp->push(f, rest...);
362 template <
typename F>
363 auto push(F &&f) -> std::future<decltype(f(new Desk(0)))> {
367 inline void stop(
const bool &wait =
false) {
373 VLOG(1) <<
"Destroying thread control...";
381 #endif // FORPY_UTIL_THREADING_CTPL_H_
detail::Queue< std::function< void(INFOT)> * > q
thread_pool & operator=(const thread_pool &)=delete
auto push_move(F &&f, C *c, M &&movable, Rest &&... rest) -> std::future< decltype((c-> *f)(new forpy::Desk(0), movable, rest...))>
For member functions (with parameters).
auto push(F &&f, C *c, Rest &&... rest) -> std::future< decltype((c-> *f)(new forpy::Desk(0), rest...))>
DISALLOW_COPY_AND_ASSIGN(ThreadControl)
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(new Desk(0), rest...))>
auto push(F &&f, Rest &&... rest) -> std::future< decltype(f(new forpy::Desk(0), rest...))>
For functions with parameters.
std::function< void(INFOT)> pop()
std::atomic< bool > isStop
void resize(int nThreads)
auto push(F &&f, C *c, Rest &&... rest) -> std::future< decltype((c-> *f)(new forpy::Desk(0), rest...))>
For member functions (with parameters).
void stop(const bool &wait=false)
thread_pool(int nThreads)
void stop(bool isWait=false)
std::condition_variable cv
std::atomic< int > nWaiting
auto push(F &&f) -> std::future< decltype(f(new forpy::Desk(0)))>
For functions without parameters.
std::vector< std::shared_ptr< std::atomic< bool > > > flags
std::atomic< bool > isDone
auto push(F &&f) -> std::future< decltype(f(new Desk(0)))>
std::unique_ptr< threading::thread_pool > ttp
std::thread & get_thread(int i)
static ThreadControl & getInstance()
auto push_move(F &&f, C *c, M &&movable, Rest &&... rest) -> std::future< decltype((c-> *f)(new forpy::Desk(0), movable, rest...))>
For member functions (with parameters).
bool push(T const &value)
std::vector< std::unique_ptr< std::thread > > threads