Skip to content
Snippets Groups Projects
Commit c101e17c authored by Mengqi Chen's avatar Mengqi Chen
Browse files

add more files

parent 91b598d6
No related branches found
No related tags found
No related merge requests found
......@@ -15,7 +15,7 @@ void Usage(char *progname);
int main(int argc, char **argv) {
// Check for valid number of arguments
if (argc != 2)
if (argc != 3)
Usage(argv[0]);
// Parse the port number, and check
......@@ -33,40 +33,65 @@ int main(int argc, char **argv) {
return EXIT_FAILURE;
}
// accept a client connection
struct sockaddr_storage caddr;
socklen_t caddr_len = sizeof(caddr);
int client_fd = accept(listen_fd,
reinterpret_cast<struct sockaddr *>(&caddr),
&caddr_len);
// loop and wait for client connections
while (1) {
struct sockaddr_storage caddr;
socklen_t caddr_len = sizeof(caddr);
int client_fd = accept(listen_fd,
reinterpret_cast<struct sockaddr *>(&caddr),
&caddr_len);
// check if we successfully accepted a client connection
if (client_fd < 0) {
std::cerr << "Failure on accept: " << strerror(errno) << std::endl;
close(listen_fd);
return EXIT_FAILURE;
}
// check if we successfully accepted a client connection
if (client_fd < 0) {
std::cerr << "Failure on accept: " << strerror(errno) << std::endl;
close(listen_fd);
return EXIT_FAILURE;
}
// TODO: modify logic to add threapool
// read from client fd and write to stdout
unsigned char buf[BUFFER_SIZE];
ThreadPool tp(kNumThreads);
while (1) {
int rlen = WrappedRead(client_fd, buf, BUFFER_SIZE);
// check for reading failure
// on error, clean up and exit with failure
if (rlen <= 0) {
HttpServerTask *hst = new HttpServerTask(HttpServer_ThrFn);
hst->base_dir = static_file_dir_path_;
hst->indices = &indices_;
if (!socket_.Accept(&hst->client_fd,
&hst->c_addr,
&hst->c_port,
&hst->c_dns,
&hst->s_addr,
&hst->s_dns)) {
// The accept failed for some reason, so quit out of the server.
// (Will happen when kill command is used to shut down the server.)
break;
}
// The accept succeeded; dispatch it.
tp.Dispatch(hst);
}
int wlen = WrappedWrite(STDOUT_FILENO, buf, rlen);
// check that we wrote the same amount as we read in
// on error, clean up and exit with failure
if (wlen != rlen) {
break;
// read from client fd and write to stdout
unsigned char buf[BUFFER_SIZE];
while (1) {
int rlen = WrappedRead(client_fd, buf, BUFFER_SIZE);
// check for reading failure
// on error, clean up and exit with failure
if (rlen <= 0) {
break;
}
int wlen = WrappedWrite(STDOUT_FILENO, buf, rlen);
// check that we wrote the same amount as we read in
// on error, clean up and exit with failure
if (wlen != rlen) {
break;
}
}
close(client_fd);
}
// clean up
close(client_fd);
close(listen_fd);
return EXIT_SUCCESS;
}
......
/*
* Copyright ©2021 Travis McGaha. All rights reserved. Permission is
* hereby granted to students registered for University of Washington
* CSE 333 for use solely during Spring Quarter 2021 for purposes of
* the course. No other use, copying, distribution, or modification
* is permitted without prior written consent. Copyrights for
* third-party components of this work must be honored. Instructors
* interested in reusing these course materials should contact the
* author.
*/
#include <unistd.h>
#include <iostream>
#include "./threadpool.h"
namespace hw4 {
// This is the thread start routine, i.e., the function that threads
// are born into.
void *ThreadLoop(void *t_pool);
ThreadPool::ThreadPool(uint32_t num_threads) {
// Initialize our member variables.
num_threads_running_ = 0;
terminate_threads_ = false;
Verify333(pthread_mutex_init(&q_lock_, nullptr) == 0);
Verify333(pthread_cond_init(&q_cond_, nullptr) == 0);
// Allocate the array of pthread structures.
thread_array_ = new pthread_t[num_threads];
// Spawn the threads one by one, passing them a pointer to self
// as the argument to the thread start routine.
Verify333(pthread_mutex_lock(&q_lock_) == 0);
for (uint32_t i = 0; i < num_threads; i++) {
Verify333(pthread_create(&(thread_array_[i]),
nullptr,
&ThreadLoop,
static_cast<void *>(this)) == 0);
}
// Wait for all of the threads to be born and initialized.
while (num_threads_running_ != num_threads) {
Verify333(pthread_mutex_unlock(&q_lock_) == 0);
sleep(1); // give another thread the chance to acquire the lock
Verify333(pthread_mutex_lock(&q_lock_) == 0);
}
Verify333(pthread_mutex_unlock(&q_lock_) == 0);
// Done! The thread pool is ready, and all of the worker threads
// are initialized and waiting on q_cond_ to be notified of available
// work.
}
ThreadPool:: ~ThreadPool() {
Verify333(pthread_mutex_lock(&q_lock_) == 0);
uint32_t num_threads = num_threads_running_;
// Tell all of the worker threads to terminate.
terminate_threads_ = true;
// Join with the running threads 1-by-1 until they have all died.
for (uint32_t i = 0; i < num_threads; i++) {
// Use a sledgehammer and broadcast every loop iteration, just to
// be extra-certain that worker threads wake up and see the terminate flag.
Verify333(pthread_cond_broadcast(&q_cond_) == 0);
Verify333(pthread_mutex_unlock(&q_lock_) == 0);
Verify333(pthread_join(thread_array_[i], nullptr) == 0);
Verify333(pthread_mutex_lock(&q_lock_) == 0);
}
// All of the worker threads are dead, so clean up the thread
// structures.
Verify333(num_threads_running_ == 0);
if (thread_array_ != nullptr) {
delete[] thread_array_;
}
thread_array_ = nullptr;
Verify333(pthread_mutex_unlock(&q_lock_) == 0);
// Empty the task queue, serially issuing any remaining work.
while (!work_queue_.empty()) {
Task *nextTask = work_queue_.front();
work_queue_.pop_front();
nextTask->func_(nextTask);
}
}
// Enqueue a Task for dispatch.
void ThreadPool::Dispatch(Task *t) {
Verify333(pthread_mutex_lock(&q_lock_) == 0);
Verify333(terminate_threads_ == false);
work_queue_.push_back(t);
Verify333(pthread_cond_signal(&q_cond_) == 0);
Verify333(pthread_mutex_unlock(&q_lock_) == 0);
}
// This is the main loop that all worker threads are born into. They
// wait for a signal on the work queue condition variable, then they
// grab work off the queue. Threads return (i.e., terminate)
// when they notice that terminate_threads_ is true.
void *ThreadLoop(void *t_pool) {
ThreadPool *pool = static_cast<ThreadPool *>(t_pool);
// Grab the lock, increment the thread count so that the ThreadPool
// constructor knows this new thread is alive.
Verify333(pthread_mutex_lock(&(pool->q_lock_)) == 0);
pool->num_threads_running_++;
// This is our main thread work loop.
while (pool->terminate_threads_ == false) {
// Wait to be signaled that something has happened.
Verify333(pthread_cond_wait(&(pool->q_cond_), &(pool->q_lock_)) == 0);
// Keep trying to dequeue work until the work queue is empty.
while (!pool->work_queue_.empty() && (pool->terminate_threads_ == false)) {
ThreadPool::Task *nextTask = pool->work_queue_.front();
pool->work_queue_.pop_front();
// We picked up a Task, so invoke the task function with the
// lock released, then check so see if more tasks are waiting to
// be picked up.
Verify333(pthread_mutex_unlock(&(pool->q_lock_)) == 0);
nextTask->func_(nextTask);
Verify333(pthread_mutex_lock(&(pool->q_lock_)) == 0);
}
}
// All done, exit.
pool->num_threads_running_--;
Verify333(pthread_mutex_unlock(&(pool->q_lock_)) == 0);
return nullptr;
}
} // namespace hw4
/*
* Copyright ©2021 Travis McGaha. All rights reserved. Permission is
* hereby granted to students registered for University of Washington
* CSE 333 for use solely during Spring Quarter 2021 for purposes of
* the course. No other use, copying, distribution, or modification
* is permitted without prior written consent. Copyrights for
* third-party components of this work must be honored. Instructors
* interested in reusing these course materials should contact the
* author.
*/
#ifndef THREADPOOL_H_
#define THREADPOOL_H_
extern "C" {
#include <pthread.h> // for the pthread threading/mutex functions
}
#include <stdint.h> // for uint32_t, etc.
#include <list> // for std::list
namespace hw4 {
// A ThreadPool is, well, a pool of threads. ;) A ThreadPool is an
// abstraction that allows customers to dispatch tasks to a set of
// worker threads. Tasks are queued, and as a worker thread becomes
// available, it pulls a task off the queue and invokes a function
// pointer in the task to process it. When it is done processing the
// task, the thread returns to the pool to receive and process the next
// available task.
class ThreadPool {
public:
// Construct a new ThreadPool with a certain number of worker
// threads. Arguments:
//
// - num_threads: the number of threads in the pool.
explicit ThreadPool(uint32_t num_threads);
virtual ~ThreadPool();
// This inner class defines what a Task is. A worker thread will
// pull a task off the task queue and invoke the thread_task_fn
// function pointer inside of it, passing it the Task* itself as an
// argument. The thread_task_fn takes ownership of the Task and
// must arrange to delete the task when it is done. Customers will
// probably want to subclass Task to add task-specific fields to it.
class Task;
typedef void (*thread_task_fn)(Task *arg);
class Task {
public:
// "f" is the task function that a worker thread should invoke to
// process the task.
explicit Task(thread_task_fn func) : func_(func) { }
// The dispatch function.
thread_task_fn func_;
};
// Customers use Dispatch() to enqueue a Task for dispatch to a
// worker thread.
void Dispatch(Task *t);
// A lock and condition variable that worker threads and the
// Dispatch function use to guard the Task queue.
pthread_mutex_t q_lock_;
pthread_cond_t q_cond_;
// The queue of Tasks waiting to be dispatched to a worker thread.
std::list<Task *> work_queue_;
// This should be set to "true" when it is time for the worker
// threads to terminate, i.e., when the ThreadPool is
// destroyed. A worker thread will check this variable before
// picking up its next piece of work; if it is true, the worker
// threads will terminate.
bool terminate_threads_;
// This variable stores how many threads are currently running. As
// worker threads are born, they increment it, and as worker threads
// terminates, they decrement it.
uint32_t num_threads_running_;
private:
// The pthreads pthread_t structures representing each thread.
pthread_t *thread_array_;
};
// TODO: could probably change this to just handle i/o, and have things like
// - fd(to communicate with main thread), filename, and access to shared memory data structure
class HttpServerTask : public ThreadPool::Task {
public:
explicit HttpServerTask(ThreadPool::thread_task_fn f)
: ThreadPool::Task(f) { }
int client_fd;
uint16_t c_port;
std::string c_addr, c_dns, s_addr, s_dns;
std::string base_dir;
std::list<std::string>* indices;
};
static void HttpServer_ThrFn(ThreadPool::Task *t) {
// Cast back our HttpServerTask structure with all of our new
// client's information in it.
unique_ptr<HttpServerTask> hst(static_cast<HttpServerTask *>(t));
cout << " client " << hst->c_dns << ":" << hst->c_port << " "
<< "(IP address " << hst->c_addr << ")" << " connected." << endl;
// Read in the next request, process it, write the response.
// Use the HttpConnection class to read and process the next
// request from our current client, then write out our response. If
// the client sends a "Connection: close\r\n" header, then shut down
// the connection -- we're done.
//
// Hint: the client can make multiple requests on our single connection,
// so we should keep the connection open between requests rather than
// creating/destroying the same connection repeatedly.
// STEP 1:
HttpConnection cn(hst->client_fd);
HttpRequest rq;
bool done = false;
while (!done) {
if (!cn.GetNextRequest(&rq)) {
break;
}
HttpResponse resp = ProcessRequest(rq, hst->base_dir, *(hst->indices));
cn.WriteResponse(resp);
// If the client sent a "Connection: close" header, shut down
// the connection.
if (rq.GetHeaderValue("connection") == "close")
done = true;
}
}
} // namespace hw4
#endif // THREADPOOL_H_
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment