std::condition_variable::notify_one/all() should be called after unlocking mutex (#1448)

* Move next job in task queue rather than copy

* Notify waiting thread after unlocking mutex

* Add unit test for TaskQueue

* Don't use C++14 feature in test code
This commit is contained in:
Jiwoo Park 2022-12-10 07:37:48 +09:00 committed by GitHub
parent 8f32271e8c
commit 58cffd3223
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 4 deletions

View file

@ -549,8 +549,11 @@ public:
~ThreadPool() override = default; ~ThreadPool() override = default;
void enqueue(std::function<void()> fn) override { void enqueue(std::function<void()> fn) override {
std::unique_lock<std::mutex> lock(mutex_); {
jobs_.push_back(std::move(fn)); std::unique_lock<std::mutex> lock(mutex_);
jobs_.push_back(std::move(fn));
}
cond_.notify_one(); cond_.notify_one();
} }
@ -559,9 +562,10 @@ public:
{ {
std::unique_lock<std::mutex> lock(mutex_); std::unique_lock<std::mutex> lock(mutex_);
shutdown_ = true; shutdown_ = true;
cond_.notify_all();
} }
cond_.notify_all();
// Join... // Join...
for (auto &t : threads_) { for (auto &t : threads_) {
t.join(); t.join();
@ -583,7 +587,7 @@ private:
if (pool_.shutdown_ && pool_.jobs_.empty()) { break; } if (pool_.shutdown_ && pool_.jobs_.empty()) { break; }
fn = pool_.jobs_.front(); fn = std::move(pool_.jobs_.front());
pool_.jobs_.pop_front(); pool_.jobs_.pop_front();
} }

View file

@ -5,6 +5,7 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include <future> #include <future>
#include <memory>
#include <sstream> #include <sstream>
#include <stdexcept> #include <stdexcept>
#include <thread> #include <thread>
@ -5522,3 +5523,18 @@ TEST(SocketStream, is_writable_INET) {
ASSERT_EQ(0, close(disconnected_svr_sock)); ASSERT_EQ(0, close(disconnected_svr_sock));
} }
#endif // #ifndef _WIN32 #endif // #ifndef _WIN32
TEST(TaskQueueTest, IncreaseAtomicInteger) {
static constexpr unsigned int number_of_task{1000000};
std::atomic_uint count{0};
std::unique_ptr<TaskQueue> task_queue{
new ThreadPool{CPPHTTPLIB_THREAD_POOL_COUNT}};
for (unsigned int i = 0; i < number_of_task; ++i) {
task_queue->enqueue(
[&count] { count.fetch_add(1, std::memory_order_relaxed); });
}
EXPECT_NO_THROW(task_queue->shutdown());
EXPECT_EQ(number_of_task, count.load());
}