new_task_queue support

This commit is contained in:
yhirose 2019-08-03 13:15:05 +09:00
parent 47312e6df9
commit 579ff1a0a6

View file

@ -265,12 +265,20 @@ private:
std::string buffer;
};
class ThreadsTaskQueue {
class TaskQueue {
public:
TaskQueue() {}
virtual ~TaskQueue() {}
virtual void enque(std::function<void(void)> fn) = 0;
virtual void shutdown() = 0;
};
class ThreadsTaskQueue : public TaskQueue {
public:
ThreadsTaskQueue() : running_threads_(0) {}
~ThreadsTaskQueue() {}
virtual ~ThreadsTaskQueue() {}
void enque(std::function<void(void)> fn) {
virtual void enque(std::function<void(void)> fn) override {
std::thread([=]() {
{
std::lock_guard<std::mutex> guard(running_threads_mutex_);
@ -286,7 +294,7 @@ public:
}).detach();
}
void shutdown() {
virtual void shutdown() override {
for (;;) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::lock_guard<std::mutex> guard(running_threads_mutex_);
@ -299,8 +307,6 @@ private:
int running_threads_;
};
typedef ThreadsTaskQueue TaskQueue;
class Server {
public:
typedef std::function<void(const Request &, Response &)> Handler;
@ -336,6 +342,8 @@ public:
bool is_running() const;
void stop();
std::function<TaskQueue*(void)> new_task_queue;
protected:
bool process_request(Stream &strm, bool last_connection,
bool &connection_close,
@ -373,8 +381,6 @@ private:
Handlers options_handlers_;
Handler error_handler_;
Logger logger_;
TaskQueue task_queue_;
};
class Client {
@ -2226,40 +2232,48 @@ inline int Server::bind_internal(const char *host, int port, int socket_flags) {
inline bool Server::listen_internal() {
auto ret = true;
is_running_ = true;
for (;;) {
if (svr_sock_ == INVALID_SOCKET) {
// The server socket was closed by 'stop' method.
break;
{
std::unique_ptr<TaskQueue> task_queue;
if (new_task_queue) {
task_queue.reset(new_task_queue());
} else {
task_queue.reset(new ThreadsTaskQueue());
}
auto val = detail::select_read(svr_sock_, 0, 100000);
if (val == 0) { // Timeout
continue;
}
socket_t sock = accept(svr_sock_, nullptr, nullptr);
if (sock == INVALID_SOCKET) {
if (svr_sock_ != INVALID_SOCKET) {
detail::close_socket(svr_sock_);
ret = false;
} else {
; // The server socket was closed by user.
for (;;) {
if (svr_sock_ == INVALID_SOCKET) {
// The server socket was closed by 'stop' method.
break;
}
break;
auto val = detail::select_read(svr_sock_, 0, 100000);
if (val == 0) { // Timeout
continue;
}
socket_t sock = accept(svr_sock_, nullptr, nullptr);
if (sock == INVALID_SOCKET) {
if (svr_sock_ != INVALID_SOCKET) {
detail::close_socket(svr_sock_);
ret = false;
} else {
; // The server socket was closed by user.
}
break;
}
task_queue->enque([=]() { read_and_close_socket(sock); });
}
task_queue_.enque([=]() { read_and_close_socket(sock); });
task_queue->shutdown();
}
task_queue_.shutdown();
is_running_ = false;
return ret;
}