Skip to content
Snippets Groups Projects
server.cc 10.9 KiB
Newer Older
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
 *
 * store/txnstore/server.cc:
 *   Implementation of a single transactional key-value server.
 *
 * Copyright 2015 Irene Zhang <iyzhang@cs.washington.edu>
 *                Naveen Kr. Sharma <nksharma@cs.washington.edu>
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 **********************************************************************/

#include "server.h"
namespace txnstore {

using namespace proto;

Server::Server(Mode mode, uint64_t skew, uint64_t error) : mode(mode)
{
    timeServer = TrueTime(skew, error);

    switch (mode) {
    case MODE_LOCK:
    case MODE_SPAN_LOCK:
        store = new txnstore::LockStore();
        break;
    case MODE_OCC:
    case MODE_SPAN_OCC:
        store = new txnstore::OCCStore();
        break;
    default:
        NOT_REACHABLE();
    }
}

Server::~Server()
{
    delete store;
}

void
Server::LeaderUpcall(opnum_t opnum, const string &str1, bool &replicate, string &str2)
{
    Debug("Received LeaderUpcall: %lu %s", opnum, str1.c_str());

    Request request;
    Reply reply;
    int status;
    
    request.ParseFromString(str1);

    switch (request.op()) {
    case txnstore::proto::Request::GET:
        if (request.get().has_timestamp()) {
            pair<Timestamp, string> val;
            status = store->Get(request.txnid(), request.get().key(),
                               request.get().timestamp(), val);
            if (status == 0) {
                reply.set_value(val.second);
            }
        } else {
            pair<Timestamp, string> val;
            status = store->Get(request.txnid(), request.get().key(), val);
            if (status == 0) {
                reply.set_value(val.second);
                reply.set_timestamp(val.first.getTimestamp());
            }
        }
        replicate = false;
        reply.set_status(status);
        reply.SerializeToString(&str2);
        break;
    case txnstore::proto::Request::PREPARE:
        // Prepare is the only case that is conditionally run at the leader
        status = store->Prepare(request.txnid(),
                                Transaction(request.prepare().txn()));

        // if prepared, then replicate result
        if (status == 0) {
            replicate = true;
            // get a prepare timestamp and send along to replicas
            if (mode == MODE_SPAN_LOCK || mode == MODE_SPAN_OCC) {
                request.mutable_prepare()->set_timestamp(timeServer.GetTime());
            }
            request.SerializeToString(&str2);
        } else {
            // if abort, don't replicate
            replicate = false;
            reply.set_status(status);
            reply.SerializeToString(&str2);
        }
        break;
    case txnstore::proto::Request::COMMIT:
        replicate = true;
        str2 = str1;
        break;
    case txnstore::proto::Request::ABORT:
        replicate = true;
        str2 = str1;
        break;
    default:
        Panic("Unrecognized operation.");
    }
}

/* Gets called when a command is issued using client.Invoke(...) to this
 * replica group. 
 * opnum is the operation number.
 * str1 is the request string passed by the client.
 * str2 is the reply which will be sent back to the client.
 */
void
Server::ReplicaUpcall(opnum_t opnum,
              const string &str1,
              string &str2)
{
    Debug("Received Upcall: %lu %s", opnum, str1.c_str());
    Request request;
    Reply reply;
    int status = 0;
    
    request.ParseFromString(str1);

    switch (request.op()) {
    case txnstore::proto::Request::GET:
    case txnstore::proto::Request::PREPARE:
        // get a prepare timestamp and return to client
        store->Prepare(request.txnid(),
                       Transaction(request.prepare().txn()));
        if (mode == MODE_SPAN_LOCK || mode == MODE_SPAN_OCC) {
            reply.set_timestamp(request.prepare().timestamp());
        }
        break;
    case txnstore::proto::Request::COMMIT:
        store->Commit(request.txnid(), request.commit().timestamp());
        break;
    case txnstore::proto::Request::ABORT:
        store->Abort(request.txnid(), Transaction(request.abort().txn()));
        break;
    default:
        Panic("Unrecognized operation.");
    }
    reply.set_status(status);
    reply.SerializeToString(&str2);
}

void
Server::UnloggedUpcall(const string &str1, string &str2)
{
    Request request;
    Reply reply;
    int status;
    
    request.ParseFromString(str1);

    ASSERT(request.op() == txnstore::proto::Request::GET);

    if (request.get().has_timestamp()) {
        pair<Timestamp, string> val;
        status = store->Get(request.txnid(), request.get().key(),
                            request.get().timestamp(), val);
        if (status == 0) {
            reply.set_value(val.second);
        }
    } else {
        pair<Timestamp, string> val;
        status = store->Get(request.txnid(), request.get().key(), val);
        if (status == 0) {
            reply.set_value(val.second);
            reply.set_timestamp(val.first.getTimestamp());
        }
    }
    
    reply.set_status(status);
    reply.SerializeToString(&str2);
}

void
Server::Load(const string &key, const string &value, const Timestamp timestamp)
{
    store->Load(key, value, timestamp);
}

} // namespace txnstore

int
main(int argc, char **argv)
{
    int index = -1;
    unsigned int myShard=0, maxShard=1, nKeys=1;
    const char *configPath = NULL;
    const char *keyPath = NULL;
    int64_t skew = 0, error = 0;
    txnstore::Mode mode;

    // Parse arguments
    int opt;
    while ((opt = getopt(argc, argv, "c:i:m:e:s:f:n:N:k:")) != -1) {
        switch (opt) {
        case 'c':
            configPath = optarg;
            break;
            
        case 'i':
        {
            char *strtolPtr;
            index = strtoul(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0') || (index < 0))
            {
                fprintf(stderr, "option -i requires a numeric arg\n");
            }
            break;
        }
        
        case 'm':
        {
            if (strcasecmp(optarg, "lock") == 0) {
                mode = txnstore::MODE_LOCK;
            } else if (strcasecmp(optarg, "occ") == 0) {
                mode = txnstore::MODE_OCC;
            } else if (strcasecmp(optarg, "span-lock") == 0) {
                mode = txnstore::MODE_SPAN_LOCK;
            } else if (strcasecmp(optarg, "span-occ") == 0) {
                mode = txnstore::MODE_SPAN_OCC;
            } else {
                fprintf(stderr, "unknown mode '%s'\n", optarg);
            }
            break;
        }

        case 's':
        {
            char *strtolPtr;
            skew = strtoul(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0') || (skew < 0))
            {
                fprintf(stderr, "option -s requires a numeric arg\n");
            }
            break;
        }

        case 'e':
        {
            char *strtolPtr;
            error = strtoul(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0') || (error < 0))
            {
                fprintf(stderr, "option -e requires a numeric arg\n");
            }
            break;
        }

        case 'k':
        {
            char *strtolPtr;
            nKeys = strtoul(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0'))
            {
                fprintf(stderr, "option -e requires a numeric arg\n");
            }
            break;
        }

        case 'n':
        {
            char *strtolPtr;
            myShard = strtoul(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0'))
            {
                fprintf(stderr, "option -e requires a numeric arg\n");
            }
            break;
        }

        case 'N':
        {
            char *strtolPtr;
            maxShard = strtoul(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0') || (maxShard <= 0))
            {
                fprintf(stderr, "option -e requires a numeric arg\n");
            }
            break;
        }

        case 'f':   // Load keys from file
        {
            keyPath = optarg;
            break;
        }

        default:
            fprintf(stderr, "Unknown argument %s\n", argv[optind]);
        }


    }

    if (!configPath) {
        fprintf(stderr, "option -c is required\n");
    }

    if (index == -1) {
        fprintf(stderr, "option -i is required\n");
    }

    if (mode == txnstore::MODE_UNKNOWN) {
        fprintf(stderr, "option -m is required\n");
    }

    // Load configuration
    std::ifstream configStream(configPath);
    if (configStream.fail()) {
        fprintf(stderr, "unable to read configuration file: %s\n", configPath);
    }
    transport::Configuration config(configStream);

    if (index >= config.n) {
        fprintf(stderr, "replica index %d is out of bounds; "
                "only %d replicas defined\n", index, config.n);
    }

    UDPTransport transport(0.0, 0.0, 0);

    txnstore::Server server(mode, skew, error);
    replication::vr::VRReplica replica(config, index, &transport, 1, &server);
    
    if (keyPath) {
        string key;
        ifstream in;
        in.open(keyPath);
        if (!in) {
            fprintf(stderr, "Could not read keys from: %s\n", keyPath);
            exit(0);
        }

        for (unsigned int i = 0; i < nKeys; i++) {
            getline(in, key);
            
            uint64_t hash = 5381;
            const char* str = key.c_str();
            for (unsigned int j = 0; j < key.length(); j++) {
                hash = ((hash << 5) + hash) + (uint64_t)str[j];
            }

            if (hash % maxShard == myShard) {
                server.Load(key, "null", Timestamp());
            }
        }
        in.close();
    }

    transport.Run();

    return 0;
}