140 lines
4.4 KiB
C
140 lines
4.4 KiB
C
|
#ifndef THREADPOOL_INCLUDE_GUARD_H
|
||
|
#define THREADPOOL_INCLUDE_GUARD_H
|
||
|
|
||
|
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
|
||
|
|
||
|
// thread pool
|
||
|
/**
 * @brief Fixed-size pool of worker threads that run queued jobs in FIFO order.
 *
 * All shared state (job queue, running-job counter, shutdown flag) is guarded
 * by one mutex. `_pager` wakes idle workers when work arrives or the pool
 * shuts down; `_callback` wakes callers blocked in wait().
 *
 * Jobs must not let exceptions escape: an exception leaving a job leaves the
 * worker's thread function, which makes std::thread call std::terminate.
 *
 * The pool is not copyable (the workers keep a pointer to the pool object).
 */
class ThreadPool {
public:
    /**
     * @brief Launch the worker threads.
     *
     * @param n Number of workers. Defaults to
     *          std::thread::hardware_concurrency(). A value of 0 (which
     *          hardware_concurrency() may report when the count is unknown)
     *          is raised to 1, since a pool without workers would never run
     *          a job and wait() would block forever.
     *
     * If starting a thread throws, the workers started so far are shut down
     * and joined before the exception is rethrown.
     */
    ThreadPool(std::size_t n = std::thread::hardware_concurrency()) {
        const std::size_t count = (n == 0) ? 1 : n;

        // Reserve up front so the vector never reallocates while launching.
        _workers.reserve(count);

        try {
            for (std::size_t i = 0; i < count; ++i) {
                _workers.emplace_back([this]() { work(); });
            }
        } catch (...) {
            // Destroying a joinable std::thread calls std::terminate, so the
            // workers that did start must be stopped and joined first.
            stop_and_join();
            throw;
        }
    }

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /**
     * @brief Shut the pool down and join all workers.
     *
     * Workers only exit once the queue is empty, so jobs that are still
     * queued are executed before the destructor returns.
     */
    ~ThreadPool() { stop_and_join(); }

    /**
     * @brief Append a job to the queue and wake one idle worker.
     *
     * @param job  Callable to execute. It is copied into the queued task.
     * @param args Arguments for `job`. They are copied into the queued task
     *             and passed to `job` when a worker runs it.
     */
    template <typename Fun, typename ...Args>
    void push(Fun&& job, Args&&... args) {
        // Build the type-erased task before locking to keep the critical
        // section as short as possible.
        std::function<void()> task = [job, args...]() { job(args...); };

        {
            std::lock_guard<std::mutex> lock(_mtx);
            _jobs.push(std::move(task));
        }

        // Notify after releasing the lock so the woken worker can take it.
        _pager.notify_one();
    }

    /**
     * @brief Block until the queue is empty and no job is running.
     */
    void wait() {
        std::unique_lock<std::mutex> lock(_mtx);
        // The predicate overload re-checks the condition after every wakeup,
        // so no outer loop is needed.
        _callback.wait(lock, [this]() { return _jobs.empty() && _running == 0; });
    }

    /**
     * @brief Drop all queued jobs that have not started yet.
     *
     * Jobs that are already running are not affected.
     */
    void clear() {
        std::queue<std::function<void()>> dropped;
        {
            std::lock_guard<std::mutex> lock(_mtx);
            dropped.swap(_jobs);
        }

        // Emptying the queue may make the pool idle without any worker
        // finishing a job, so callers blocked in wait() must re-check.
        _callback.notify_all();

        // `dropped` is destroyed here, outside the lock.
    }

    /// @brief Number of jobs currently being executed by workers.
    std::size_t running_jobs() const {
        std::lock_guard<std::mutex> lock(_mtx);
        return _running;
    }

    /// @brief Number of jobs queued and waiting for execution.
    std::size_t queued_jobs() const {
        std::lock_guard<std::mutex> lock(_mtx);
        return _jobs.size();
    }

    /// @brief Number of worker threads.
    std::size_t workers() const { return _workers.size(); }

private:
    /// Worker thread body: run jobs until shutdown is requested and the
    /// queue has been drained.
    void work() {
        std::unique_lock<std::mutex> lock(_mtx);

        for (;;) {
            // Sleep until there is work or the pool is shutting down.
            _pager.wait(lock, [this]() { return _shutdown || !_jobs.empty(); });

            // An empty queue after the wait implies shutdown was requested.
            if (_jobs.empty()) {
                return;  // `lock` is released by its destructor
            }

            std::function<void()> job = std::move(_jobs.front());
            _jobs.pop();

            // Counted while still holding the lock so that "queue empty and
            // nothing running" is never observed between pop and execution.
            ++_running;

            lock.unlock();
            job();
            // Release whatever the job captured before re-taking the lock.
            job = nullptr;
            lock.lock();

            // Decrement under the lock: wait() reads the counter under the
            // same mutex, so a waiter cannot miss this change.
            --_running;
            if (_running == 0 && _jobs.empty()) {
                _callback.notify_all();
            }
        }
    }

    /// Request shutdown, wake every worker and join all started threads.
    void stop_and_join() {
        {
            // Set the flag under the lock so no worker can check the wait
            // predicate and then miss the notification below.
            std::lock_guard<std::mutex> lock(_mtx);
            _shutdown = true;
        }

        _pager.notify_all();

        for (std::thread& worker : _workers) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }

    bool _shutdown{false};                    // set once the pool is stopping
    std::size_t _running{0};                  // number of running jobs
    std::vector<std::thread> _workers;        // worker threads
    std::queue<std::function<void()>> _jobs;  // pending jobs (FIFO)
    std::condition_variable _pager;           // for waking idle workers
    std::condition_variable _callback;        // for reporting an idle pool
    mutable std::mutex _mtx;                  // guards all state above
};
|
||
|
|
||
|
#endif /* THREADPOOL_INCLUDE_GUARD_H */
|