Skip to content
Snippets Groups Projects
Commit 380e38d3 authored by Winston Jodjana's avatar Winston Jodjana
Browse files

file read is done

parent 47a35182
No related branches found
No related tags found
No related merge requests found
......@@ -9,6 +9,8 @@
#include "threadpool.h"
#include "server_socket.h"
#include "FileParser.h" // File reading
#define BUFFER_SIZE 4096
static const int num_threads = 16;
......@@ -21,12 +23,12 @@ static void HttpServer_ThrFn(ThreadPool::Task *t);
// information about its usage.
void Usage(char *progname);
int main(int argc, char **argv) {
int main(int argc, char **argv)
{
// Check for valid number of arguments
if (argc != 3)
Usage(argv[0]);
// Parse the port number, and check
// for failure
unsigned short port = 0;
......@@ -36,29 +38,32 @@ int main(int argc, char **argv) {
// bind to a socket for listening for incoming clients
// and check for failure
int listen_fd = Listen(argv[1]);
if (listen_fd <= 0) {
if (listen_fd <= 0)
{
// Failed to bind/listen to a socket.
std::cerr << "Couldn't bind to any addresses." << std::endl;
return EXIT_FAILURE;
}
// loop and wait for client connections
while (1) {
while (1)
{
struct sockaddr_storage caddr;
socklen_t caddr_len = sizeof(caddr);
int client_fd = accept(listen_fd,
reinterpret_cast<struct sockaddr *>(&caddr),
&caddr_len);
reinterpret_cast<struct sockaddr *>(&caddr),
&caddr_len);
// check if we successfully accepted a client connection
if (client_fd < 0) {
std::cerr << "Failure on accept: " << strerror(errno) << std::endl;
close(listen_fd);
return EXIT_FAILURE;
if (client_fd < 0)
{
std::cerr << "Failure on accept: " << strerror(errno) << std::endl;
close(listen_fd);
return EXIT_FAILURE;
}
// TODO: initialize shared data structure
// TODO: modify logic to add threapool
// TODO: modify logic to add threapool
// TODO: use server_socket class
ServerSocket server_socket(port, argv[2]);
ThreadPool tp(num_threads);
......@@ -67,56 +72,59 @@ int main(int argc, char **argv) {
2) initialize stuff for communication with threadpool
3) read from client and write to thread
4) get back info from thread
*/
unsigned char buf[BUFFER_SIZE];
while (1) {
// TODO: Spawn workers when you see a client connect
// Read file path, clean it up, pass it to worker thread
//
HttpServerTask *hst = new HttpServerTask(HttpServer_ThrFn);
// TODO: change to new fields
hst->base_dir = static_file_dir_path_;
hst->indices = &indices_;
if (!server_socket.Accept(&hst->client_fd,
&hst->c_addr,
&hst->c_port,
&hst->c_dns,
&hst->s_addr,
&hst->s_dns)) {
while (1)
{
// TODO: Spawn workers when you see a client connect
// Read file path, clean it up, pass it to worker thread
//
HttpServerTask *hst = new HttpServerTask(HttpServer_ThrFn);
// TODO: change to new fields
hst->base_dir = static_file_dir_path_;
hst->indices = &indices_;
if (!server_socket.Accept(&hst->client_fd,
&hst->c_addr,
&hst->c_port,
&hst->c_dns,
&hst->s_addr,
&hst->s_dns))
{
// The accept failed for some reason, so quit out of the server.
// (Will happen when kill command is used to shut down the server.)
break;
}
}
//
// The accept succeeded; dispatch it.
tp.Dispatch(hst);
//
// The accept succeeded; dispatch it.
tp.Dispatch(hst);
}
// read from client fd and write to stdout
while (1) {
int rlen = WrappedRead(client_fd, buf, BUFFER_SIZE);
// check for reading failure
// on error, clean up and exit with failure
if (rlen <= 0) {
while (1)
{
int rlen = WrappedRead(client_fd, buf, BUFFER_SIZE);
// check for reading failure
// on error, clean up and exit with failure
if (rlen <= 0)
{
break;
}
}
int wlen = WrappedWrite(STDOUT_FILENO, buf, rlen);
// check that we wrote the same amount as we read in
// on error, clean up and exit with failure
if (wlen != rlen) {
int wlen = WrappedWrite(STDOUT_FILENO, buf, rlen);
// check that we wrote the same amount as we read in
// on error, clean up and exit with failure
if (wlen != rlen)
{
break;
}
}
}
close(client_fd);
}
// clean up
......@@ -124,17 +132,24 @@ int main(int argc, char **argv) {
return EXIT_SUCCESS;
}
static void HttpServer_ThrFn(ThreadPool::Task *t) {
static void HttpServer_ThrFn(ThreadPool::Task *t)
{
// TODO: Worker thread logic
// Input: Shared struct with cond. var., buffer, etc
// 1. A worker function that receives the name of a file (absolute file path),
// 2. reads the file content into the address space of the process,
// 3. and arranges to return (a pointer to) the file content to whoever dispatched the thread to the worker function.
// 1. A worker function that receives the name of a file (absolute file path),
HttpServerTask *hst = static_cast<HttpServerTask *>(t);
string fullfile = hst->work->filepath;
int file_len;
char *file = ReadFileToString(fullfile.c_str(), &file_len);
// 2. reads the file content into the address space of the process,
// 3. and arranges to return (a pointer to) the file content to whoever dispatched the thread to the worker function.
// TODO: Tell main thread that task is done, main thread needs to close the connection
}
void Usage(char *progname) {
void Usage(char *progname)
{
std::cerr << "usage: " << progname << " port" << std::endl;
exit(EXIT_FAILURE);
}
/*
* Copyright ©2020 Travis McGaha. All rights reserved. Permission is
* hereby granted to students registered for University of Washington
* CSE 333 for use solely during Summer Quarter 2020 for purposes of
* the course. No other use, copying, distribution, or modification
* is permitted without prior written consent. Copyrights for
* third-party components of this work must be honored. Instructors
* interested in reusing these course materials should contact the
* author.
*/
// Feature test macro enabling strdup (c.f., Linux Programming Interface p. 63)
#define _XOPEN_SOURCE 600
#include "./FileParser.h"
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <ctype.h>
#include <string.h>
///////////////////////////////////////////////////////////////////////////////
// Constants and declarations of internal helper functions
#define ASCII_UPPER_BOUND 0x7F
///////////////////////////////////////////////////////////////////////////////
// Publically-exported functions
char *ReadFileToString(const char *filename, int *size) {
struct stat filestat;
char *buf;
int fd;
ssize_t numread;
size_t left_to_read;
// STEP 1.
// Use the stat system call to fetch a "struct stat" that describes
// properties of the file. ("man 2 stat"). You can assume we're on a 64-bit
// system, with a 64-bit off_t field.
if (stat(filename, &filestat) == -1) { // stat failed
return NULL;
}
// STEP 2.
// Make sure this is a "regular file" and not a directory or something else
// (use the S_ISREG macro described in "man 2 stat").
if (!S_ISREG(filestat.st_mode)) { // not a regular file
return NULL;
}
// STEP 3.
// Attempt to open the file for reading (see also "man 2 open").
fd = open(filename, O_RDONLY);
if (fd == -1) {
close(fd);
return NULL;
}
// STEP 4.
// Allocate space for the file, plus 1 extra byte to
// '\0'-terminate the string.
*size = filestat.st_size + 1;
buf = (char *) malloc(*size * sizeof(char));
if (buf == NULL) { // malloc failed
close(fd);
return NULL;
}
// STEP 5.
// Read in the file contents using the read() system call (see also
// "man 2 read"), being sure to handle the case that read() returns -1 and
// errno is either EAGAIN or EINTR. Also, note that it is not an error for
// read to return fewer bytes than what you asked for; you'll need to put
// read() inside a while loop, looping until you've read to the end of file
// or a non-recoverable error. Read the man page for read() carefully, in
// particular what the return values -1 and 0 imply.
left_to_read = filestat.st_size;
while (left_to_read > 0) {
numread = read(fd, buf, left_to_read);
if (numread == -1) {
// either interrupted or an actual error
if (errno != EINTR || errno != EAGAIN) {
// Some kind of error.
free(buf);
close(fd);
return NULL;
}
} else {
left_to_read -= numread;
}
}
// Great, we're done! We hit the end of the file and we read
// filestat.st_size - left_to_read bytes. Close the file descriptor returned
// by open() and return through the "size" output parameter how many bytes
// we read.
close(fd);
*size = filestat.st_size - left_to_read;
// Null-terminate the string.
buf[*size] = '\0';
return buf;
}
\ No newline at end of file
#ifndef FILEPARSER_H_
#define FILEPARSER_H_
// Reads the full contents of "filename" into memory, malloc'ing space for
// its contents and returning a pointer to the allocated memory. No special
// escaping/handling is provided if the file contains null (ie, zero-valued)
// bytes.
//
// Arguments:
// - filename: a string containing the pathname of the file to read
// - size: if successful, an output parameter through which we return the size
// of the file in bytes, assuming we successfully read it in.
//
// Returns:
// - NULL, if the read fails
// - Otherwise, the file's content. This function will add '\0' to the end
// of the file; however, the number of bytes returned through 'size' doesn't
// include the final '\0'. The caller is responsible for freeing the
// returned pointer.
char* ReadFileToString(const char *filename, int *size);
#endif // FILEPARSER_H_
\ No newline at end of file
......@@ -7,7 +7,8 @@
// are born into.
void *ThreadLoop(void *t_pool);
ThreadPool::ThreadPool(uint32_t num_threads) {
ThreadPool::ThreadPool(uint32_t num_threads)
{
// Initialize our member variables.
num_threads_running_ = 0;
terminate_threads_ = false;
......@@ -20,7 +21,8 @@ ThreadPool::ThreadPool(uint32_t num_threads) {
// Spawn the threads one by one, passing them a pointer to self
// as the argument to the thread start routine.
// Verify550(pthread_mutex_lock(&q_lock_) == 0);
for (uint32_t i = 0; i < num_threads; i++) {
for (uint32_t i = 0; i < num_threads; i++)
{
Verify550(pthread_create(&(thread_array_[i]),
nullptr,
&ThreadLoop,
......@@ -28,9 +30,10 @@ ThreadPool::ThreadPool(uint32_t num_threads) {
}
// Wait for all of the threads to be born and initialized.
while (num_threads_running_ != num_threads) {
while (num_threads_running_ != num_threads)
{
Verify550(pthread_mutex_unlock(&q_lock_) == 0);
sleep(1); // give another thread the chance to acquire the lock
sleep(1); // give another thread the chance to acquire the lock
Verify550(pthread_mutex_lock(&q_lock_) == 0);
}
Verify550(pthread_mutex_unlock(&q_lock_) == 0);
......@@ -40,7 +43,8 @@ ThreadPool::ThreadPool(uint32_t num_threads) {
// work.
}
ThreadPool:: ~ThreadPool() {
ThreadPool::~ThreadPool()
{
Verify550(pthread_mutex_lock(&q_lock_) == 0);
uint32_t num_threads = num_threads_running_;
......@@ -48,7 +52,8 @@ ThreadPool:: ~ThreadPool() {
terminate_threads_ = true;
// Join with the running threads 1-by-1 until they have all died.
for (uint32_t i = 0; i < num_threads; i++) {
for (uint32_t i = 0; i < num_threads; i++)
{
// Use a sledgehammer and broadcast every loop iteration, just to
// be extra-certain that worker threads wake up and see the terminate flag.
Verify550(pthread_cond_broadcast(&q_cond_) == 0);
......@@ -60,14 +65,16 @@ ThreadPool:: ~ThreadPool() {
// All of the worker threads are dead, so clean up the thread
// structures.
Verify550(num_threads_running_ == 0);
if (thread_array_ != nullptr) {
if (thread_array_ != nullptr)
{
delete[] thread_array_;
}
thread_array_ = nullptr;
Verify550(pthread_mutex_unlock(&q_lock_) == 0);
// Empty the task queue, serially issuing any remaining work.
while (!work_queue_.empty()) {
while (!work_queue_.empty())
{
Task *nextTask = work_queue_.front();
work_queue_.pop_front();
nextTask->func_(nextTask);
......@@ -75,7 +82,8 @@ ThreadPool:: ~ThreadPool() {
}
// Enqueue a Task for dispatch.
void ThreadPool::Dispatch(Task *t) {
void ThreadPool::Dispatch(Task *t)
{
Verify550(pthread_mutex_lock(&q_lock_) == 0);
Verify550(terminate_threads_ == false);
work_queue_.push_back(t);
......@@ -87,7 +95,8 @@ void ThreadPool::Dispatch(Task *t) {
// wait for a signal on the work queue condition variable, then they
// grab work off the queue. Threads return (i.e., terminate)
// when they notice that terminate_threads_ is true.
void *ThreadLoop(void *t_pool) {
void *ThreadLoop(void *t_pool)
{
ThreadPool *pool = static_cast<ThreadPool *>(t_pool);
// Grab the lock, increment the thread count so that the ThreadPool
......@@ -96,12 +105,14 @@ void *ThreadLoop(void *t_pool) {
pool->num_threads_running_++;
// This is our main thread work loop.
while (pool->terminate_threads_ == false) {
while (pool->terminate_threads_ == false)
{
// Wait to be signaled that something has happened.
Verify550(pthread_cond_wait(&(pool->q_cond_), &(pool->q_lock_)) == 0);
// Keep trying to dequeue work until the work queue is empty.
while (!pool->work_queue_.empty() && (pool->terminate_threads_ == false)) {
while (!pool->work_queue_.empty() && (pool->terminate_threads_ == false))
{
ThreadPool::Task *nextTask = pool->work_queue_.front();
pool->work_queue_.pop_front();
......
#ifndef THREADPOOL_H_
#define THREADPOOL_H_
extern "C" {
#include <pthread.h> // for the pthread threading/mutex functions
extern "C"
{
#include <pthread.h> // for the pthread threading/mutex functions
}
#include <stdint.h> // for uint32_t, etc.
#include <list> // for std::list
#include <stdint.h> // for uint32_t, etc.
#include <list> // for std::list
#include "./verify550.h" // For asserts
class ThreadPool {
public:
// Standard library import
#include <bits/stdc++.h>
using namespace std;
class ThreadPool
{
public:
// Construct a new ThreadPool with a certain number of worker
// threads. Arguments:
//
......@@ -28,11 +34,12 @@ class ThreadPool {
class Task;
typedef void (*thread_task_fn)(Task *arg);
class Task {
public:
class Task
{
public:
// "f" is the task function that a worker thread should invoke to
// process the task.
explicit Task(thread_task_fn func) : func_(func) { }
explicit Task(thread_task_fn func) : func_(func) {}
// The dispatch function.
thread_task_fn func_;
......@@ -45,7 +52,7 @@ class ThreadPool {
// A lock and condition variable that worker threads and the
// Dispatch function use to guard the Task queue.
pthread_mutex_t q_lock_;
pthread_cond_t q_cond_;
pthread_cond_t q_cond_;
// The queue of Tasks waiting to be dispatched to a worker thread.
std::list<Task *> work_queue_;
......@@ -62,23 +69,27 @@ class ThreadPool {
// terminates, they decrement it.
uint32_t num_threads_running_;
private:
private:
// The pthreads pthread_t structures representing each thread.
pthread_t *thread_array_;
};
class HttpServerTask : public ThreadPool::Task {
public:
class HttpServerTask : public ThreadPool::Task
{
public:
explicit HttpServerTask(ThreadPool::thread_task_fn f)
: ThreadPool::Task(f) { }
Work* work;
: ThreadPool::Task(f) {}
Work *work; // Struct to hold data for passing between main and worker thread
};
struct Work {
char *buffer; // Buffer for main thread to read file contents from worker thread
int written; // Number of bytes (char) that the worker thread has written to the buffer
pthread_cond_t buffer_filled; // Cond. var. for when the main thread should read from the buffer and write back the file contents to the client
pthread_cond_t task_finished; // Cond. var. for when the worker thread has finished reading all the file contents
struct Work
{
string filepath; // String for the filepath of the file the client is requesting for
char *buffer; // Buffer for main thread to read file contents from worker thread
int bytes_written; // Number of bytes (char) that the worker thread has written to the buffer
pthread_cond_t buffer_filled; // Cond. var. for when the main thread should read from the buffer and write back the file contents to the client
pthread_cond_t task_finished; // Cond. var. for when the worker thread has finished reading all the file contents
};
#endif // THREADPOOL_H_
#endif // THREADPOOL_H_
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment