Alexandria 2.25.0
SDC-CH common library for the Euclid project
ThreadPool.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2012-2021 Euclid Science Ground Segment
3 *
4 * This library is free software; you can redistribute it and/or modify it under
5 * the terms of the GNU Lesser General Public License as published by the Free
6 * Software Foundation; either version 3.0 of the License, or (at your option)
7 * any later version.
8 *
9 * This library is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12 * details.
13 *
14 * You should have received a copy of the GNU Lesser General Public License
15 * along with this library; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
27#include <algorithm>
28#include <numeric>
29
30namespace Euclid {
31
32namespace {
33
34class Worker {
35
36public:
37 Worker(std::mutex& queue_mutex, std::deque<ThreadPool::Task>& queue, std::atomic<bool>& run_flag,
38 std::atomic<bool>& sleeping_flag, std::atomic<bool>& done_flag, unsigned int empty_queue_wait_time,
39 std::exception_ptr& exception_ptr)
40 : m_queue_mutex(queue_mutex)
41 , m_queue(queue)
42 , m_run_flag(run_flag)
43 , m_sleeping_flag(sleeping_flag)
44 , m_done_flag(done_flag)
45 , m_empty_queue_wait_time(empty_queue_wait_time)
46 , m_exception_ptr(exception_ptr) {}
47
48 void operator()() {
49 while (m_run_flag.get() && m_exception_ptr == nullptr) {
50 // Check if there is anything it the queue to be done and get it
51 std::unique_ptr<ThreadPool::Task> task_ptr = nullptr;
53 if (!m_queue.get().empty()) {
54 task_ptr = Euclid::make_unique<ThreadPool::Task>(m_queue.get().front());
55 m_queue.get().pop_front();
56 }
57 lock.unlock();
58
59 // If we have some work to do, do it. Otherwise sleep for some time.
60 if (task_ptr) {
61 try {
62 (*task_ptr)();
63 } catch (...) {
65 }
66 } else {
67 m_sleeping_flag.get() = true;
69 m_sleeping_flag.get() = false;
70 }
71 }
72 // Indicate that the worker is done
73 m_sleeping_flag.get() = true;
74 m_done_flag.get() = true;
75 m_run_flag.get() = false;
76 }
77
78private:
86};
87
88} // end of anonymous namespace
89
90ThreadPool::ThreadPool(unsigned int thread_count, unsigned int empty_queue_wait_time)
91 : m_worker_run_flags(thread_count)
92 , m_worker_sleeping_flags(thread_count)
93 , m_worker_done_flags(thread_count)
94 , m_empty_queue_wait_time(empty_queue_wait_time) {
95 for (unsigned int i = 0; i < thread_count; ++i) {
96 m_worker_run_flags.at(i) = true;
97 m_worker_sleeping_flags.at(i) = false;
98 m_worker_done_flags.at(i) = false;
101 }
102}
103
104namespace {
105
106void waitWorkers(std::vector<std::atomic<bool>>& worker_flags, unsigned int wait_time) {
107 // Now wait until all the workers have finish any current tasks
108 for (auto& flag : worker_flags) {
109 while (!flag) {
111 }
112 }
113}
114
115} // namespace
116
118 if (m_exception_ptr) {
119 if (rethrow) {
121 } else {
122 return true;
123 }
124 }
125 return false;
126}
127
128size_t ThreadPool::queued() const {
130 return m_queue.size();
131}
132
133size_t ThreadPool::running() const {
136 return m_worker_sleeping_flags.size() - sleeping;
137}
138
141 return m_worker_done_flags.size() - done;
142}
143
145 // Wait for the queue to be empty
146 bool queue_is_empty = false;
147 while (!queue_is_empty && m_exception_ptr == nullptr) {
149 queue_is_empty = m_queue.empty();
150 lock.unlock();
151 if (!queue_is_empty) {
153 }
154 }
155 // Wait for the workers to finish the currently executing tasks
157 // Check if any worker finished with an exception
158 checkForException(true);
159}
160
162 // Stop all the workers. They will stop right after they finish the task
163 // they already run.
165 // Now wait until all the workers have finish any current tasks
167 for (auto& worker : m_workers) {
168 worker.join();
169 }
170}
171
175 task();
176 } else {
177 m_queue.emplace_back(std::move(task));
178 }
179}
180
181} // namespace Euclid
std::reference_wrapper< std::mutex > m_queue_mutex
Definition: ThreadPool.cpp:79
std::reference_wrapper< std::atomic< bool > > m_run_flag
Definition: ThreadPool.cpp:81
std::reference_wrapper< std::atomic< bool > > m_sleeping_flag
Definition: ThreadPool.cpp:82
std::reference_wrapper< std::deque< ThreadPool::Task > > m_queue
Definition: ThreadPool.cpp:80
std::reference_wrapper< std::atomic< bool > > m_done_flag
Definition: ThreadPool.cpp:83
unsigned int m_empty_queue_wait_time
Definition: ThreadPool.cpp:84
std::reference_wrapper< std::exception_ptr > m_exception_ptr
Definition: ThreadPool.cpp:85
T accumulate(T... args)
T at(T... args)
T begin(T... args)
void submit(Task task)
Submit a task to be executed.
Definition: ThreadPool.cpp:172
std::deque< Task > m_queue
Definition: ThreadPool.h:114
size_t running() const
Return the number of running tasks.
Definition: ThreadPool.cpp:133
std::vector< std::atomic< bool > > m_worker_sleeping_flags
Definition: ThreadPool.h:111
size_t queued() const
Return the number of queued tasks.
Definition: ThreadPool.cpp:128
unsigned int m_empty_queue_wait_time
Definition: ThreadPool.h:115
std::vector< std::thread > m_workers
Definition: ThreadPool.h:113
std::mutex m_queue_mutex
Definition: ThreadPool.h:109
std::vector< std::atomic< bool > > m_worker_run_flags
Definition: ThreadPool.h:110
std::vector< std::atomic< bool > > m_worker_done_flags
Definition: ThreadPool.h:112
bool checkForException(bool rethrow=false)
Checks if any task has thrown an exception and optionally rethrows it.
Definition: ThreadPool.cpp:117
virtual ~ThreadPool()
Definition: ThreadPool.cpp:161
std::exception_ptr m_exception_ptr
Definition: ThreadPool.h:116
size_t activeThreads() const
Return the number of active workers (either running or sleeping)
Definition: ThreadPool.cpp:139
ThreadPool(unsigned int thread_count=std::thread::hardware_concurrency(), unsigned int empty_queue_wait_time=50)
Constructs a new ThreadPool.
Definition: ThreadPool.cpp:90
T current_exception(T... args)
T emplace_back(T... args)
T empty(T... args)
T end(T... args)
T fill(T... args)
T lock(T... args)
T move(T... args)
T rethrow_exception(T... args)
T size(T... args)
T sleep_for(T... args)