Skip to content
Snippets Groups Projects
Commit 36f1e869 authored by Winston Jodjana's avatar Winston Jodjana
Browse files

threadpool cleanup

parent c101e17c
No related branches found
No related tags found
No related merge requests found
/*
* Copyright ©2021 Travis McGaha. All rights reserved. Permission is
* hereby granted to students registered for University of Washington
* CSE 333 for use solely during Spring Quarter 2021 for purposes of
* the course. No other use, copying, distribution, or modification
* is permitted without prior written consent. Copyrights for
* third-party components of this work must be honored. Instructors
* interested in reusing these course materials should contact the
* author.
*/
#include <unistd.h>
#include <iostream>
#include "./threadpool.h"
namespace hw4 {
// This is the thread start routine, i.e., the function that threads
// are born into.
void *ThreadLoop(void *t_pool);
......@@ -24,17 +11,17 @@ ThreadPool::ThreadPool(uint32_t num_threads) {
// Initialize our member variables.
num_threads_running_ = 0;
terminate_threads_ = false;
Verify333(pthread_mutex_init(&q_lock_, nullptr) == 0);
Verify333(pthread_cond_init(&q_cond_, nullptr) == 0);
Verify550(pthread_mutex_init(&q_lock_, nullptr) == 0);
Verify550(pthread_cond_init(&q_cond_, nullptr) == 0);
// Allocate the array of pthread structures.
thread_array_ = new pthread_t[num_threads];
// Spawn the threads one by one, passing them a pointer to self
// as the argument to the thread start routine.
Verify333(pthread_mutex_lock(&q_lock_) == 0);
// Verify550(pthread_mutex_lock(&q_lock_) == 0);
for (uint32_t i = 0; i < num_threads; i++) {
Verify333(pthread_create(&(thread_array_[i]),
Verify550(pthread_create(&(thread_array_[i]),
nullptr,
&ThreadLoop,
static_cast<void *>(this)) == 0);
......@@ -42,11 +29,11 @@ ThreadPool::ThreadPool(uint32_t num_threads) {
// Wait for all of the threads to be born and initialized.
while (num_threads_running_ != num_threads) {
Verify333(pthread_mutex_unlock(&q_lock_) == 0);
Verify550(pthread_mutex_unlock(&q_lock_) == 0);
sleep(1); // give another thread the chance to acquire the lock
Verify333(pthread_mutex_lock(&q_lock_) == 0);
Verify550(pthread_mutex_lock(&q_lock_) == 0);
}
Verify333(pthread_mutex_unlock(&q_lock_) == 0);
Verify550(pthread_mutex_unlock(&q_lock_) == 0);
// Done! The thread pool is ready, and all of the worker threads
// are initialized and waiting on q_cond_ to be notified of available
......@@ -54,7 +41,7 @@ ThreadPool::ThreadPool(uint32_t num_threads) {
}
ThreadPool:: ~ThreadPool() {
Verify333(pthread_mutex_lock(&q_lock_) == 0);
Verify550(pthread_mutex_lock(&q_lock_) == 0);
uint32_t num_threads = num_threads_running_;
// Tell all of the worker threads to terminate.
......@@ -64,20 +51,20 @@ ThreadPool:: ~ThreadPool() {
for (uint32_t i = 0; i < num_threads; i++) {
// Use a sledgehammer and broadcast every loop iteration, just to
// be extra-certain that worker threads wake up and see the terminate flag.
Verify333(pthread_cond_broadcast(&q_cond_) == 0);
Verify333(pthread_mutex_unlock(&q_lock_) == 0);
Verify333(pthread_join(thread_array_[i], nullptr) == 0);
Verify333(pthread_mutex_lock(&q_lock_) == 0);
Verify550(pthread_cond_broadcast(&q_cond_) == 0);
Verify550(pthread_mutex_unlock(&q_lock_) == 0);
Verify550(pthread_join(thread_array_[i], nullptr) == 0);
Verify550(pthread_mutex_lock(&q_lock_) == 0);
}
// All of the worker threads are dead, so clean up the thread
// structures.
Verify333(num_threads_running_ == 0);
Verify550(num_threads_running_ == 0);
if (thread_array_ != nullptr) {
delete[] thread_array_;
}
thread_array_ = nullptr;
Verify333(pthread_mutex_unlock(&q_lock_) == 0);
Verify550(pthread_mutex_unlock(&q_lock_) == 0);
// Empty the task queue, serially issuing any remaining work.
while (!work_queue_.empty()) {
......@@ -89,11 +76,11 @@ ThreadPool:: ~ThreadPool() {
// Enqueue a Task for dispatch.
void ThreadPool::Dispatch(Task *t) {
Verify333(pthread_mutex_lock(&q_lock_) == 0);
Verify333(terminate_threads_ == false);
Verify550(pthread_mutex_lock(&q_lock_) == 0);
Verify550(terminate_threads_ == false);
work_queue_.push_back(t);
Verify333(pthread_cond_signal(&q_cond_) == 0);
Verify333(pthread_mutex_unlock(&q_lock_) == 0);
Verify550(pthread_cond_signal(&q_cond_) == 0);
Verify550(pthread_mutex_unlock(&q_lock_) == 0);
}
// This is the main loop that all worker threads are born into. They
......@@ -105,13 +92,13 @@ void *ThreadLoop(void *t_pool) {
// Grab the lock, increment the thread count so that the ThreadPool
// constructor knows this new thread is alive.
Verify333(pthread_mutex_lock(&(pool->q_lock_)) == 0);
Verify550(pthread_mutex_lock(&(pool->q_lock_)) == 0);
pool->num_threads_running_++;
// This is our main thread work loop.
while (pool->terminate_threads_ == false) {
// Wait to be signaled that something has happened.
Verify333(pthread_cond_wait(&(pool->q_cond_), &(pool->q_lock_)) == 0);
Verify550(pthread_cond_wait(&(pool->q_cond_), &(pool->q_lock_)) == 0);
// Keep trying to dequeue work until the work queue is empty.
while (!pool->work_queue_.empty() && (pool->terminate_threads_ == false)) {
......@@ -121,16 +108,14 @@ void *ThreadLoop(void *t_pool) {
// We picked up a Task, so invoke the task function with the
// lock released, then check so see if more tasks are waiting to
// be picked up.
Verify333(pthread_mutex_unlock(&(pool->q_lock_)) == 0);
Verify550(pthread_mutex_unlock(&(pool->q_lock_)) == 0);
nextTask->func_(nextTask);
Verify333(pthread_mutex_lock(&(pool->q_lock_)) == 0);
Verify550(pthread_mutex_lock(&(pool->q_lock_)) == 0);
}
}
// All done, exit.
pool->num_threads_running_--;
Verify333(pthread_mutex_unlock(&(pool->q_lock_)) == 0);
Verify550(pthread_mutex_unlock(&(pool->q_lock_)) == 0);
return nullptr;
}
} // namespace hw4
/*
* Copyright ©2021 Travis McGaha. All rights reserved. Permission is
* hereby granted to students registered for University of Washington
* CSE 333 for use solely during Spring Quarter 2021 for purposes of
* the course. No other use, copying, distribution, or modification
* is permitted without prior written consent. Copyrights for
* third-party components of this work must be honored. Instructors
* interested in reusing these course materials should contact the
* author.
*/
#ifndef THREADPOOL_H_
#define THREADPOOL_H_
......@@ -19,15 +8,7 @@ extern "C" {
#include <stdint.h> // for uint32_t, etc.
#include <list> // for std::list
namespace hw4 {
// A ThreadPool is, well, a pool of threads. ;) A ThreadPool is an
// abstraction that allows customers to dispatch tasks to a set of
// worker threads. Tasks are queued, and as a worker thread becomes
// available, it pulls a task off the queue and invokes a function
// pointer in the task to process it. When it is done processing the
// task, the thread returns to the pool to receive and process the next
// available task.
#include "./verify550.h" // For asserts
class ThreadPool {
public:
// Construct a new ThreadPool with a certain number of worker
......@@ -85,58 +66,4 @@ class ThreadPool {
pthread_t *thread_array_;
};
// TODO: could probably change this to just handle i/o, and have things like
// - fd(to communicate with main thread), filename, and access to shared memory data structure
class HttpServerTask : public ThreadPool::Task {
public:
explicit HttpServerTask(ThreadPool::thread_task_fn f)
: ThreadPool::Task(f) { }
int client_fd;
uint16_t c_port;
std::string c_addr, c_dns, s_addr, s_dns;
std::string base_dir;
std::list<std::string>* indices;
};
static void HttpServer_ThrFn(ThreadPool::Task *t) {
// Cast back our HttpServerTask structure with all of our new
// client's information in it.
unique_ptr<HttpServerTask> hst(static_cast<HttpServerTask *>(t));
cout << " client " << hst->c_dns << ":" << hst->c_port << " "
<< "(IP address " << hst->c_addr << ")" << " connected." << endl;
// Read in the next request, process it, write the response.
// Use the HttpConnection class to read and process the next
// request from our current client, then write out our response. If
// the client sends a "Connection: close\r\n" header, then shut down
// the connection -- we're done.
//
// Hint: the client can make multiple requests on our single connection,
// so we should keep the connection open between requests rather than
// creating/destroying the same connection repeatedly.
// STEP 1:
HttpConnection cn(hst->client_fd);
HttpRequest rq;
bool done = false;
while (!done) {
if (!cn.GetNextRequest(&rq)) {
break;
}
HttpResponse resp = ProcessRequest(rq, hst->base_dir, *(hst->indices));
cn.WriteResponse(resp);
// If the client sent a "Connection: close" header, shut down
// the connection.
if (rq.GetHeaderValue("connection") == "close")
done = true;
}
}
} // namespace hw4
#endif // THREADPOOL_H_
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "./verify550.h"
void VerificationFailure(const char *exp, const char *file,
const char *baseFile, int line) {
if (!strcmp(file, baseFile)) {
fprintf(stderr,
"Verify550(%s) failed in file %s, line %d\n", exp, file, line);
} else {
fprintf(stderr,
"Verify550(%s) failed in file %s (included from %s), line %d\n",
exp, file, baseFile, line);
}
exit(EXIT_FAILURE);
}
\ No newline at end of file
#ifndef VERIFY550_H_
#define VERIFY550_H_
#include <stdint.h> // for uint32_t, etc.
// http://www.acm.uiuc.edu/sigops/roll_your_own/2.a.html
void VerificationFailure(const char *exp, const char *file,
const char *basefile, int line);
#define Verify550(exp) if (exp) ; \
else VerificationFailure(#exp, __FILE__, __BASE_FILE__, __LINE__)
#endif // VERIFY550_H_
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment