#ifndef THREADPOOL_INCLUDE_GUARD_H #define THREADPOOL_INCLUDE_GUARD_H #include #include #include #include #include #include #include #include // thread pool class ThreadPool { public: ThreadPool(std::size_t n = std::thread::hardware_concurrency()) : _shutdown{false}, _running{0} { // Reserve max nr. of worker space _workers.reserve(n); // Launce `n` workers for (std::size_t i = 0; i < n; ++i) { _workers.emplace_back([this, i]() { // setup infinit loop (continue untill told to shut down or // everything is done) for (;;) { // setup a (skopped) lock to avoid inference std::unique_lock lock(_mtx); // wait untill ether shutdown or work is available // (wait iff `!_shutdown && _jobs.empty()`) _pager.wait(lock, [&]() { return _shutdown || !_jobs.empty(); }); // in case of shutdown terminate the infinit loop after all // jobs have been processes if (_shutdown && _jobs.empty()) { break; // releases the lock } // extract a job from the job queue auto job = _jobs.front(); _jobs.pop(); // increment the running jobs counter (before releasing the // lock as the number of outstanding and running jobs needs // to be precise) _running += 1; // free the lock for other workers lock.unlock(); // execute the job job(); // decrement the running jobs counter _running -= 1; // and report a done job (to everyone listening) _callback.notify_all(); } }); } } ~ThreadPool() { { // set shutdown with a lock to ensure that workers note the change // in setting shutdown! std::lock_guard lock(_mtx); _shutdown = true; } // notify all workers for the change _pager.notify_all(); // finally join the worker threads into the main thread for (auto& thr : _workers) { thr.join(); } } // Add jobs to the job queue template void push(Fun&& job, Args&&... args) { // add a new task to the job queue with a lock { std::unique_lock lock(_mtx); _jobs.push([job, args...]() { job(args...); }); } // notify one waiting worker that there is work to be done _pager.notify_one(); } // wait till all jobs have been processes void wait() { // infinit loop till all threads are idle for (;;) { // guard against job queue retriefel std::unique_lock lock(_mtx); // wait for a callback (done job) to check again but only if there // are any jobs to be performed _callback.wait(lock, [&]() { return _jobs.empty() && !_running; }); if (_jobs.empty() && !_running) { break; } // lock released by end of skope } } // clears the job queue void clear() { // lock the queue std::lock_guard lock(_mtx); // and swap the jobs queue with an empty queue std::queue>().swap(_jobs); } // get number of currently running jobs std::size_t running_jobs() { return _running; } // get number of queued (waiting for execution) jobs std::size_t queued_jobs() { return _jobs.size(); } // get number of worker threads std::size_t workers() { return _workers.size(); } private: bool _shutdown; std::size_t _running; // number of running jobs std::vector _workers; std::queue> _jobs; std::condition_variable _pager; // for wayking idle workers std::condition_variable _callback; // for workers reporting a done job std::mutex _mtx; // mutex base for cond. variables }; #endif /* THREADPOOL_INCLUDE_GUARD_H */