Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • copywrite_mjwhittaker
  • decide_multiset
  • inconsistent_confirm
  • io-queue
  • ir_recovery
  • ir_recovery_squashed
  • majority_decide
  • master
  • multi
  • rdma
  • recovery_only_when_told
  • synchronization
  • tapir_per_core_bench
13 results

Target

Select target project
No results found
Select Git revision
  • coltdb
  • copywrite_mjwhittaker
  • decide_multiset
  • inconsistent_confirm
  • io-queue
  • ir_recovery
  • ir_recovery_squashed
  • majority_decide
  • master
  • multi
  • rdma
  • recovery_only_when_told
  • synchronization
  • tapir_per_core_bench
14 results
Show changes

Commits on Source 46

125 files
+ 6729
1224
Compare changes
  • Side-by-side
  • Inline

Files

.gitignore

0 → 100644
+75 −0
Original line number Diff line number Diff line
.obj/

lockserver/client-main
lockserver/server-main
lockserver/lockserver-repl
store/benchmark/benchClient
store/benchmark/retwisClient
store/benchmark/terminalClient
store/strongstore/server
store/tapirstore/server
store/weakstore/server
timeserver/timeserver

lib/tests/configuration-test
lib/tests/simtransport-test
lockserver/tests/lockserver-test
replication/ir/tests/ir-test
replication/vr/tests/vr-test
store/common/backend/tests/kvstore-test
store/common/backend/tests/lockserver-test
store/common/backend/tests/versionstore-test

################################################################################
# vim
################################################################################
# Swap
[._]*.s[a-v][a-z]
[._]*.sw[a-p]
[._]s[a-v][a-z]
[._]sw[a-p]

# Session
Session.vim

# Temporary
.netrwhist
*~
# Auto-generated tag files
tags

################################################################################
# c++
################################################################################
# Prerequisites
*.d

# Compiled Object files
*.slo
*.lo
*.o
*.obj

# Precompiled Headers
*.gch
*.pch

# Compiled Dynamic libraries
*.so
*.dylib
*.dll

# Fortran module files
*.mod
*.smod

# Compiled Static libraries
*.lai
*.la
*.a
*.lib

# Executables
*.exe
*.out
*.app
+0 −3
Original line number Diff line number Diff line
[submodule "YCSB"]
	path = YCSB
	url = https://github.com/brianfrankcooper/YCSB

LICENSE

0 → 100644
+21 −0
Original line number Diff line number Diff line
The MIT License (MIT)

Copyright (c) 2019 Irene Zhang, Dan Ports, Naveen Kr. Sharma

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+9 −2
Original line number Diff line number Diff line
@@ -7,8 +7,8 @@ CXX = g++
LD = g++
EXPAND = lib/tmpl/expand

#CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized -O2 -DNASSERT
CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized 
CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized -O2 -DNASSERT
#CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized 
CXXFLAGS := -g -std=c++0x
LDFLAGS := -levent_pthreads
## Debian package: check
@@ -131,6 +131,7 @@ include store/strongstore/Rules.mk
include store/weakstore/Rules.mk
include store/benchmark/Rules.mk
include lockserver/Rules.mk
include timeserver/Rules.mk
include libtapir/Rules.mk
##################################################################
# General rules
@@ -236,6 +237,12 @@ clean:
	$(call trace,RM,binaries,rm -f $(BINS) $(TEST_BINS))
	$(call trace,RM,objects,rm -rf .obj)

#
# Debugging
#
print-%:
	@echo '$*=$($*)'

##################################################################
# Targets
#
+7 −1
Original line number Diff line number Diff line
@@ -52,7 +52,13 @@ The repo is structured as follows:
## Compiling & Running
You can compile all of the TAPIR executables by running make in the root directory

TAPIR depends on protobufs and libevent, so you will need those development libraries installed on your machine. On Linux, this can be done through apt.
TAPIR depends on protobufs, libevent and openssl, so you will need the following development libraries:
- libprotobuf-dev
- libevent-openssl
- libevent-pthreads
- libevent-dev
- libssl-dev
- protobuf-compiler

## Contact and Questions
Please email Irene at iyzhang@cs.washington.edu, Dan at drkp@cs.washington.edu and Naveen at naveenks@cs.washington.edu
+7 −1
Original line number Diff line number Diff line
@@ -3,7 +3,8 @@ d := $(dir $(lastword $(MAKEFILE_LIST)))
SRCS += $(addprefix $(d), \
	lookup3.cc message.cc memory.cc \
	latency.cc configuration.cc transport.cc \
	udptransport.cc tcptransport.cc simtransport.cc)
	udptransport.cc tcptransport.cc simtransport.cc repltransport.cc \
	persistent_register.cc)

PROTOS += $(addprefix $(d), \
          latency-format.proto)
@@ -24,8 +25,13 @@ LIB-transport := $(o)transport.o $(LIB-message) $(LIB-configuration)

LIB-simtransport := $(o)simtransport.o $(LIB-transport)

LIB-repltransport := $(o)repltransport.o $(LIB-transport)

LIB-udptransport := $(o)udptransport.o $(LIB-transport)

LIB-tcptransport := $(o)tcptransport.o $(LIB-transport)

LIB-persistent_register := $(o)persistent_register.o $(LIB-message)

include $(d)tests/Rules.mk
Original line number Diff line number Diff line
@@ -33,10 +33,9 @@
#include "lib/configuration.h"
#include "lib/message.h"

#include <iostream>
#include <fstream>
#include <string>
#include <string.h>
#include <cstring>
#include <stdexcept>
#include <tuple>

namespace transport {

@@ -52,6 +51,12 @@ ReplicaAddress::operator==(const ReplicaAddress &other) const {
            (port == other.port));
}

bool
ReplicaAddress::operator<(const ReplicaAddress &other) const {
    auto this_t = std::forward_as_tuple(host, port);
    auto other_t = std::forward_as_tuple(other.host, other.port);
    return this_t < other_t;
}

Configuration::Configuration(const Configuration &c)
    : n(c.n), f(c.f), replicas(c.replicas), hasMulticast(c.hasMulticast)
@@ -94,52 +99,53 @@ Configuration::Configuration(std::ifstream &file)
        }

        // Get the command
        // This is pretty horrible, but C++ does promise that &line[0]
        // is going to be a mutable contiguous buffer...
        char *cmd = strtok(&line[0], " \t");
        unsigned int t1 = line.find_first_of(" \t");
        string cmd = line.substr(0, t1);

        if (strcasecmp(cmd, "f") == 0) {
            char *arg = strtok(NULL, " \t");
            if (!arg) {
        if (strcasecmp(cmd.c_str(), "f") == 0) {
            unsigned int t2 = line.find_first_not_of(" \t", t1);
            if (t2 == string::npos) {
                Panic ("'f' configuration line requires an argument");
            }
            char *strtolPtr;
            f = strtoul(arg, &strtolPtr, 0);
            if ((*arg == '\0') || (*strtolPtr != '\0')) {

            try {
                f = stoul(line.substr(t2, string::npos));
            } catch (std::invalid_argument& ia) {
                Panic("Invalid argument to 'f' configuration line");
            }
        } else if (strcasecmp(cmd, "replica") == 0) {
            char *arg = strtok(NULL, " \t");
            if (!arg) {
        } else if (strcasecmp(cmd.c_str(), "replica") == 0) {
            unsigned int t2 = line.find_first_not_of(" \t", t1);
            if (t2 == string::npos) {
                Panic ("'replica' configuration line requires an argument");
            }

            char *host = strtok(arg, ":");
            char *port = strtok(NULL, "");
            
            if (!host || !port) {
            unsigned int t3 = line.find_first_of(":", t2);
            if (t3 == string::npos) {
                Panic("Configuration line format: 'replica host:port'");
            }

            replicas.push_back(ReplicaAddress(string(host), string(port)));
        } else if (strcasecmp(cmd, "multicast") == 0) {
            char *arg = strtok(NULL, " \t");
            if (!arg) {
            string host = line.substr(t2, t3-t2);
            string port = line.substr(t3+1, string::npos);

            replicas.push_back(ReplicaAddress(host, port));
        } else if (strcasecmp(cmd.c_str(), "multicast") == 0) {
            unsigned int t2 = line.find_first_not_of(" \t", t1);
            if (t2 == string::npos) {
                Panic ("'multicast' configuration line requires an argument");
            }

            char *host = strtok(arg, ":");
            char *port = strtok(NULL, "");
            
            if (!host || !port) {
                Panic("Configuration line format: 'multicast host:port'");
            unsigned int t3 = line.find_first_of(":", t2);
            if (t3 == string::npos) {
                Panic("Configuration line format: 'replica host:port'");
            }

            multicastAddress = new ReplicaAddress(string(host),
                                                  string(port));
            string host = line.substr(t2, t3-t2);
            string port = line.substr(t3+1, string::npos);

            multicastAddress = new ReplicaAddress(host, port);
            hasMulticast = true;
        } else {
            Panic("Unknown configuration directive: %s", cmd);
            Panic("Unknown configuration directive: %s", cmd.c_str());
        }
    }

@@ -213,4 +219,23 @@ Configuration::operator==(const Configuration &other) const
    return true;
}

bool
Configuration::operator<(const Configuration &other) const {
    auto this_t = std::forward_as_tuple(n, f, replicas, hasMulticast);
    auto other_t = std::forward_as_tuple(other.n, other.f, other.replicas,
                                         other.hasMulticast);
    if (this_t < other_t) {
        return true;
    } else if (this_t == other_t) {
        if (hasMulticast) {
            return *multicastAddress < *other.multicastAddress;
        } else {
            return false;
        }
    } else {
        // this_t > other_t
        return false;
    }
}

} // namespace transport
Original line number Diff line number Diff line
@@ -52,6 +52,16 @@ struct ReplicaAddress
    inline bool operator!=(const ReplicaAddress &other) const {
        return !(*this == other);
    }
    bool operator<(const ReplicaAddress &other) const;
    bool operator<=(const ReplicaAddress &other) const {
        return *this < other || *this == other;
    }
    bool operator>(const ReplicaAddress &other) const {
        return !(*this <= other);
    }
    bool operator>=(const ReplicaAddress &other) const {
        return !(*this < other);
    }
};


@@ -72,6 +82,16 @@ public:
    inline bool operator!=(const Configuration &other) const {
        return !(*this == other);
    }
    bool operator<(const Configuration &other) const;
    bool operator<=(const Configuration &other) const {
        return *this < other || *this == other;
    }
    bool operator>(const Configuration &other) const {
        return !(*this <= other);
    }
    bool operator>=(const Configuration &other) const {
        return !(*this < other);
    }

public:
    int n;                      // number of replicas
@@ -82,7 +102,8 @@ private:
    bool hasMulticast;
};

}      // namespace replication
}      // namespace transport


namespace std {
template <> struct hash<transport::ReplicaAddress>
@@ -110,5 +131,4 @@ template <> struct hash<transport::Configuration>
};
}


#endif  /* _LIB_CONFIGURATION_H_ */
Original line number Diff line number Diff line
syntax = "proto2";

package transport.latency.format;

message LatencyDist
+10 −0
Original line number Diff line number Diff line
@@ -40,6 +40,8 @@
#include <unistd.h>
#include <sys/time.h>

#include <mutex>

#define BACKTRACE_ON_PANIC 1
#if BACKTRACE_ON_PANIC
#include <execinfo.h>
@@ -48,6 +50,8 @@
#define TIMESTAMP_BASE62 0
#define TIMESTAMP_NUMERIC 1

std::mutex message_mtx;

void __attribute__((weak))
Message_VA(enum Message_Type type,
           const char *fname, int line, const char *func,
@@ -74,6 +78,9 @@ _Message_VA(enum Message_Type type, FILE *fp,
            const char *fname, int line, const char *func,
            const char *fmt, va_list args)
{
    // Lock mutex to make sure the output is not mangled.
    message_mtx.lock();

    static int haveColor = -1;
    struct msg_desc {
        const char *prefix;
@@ -150,6 +157,9 @@ _Message_VA(enum Message_Type type, FILE *fp,
        fputs("\033[0m", fp);
    fprintf(fp, "\n");
    fflush(fp);

    // Unlock mutex.
    message_mtx.unlock();
}

void _Panic(void)
+126 −0
Original line number Diff line number Diff line
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
 *
 * persistent_register.cc: A disk-backed persistent register.
 *
 * Copyright 2013 Dan R. K. Ports  <drkp@cs.washington.edu>
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 **********************************************************************/
#include "lib/persistent_register.h"

#include <cstdio>
#include <cstring>
#include <unistd.h>

#include <fstream>
#include <memory>

#include "lib/message.h"

bool PersistentRegister::Initialized() const
{
    // Check to see if the file exists. If it doesn't, then we default to
    // returning an empty string. Refer to [1] for some ways to check if a file
    // exists in C++.
    //
    // [1]: https://stackoverflow.com/a/12774387/3187068
    std::ifstream f(filename_.c_str());
    return f.good();
}

std::string PersistentRegister::Read() const
{
    if (!Initialized()) {
        return "";
    }

    std::FILE *file = OpenFile(filename_, "rb");

    // Seek to the end of the file and get it's size.
    int success = std::fseek(file, 0, SEEK_END);
    if (success != 0) {
        Panic("Unable to fseek file %s", filename_.c_str());
    }
    long length = ftell(file);
    if (length == -1) {
        Panic("%s", std::strerror(errno));
    }

    // Seek back to the beginning of the file and read its contents. Now that
    // we know the size, we can allocate an appropriately sized buffer.
    success = std::fseek(file, 0, SEEK_SET);
    if (success != 0) {
        Panic("Unable to fseek file %s", filename_.c_str());
    }
    std::unique_ptr<char[]> buffer(new char[length]);
    std::size_t num_read = std::fread(buffer.get(), length, 1, file);
    if (num_read != 1) {
        Panic("Unable to read file %s", filename_.c_str());
    }

    CloseFile(file);
    return std::string(buffer.get(), length);
}

void PersistentRegister::Write(const std::string &s)
{
    // Perform the write.
    std::FILE *file = OpenFile(filename_, "wb");
    std::size_t num_written =
        std::fwrite(s.c_str(), sizeof(char), s.size(), file);
    if (num_written != s.size()) {
        Panic("Unable to write to file %s", filename_.c_str());
    }

    // Persist the write.
    int fd = fileno(file);
    if (fd == -1) {
        Panic("%s", std::strerror(errno));
    }
    int success = fsync(fd);
    if (success != 0) {
        Panic("%s", std::strerror(errno));
    }

    CloseFile(file);
}

std::string PersistentRegister::Filename() { return filename_; }

std::FILE *PersistentRegister::OpenFile(const std::string &filename,
                                        const std::string &mode)
{
    std::FILE *file = std::fopen(filename.c_str(), mode.c_str());
    if (file == nullptr) {
        Panic("%s", std::strerror(errno));
    }
    return file;
}

void PersistentRegister::CloseFile(std::FILE *file)
{
    int success = std::fclose(file);
    if (success != 0) {
        Panic("Unable to close file.");
    }
}
+91 −0
Original line number Diff line number Diff line
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
 *
 * persistent_register.h: A disk-backed persistent register.
 *
 * Copyright 2013 Dan R. K. Ports  <drkp@cs.washington.edu>
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 **********************************************************************/
#ifndef _LIB_PERSISTENT_REGISTER_H_
#define _LIB_PERSISTENT_REGISTER_H_

#include <cstdio>

#include <string>

// A PersistentRegister is used to read and write a string that is persisted to
// disk. It's like a database for a single string value. Here's how you might
// use it.
//
//     // Persist x to the file "x.bin".
//     PersistentRegister x("x.bin");
//
//     if (!x.Initialized()) {
//         // If x has not yet been written, write "Hello, World!".
//         x.Write("Hello, World!");
//     } else {
//         // If x has been written, read and print the value of x.
//         std::cout << x.Read() << std::endl;
//     }
//
// The first time this program is called, it will detect that x has not been
// written and will write "Hello, World!". The second time it's called, it will
// read and print "Hello, World!".
class PersistentRegister {
public:
    PersistentRegister(const std::string &filename) : filename_(filename) {}

    // Returns whether a PersistentRegister is initalized (i.e. the file into
    // which the register is persisted exists).
    bool Initialized() const;

    // Read a value from the register, or return an empty string if the
    // register is not initalized. Read panics on error.
    std::string Read() const;

    // Write a value to the register. Write panics on error.
    void Write(const std::string &s);

    // Return the filename in which the register is persisted.
    std::string Filename();

private:
    // Note that using C++ file IO, there is not really a way to ensure that
    // data has been forced to disk [1]. Thus, our implementation of
    // PersistentRegister uses C file IO so that it can use primitives like
    // fsync.
    //
    // [1]: https://stackoverflow.com/q/676787/3187068

    // `OpenFile(f, m)` calls `std::fopen(f, m)` but calls `Panic` on error.
    static std::FILE *OpenFile(const std::string &filename,
                               const std::string &mode);

    // `CloseFile(f, m)` calls `std::fclose(f)` but calls `Panic` on error.
    static void CloseFile(std::FILE *file);

    // The filename of the file that contains the persisted data.
    const std::string filename_;
};

#endif  // _LIB_PERSISTENT_REGISTER_H_

lib/repltransport.cc

0 → 100644
+272 −0
Original line number Diff line number Diff line
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
 *
 * repltransport.cc: REPL-driven step-by-step simulated transport.
 *
 * Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
 *                     Naveen Kr. Sharma <naveenks@cs.washington.edu>
 *                     Dan R. K. Ports  <drkp@cs.washington.edu>
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 **********************************************************************/
#include "lib/repltransport.h"

#include <iostream>
#include <iterator>
#include <sstream>
#include <string>
#include <tuple>
#include <utility>

namespace {

// https://stackoverflow.com/a/236803/3187068
template <typename Out>
void split(const std::string &s, char delim, Out result) {
    std::stringstream ss(s);
    std::string item;
    while (std::getline(ss, item, delim)) {
        *(result++) = item;
    }
}

// https://stackoverflow.com/a/236803/3187068
std::vector<std::string> split(const std::string &s, char delim) {
    std::vector<std::string> elems;
    split(s, delim, std::back_inserter(elems));
    return elems;
}

// https://stackoverflow.com/a/4654718/3187068
bool is_number(const std::string &s) {
    std::string::const_iterator it = s.begin();
    while (it != s.end() && std::isdigit(*it)) ++it;
    return !s.empty() && it == s.end();
}

// https://stackoverflow.com/a/1494435/3187068
void string_replace(std::string *str, const std::string &oldStr,
                    const std::string &newStr) {
    std::string::size_type pos = 0u;
    while ((pos = str->find(oldStr, pos)) != std::string::npos) {
        str->replace(pos, oldStr.length(), newStr);
        pos += newStr.length();
    }
}

}  // namespace

void ReplTransport::Register(TransportReceiver *receiver,
                             const transport::Configuration &config,
                             int replicaIdx) {
    // If replicaIdx is -1, then the registering receiver is a client.
    // Otherwise, replicaIdx is in the range [0, config.n), and the registering
    // receiver is a replica.
    bool is_client = replicaIdx == -1;

    if (is_client) {
        // Create the client's address.
        std::string port = std::to_string(client_id_);
        auto repl_addr = new ReplTransportAddress("client", std::move(port));
        receiver->SetAddress(repl_addr);
        client_id_++;

        // Register receiver.
        receivers_[*repl_addr].receiver = receiver;
    } else {
        // Set the receiver's address.
        transport::ReplicaAddress addr = config.replica(replicaIdx);
        auto repl_addr = new ReplTransportAddress(addr.host, addr.port);
        receiver->SetAddress(repl_addr);

        // Register receiver.
        receivers_[*repl_addr].receiver = receiver;
    }

    // Register with superclass.
    RegisterConfiguration(receiver, config, replicaIdx);
}

int ReplTransport::Timer(uint64_t ms, timer_callback_t cb) {
    timer_id_++;
    ASSERT(timers_.count(timer_id_) == 0);
    timers_[timer_id_] = cb;
    return timer_id_;
}

bool ReplTransport::CancelTimer(int id) {
    if (timers_.count(id) == 0) {
        return false;
    } else {
        timers_.erase(id);
        return true;
    }
}

void ReplTransport::CancelAllTimers() {
    timers_.clear();
}

bool ReplTransport::DeliverMessage(const ReplTransportAddress &addr,
                                   int index) {
    history_.push_back("transport.DeliverMessage({\"" + addr.Host() + "\", \"" +
                       addr.Port() + "\"}, " + std::to_string(index) + ");");
    ASSERT(receivers_.count(addr) != 0);
    TransportReceiverState &state = receivers_[addr];

    // If the recipient of this address hasn't yet been registered, then
    // state.receiver is null.
    if (state.receiver == nullptr) {
        return false;
    }

    // Deliver the message.
    const QueuedMessage &m = state.msgs.at(index);
    string data;
    m.msg->SerializeToString(&data);
    state.receiver->ReceiveMessage(m.src, m.msg->GetTypeName(), data);
    return true;
}

void ReplTransport::TriggerTimer(int timer_id) {
    history_.push_back("transport.TriggerTimer(" + std::to_string(timer_id) +
                       ");");
    ASSERT(timers_.count(timer_id) != 0);
    timers_[timer_id]();
}

void ReplTransport::PrintState() const {
    // Show the history.
    std::cout << "- History" << std::endl;
    for (const std::string &command : history_) {
        std::cout << "    " << command << std::endl;
    }

    // Show the timers.
    std::cout << "- Timers" << std::endl;
    for (const std::pair<const int, timer_callback_t> &p : timers_) {
        std::cout << "  - [" << p.first << "]" << std::endl;
    }

    // Show the message buffers.
    for (const std::pair<const ReplTransportAddress, TransportReceiverState>
             &p : receivers_) {
        const ReplTransportAddress &addr = p.first;
        const TransportReceiverState &state = p.second;

        std::cout << "- " << addr;
        if (state.receiver == nullptr) {
            std::cout << " [not registered]";
        }
        std::cout << std::endl;

        for (std::size_t i = 0; i < state.msgs.size(); ++i) {
            const Message *msg = state.msgs[i].msg.get();
            std::string debug = msg->DebugString();
            string_replace(&debug, "\n", "\n        ");
            std::cout << "  - [" << i << "] " << msg->GetTypeName() << std::endl
                      << "        " << debug << std::endl;
        }
    }
}

bool ReplTransport::RunOne() {
    // Parse response.
    while (true) {
        // Prompt user and read response.
        std::cout << "> " << std::flush;
        std::string line;
        std::getline(std::cin, line);
        if (std::cin.fail() || std::cin.eof()) {
            return true;
        }
        std::vector<std::string> words = split(line, ' ');

        const std::string usage =
            "Usage: quit | show | <timer_id> | <host> <port> <index>";
        if (words.size() == 1) {
            if (words[0] == "quit") {
                return true;
            }
            if (words[0] == "show") {
                PrintState();
                return false;
            }

            if (is_number(words[0])) {
                int timer_id = std::stoi(words[0]);
                TriggerTimer(timer_id);
                return false;
            } else {
                std::cout << usage << std::endl;
            }
        } else if (words.size() == 3) {
            if (!is_number(words[2])) {
                std::cout << usage << std::endl;
            } else {
                ReplTransportAddress addr(words[0], words[1]);
                int index = std::stoi(words[2]);
                if (receivers_.count(addr) == 0) {
                    std::cout << "Receiver not found." << std::endl;
                } else {
                    DeliverMessage(addr, index);
                    return false;
                }
            }
        } else {
            std::cout << usage << std::endl;
        }
    }
}

void ReplTransport::Run() {
    bool done = false;
    while (!done) {
        done = RunOne();
    }
}

bool ReplTransport::SendMessageInternal(TransportReceiver *src,
                                        const ReplTransportAddress &dst,
                                        const Message &m,
                                        bool multicast) {
    // Multicast is not supported.
    ASSERT(!multicast);

    const ReplTransportAddress &repl_addr =
        dynamic_cast<const ReplTransportAddress &>(src->GetAddress());
    std::unique_ptr<Message> msg(m.New());
    msg->CheckTypeAndMergeFrom(m);
    receivers_[dst].msgs.push_back(QueuedMessage(repl_addr, std::move(msg)));
    return true;
}

ReplTransportAddress ReplTransport::LookupAddress(
    const transport::Configuration &cfg, int replicaIdx) {
    transport::ReplicaAddress addr = cfg.replica(replicaIdx);
    return ReplTransportAddress(addr.host, addr.port);
}

const ReplTransportAddress *ReplTransport::LookupMulticastAddress(
    const transport::Configuration *cfg) {
    return nullptr;
}

lib/repltransport.h

0 → 100644
+278 −0
Original line number Diff line number Diff line
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
 *
 * repltransport.h: REPL-driven step-by-step simulated transport.
 *
 * Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
 *                     Naveen Kr. Sharma <naveenks@cs.washington.edu>
 *                     Dan R. K. Ports  <drkp@cs.washington.edu>
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 **********************************************************************/

// Distributed algorithms have to handle arbitrary message delays, message
// loss, message reordering, node failure, network partitions, etc. However,
// these failure scenarios are rare, which can make it difficult to flesh out
// all the corner cases of a distributed algorithm.
//
// Take IR for example. If we want to trigger the IR-MERGE-RECORDS function to
// run with a non-empty d and a non-empty u, we have to
//   1. propose two separate messages,
//   2. deliver one to a supermajority,
//   3. deliver the other to a simple majority,
//   4. prevent both messages from being finalized, and
//   5. trigger a view change.
//
// ReplTransport is a simulated transport, like SimulatedTransport, that lets
// users manipulate every aspect of the execution of a distributed system. When
// run, a ReplTransport opens up a REPL with which users can use to trigger an
// arbitrary timeout or deliver an arbitrary message to a receiver.
//
// For example, imagine a simple distributed system with two nodes: ping
// (localhost:8000) and pong (localhost:9000). Initially, ping sends a message
// to pong, pong replies with a message, ping replies with another message, and
// so on. If a node hasn't heard from the other after some timeout, it resends
// its message. An interaction with a ReplTransport would look something like
// this (comments inline):
//
//     $ ./ping_pong
//     > show                  # show the state
//     - History               # A history of all commands (empty at first)
//     - Timers                # A list of all timer ids
//       - [1]                 # ping's timeout
//       - [2]                 # pong's timeout
//     - localhost:8000        # ping (no pending messages)
//     - localhost:9000        # pong
//       - [0] PingMessage     # pong's pending message from ping
//
//     > localhost 9000 0      # Deliver the 0th message to pong
//     > show
//     - History               # A history of all executed commands
//         transport.DeliverMessage({"localhost", "9000"}, 0);
//     - Timers
//       - [1]
//       - [2]
//     - localhost:8000
//       - [0] PongMessage     # pings's pending message from pong
//     - localhost:9000
//       - [0] PingMessage     # Notice that this message wasn't removed. We
//                             # can deliver the same message multiple times.
//
//     > localhost 8000 0      # Deliver the 0th message to ping
//     > show
//     - History
//         transport.DeliverMessage({"localhost", "9000"}, 0);
//         transport.DeliverMessage({"localhost", "8000"}, 0);
//     - Timers
//       - [1]
//       - [2]
//     - localhost:8000
//       - [0] PongMessage
//     - localhost:9000
//       - [0] PingMessage
//       - [1] PingMessage
//
//     > 1                     # Trigger ping's timeout
//     > show
//     - History
//         transport.DeliverMessage({"localhost", "9000"}, 0);
//         transport.DeliverMessage({"localhost", "8000"}, 0);
//         transport.TriggerTimer(1);
//     - Timers
//       - [1]
//       - [2]
//     - localhost:8000
//       - [0] PongMessage
//     - localhost:9000
//       - [0] PingMessage
//       - [1] PingMessage
//       - [2] PingMessage     # ping resent its message to pong
//
//     > quit
//
// Also notice that the ReplTransport prints out a history of the executed
// commands. You can copy and paste these commands into your code to replay
// your interaction with the REPL.

#ifndef _LIB_REPLTRANSPORT_H_
#define _LIB_REPLTRANSPORT_H_

#include <functional>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <tuple>

#include "lib/configuration.h"
#include "lib/transport.h"
#include "lib/transportcommon.h"

class ReplTransportAddress : public TransportAddress {
public:
    // Constructors.
    ReplTransportAddress() {}

    ReplTransportAddress(std::string host, std::string port)
        : host_(std::move(host)), port_(std::move(port)) {}

    ReplTransportAddress(const ReplTransportAddress &other)
        : ReplTransportAddress(other.host_, other.port_) {}

    ReplTransportAddress(ReplTransportAddress &&other)
        : ReplTransportAddress() {
        swap(*this, other);
    }

    ReplTransportAddress &operator=(ReplTransportAddress other) {
        swap(*this, other);
        return *this;
    }

    friend void swap(ReplTransportAddress &x, ReplTransportAddress &y) {
        std::swap(x.host_, y.host_);
        std::swap(x.port_, y.port_);
    }

    // Comparators.
    bool operator==(const ReplTransportAddress &other) const {
        return Key() == other.Key();
    }
    bool operator!=(const ReplTransportAddress &other) const {
        return Key() != other.Key();
    }
    bool operator<(const ReplTransportAddress &other) const {
        return Key() < other.Key();
    }
    bool operator<=(const ReplTransportAddress &other) const {
        return Key() <= other.Key();
    }
    bool operator>(const ReplTransportAddress &other) const {
        return Key() > other.Key();
    }
    bool operator>=(const ReplTransportAddress &other) const {
        return Key() >= other.Key();
    }

    // Getters.
    const std::string& Host() const {
        return host_;
    }

    const std::string& Port() const {
        return port_;
    }

    ReplTransportAddress *clone() const override {
        return new ReplTransportAddress(host_, port_);
    }

    friend std::ostream &operator<<(std::ostream &out,
                                    const ReplTransportAddress &addr) {
        out << addr.host_ << ":" << addr.port_;
        return out;
    }

private:
    std::tuple<const std::string&, const std::string&> Key() const {
        return std::forward_as_tuple(host_, port_);
    }

    std::string host_;
    std::string port_;
};

class ReplTransport : public TransportCommon<ReplTransportAddress> {
public:
    void Register(TransportReceiver *receiver,
                  const transport::Configuration &config,
                  int replicaIdx) override;
    int Timer(uint64_t ms, timer_callback_t cb) override;
    bool CancelTimer(int id) override;
    void CancelAllTimers() override;

    // DeliverMessage(addr, i) delivers the ith queued inbound message to the
    // receiver with address addr. It's possible to send a message to the
    // address of a receiver that hasn't yet registered. In this case,
    // DeliverMessage returns false. Otherwise, it returns true.
    bool DeliverMessage(const ReplTransportAddress& addr, int index);

    // Run timer with id timer_id.
    void TriggerTimer(int timer_id);

    // Launch the REPL.
    void Run();

protected:
    bool SendMessageInternal(TransportReceiver *src,
                             const ReplTransportAddress &dst, const Message &m,
                             bool multicast = false) override;
    ReplTransportAddress LookupAddress(const transport::Configuration &cfg,
                                       int replicaIdx) override;
    const ReplTransportAddress *LookupMulticastAddress(
        const transport::Configuration *cfg) override;

private:
    // Prompt the user for input and either (1) trigger a timer, (2) deliver a
    // message, or (3) quit. RunOne returns true if the user decides to quit.
    bool RunOne();

    // Pretty print the current state of the system. For example, PrintState
    // prints the queued messages for every node in the system.
    void PrintState() const;

    struct QueuedMessage {
        ReplTransportAddress src;
        std::unique_ptr<Message> msg;

        QueuedMessage(ReplTransportAddress src, std::unique_ptr<Message> msg)
            : src(std::move(src)), msg(std::move(msg)) {}
    };

    struct TransportReceiverState {
        // receiver can be null if it has queued messages but hasn't yet been
        // registered with a ReplTransport.
        TransportReceiver *receiver;

        // Queued inbound messages.
        std::vector<QueuedMessage> msgs;
    };

    // receivers_ maps a receiver r's address to r and r's queued messages.
    std::map<ReplTransportAddress, TransportReceiverState> receivers_;

    // timer_id_ is an incrementing counter used to assign timer ids.
    int timer_id_ = 0;

    // timers_ maps timer ids to timers.
    std::map<int, timer_callback_t> timers_;

    // client_id_ is an incrementing counter used to assign addresses to
    // clients. The first client gets address client:0, the next client gets
    // address client:1, etc.
    int client_id_ = 0;

    // A history of all the command issued to this ReplTransport.
    std::vector<std::string> history_;
};

#endif // _LIB_REPLTRANSPORT_H_
Original line number Diff line number Diff line
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
 *
 * udptransport.cc:
 * tcptransport.cc:
 *   message-passing network interface that uses TCP message delivery
 *   and libasync
 *
@@ -35,11 +35,9 @@
#include "lib/tcptransport.h"

#include <google/protobuf/message.h>
#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/thread.h>
#include <event2/bufferevent.h>

#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <fcntl.h>
@@ -87,6 +85,7 @@ TCPTransport::LookupAddress(const transport::ReplicaAddress &addr)
{
        int res;
        struct addrinfo hints;
        memset(&hints, 0, sizeof(hints));
        hints.ai_family   = AF_INET;
        hints.ai_socktype = SOCK_STREAM;
        hints.ai_protocol = 0;
@@ -151,27 +150,20 @@ BindToPort(int fd, const string &host, const string &port)
}

TCPTransport::TCPTransport(double dropRate, double reorderRate,
			   int dscp, event_base *evbase)
			   int dscp, bool handleSignals)
{
    lastTimerId = 0;
    
    // Set up libevent
    evthread_use_pthreads();
    event_set_log_callback(LogCallback);
    event_set_fatal_callback(FatalCallback);
    libeventBase = event_base_new();

    // XXX Hack for Naveen: allow the user to specify an existing
    // libevent base. This will probably not work exactly correctly
    // for error messages or signals, but that doesn't much matter...
    if (evbase) {
        libeventBase = evbase;
    } else {
        evthread_use_pthreads();
    libeventBase = event_base_new();
    evthread_make_base_notifiable(libeventBase);
    }

    // Set up signal handler
    if (handleSignals) {
        signalEvents.push_back(evsignal_new(libeventBase, SIGTERM,
                                            SignalCallback, this));
        signalEvents.push_back(evsignal_new(libeventBase, SIGINT,
@@ -180,6 +172,7 @@ TCPTransport::TCPTransport(double dropRate, double reorderRate,
            event_add(x, NULL);
        }
    }
}

TCPTransport::~TCPTransport()
{
@@ -204,6 +197,13 @@ TCPTransport::ConnectTCP(TransportReceiver *src, const TCPTransportAddress &dst)
        PWarning("Failed to set O_NONBLOCK on outgoing TCP socket");
    }

    // Set TCP_NODELAY
    int n = 1;
    if (setsockopt(fd, IPPROTO_TCP,
                   TCP_NODELAY, (char *)&n, sizeof(n)) < 0) {
        PWarning("Failed to set TCP_NODELAY on TCP listening socket");
    }

    TCPTransportTCPListener *info = new TCPTransportTCPListener();
    info->transport = this;
    info->acceptFd = 0;
@@ -230,6 +230,15 @@ TCPTransport::ConnectTCP(TransportReceiver *src, const TCPTransportAddress &dst)
        Panic("Failed to enable bufferevent");
    }

    // Tell the receiver its address
    struct sockaddr_in sin;
    socklen_t sinsize = sizeof(sin);
    if (getsockname(fd, (sockaddr *) &sin, &sinsize) < 0) {
        PPanic("Failed to get socket name");
    }
    TCPTransportAddress *addr = new TCPTransportAddress(sin);
    src->SetAddress(addr);

    tcpOutgoing[dst] = bev;
    tcpAddresses.insert(pair<struct bufferevent*, TCPTransportAddress>(bev,dst));

@@ -271,6 +280,13 @@ TCPTransport::Register(TransportReceiver *receiver,
        PWarning("Failed to set SO_REUSEADDR on TCP listening socket");
    }

    // Set TCP_NODELAY
    n = 1;
    if (setsockopt(fd, IPPROTO_TCP,
                   TCP_NODELAY, (char *)&n, sizeof(n)) < 0) {
        PWarning("Failed to set TCP_NODELAY on TCP listening socket");
    }

    // Registering a replica. Bind socket to the designated
    // host/port
    const string &host = config.replica(replicaIdx).host;
@@ -526,6 +542,13 @@ TCPTransport::TCPAcceptCallback(evutil_socket_t fd, short what, void *arg)
            PWarning("Failed to set O_NONBLOCK");
        }

            // Set TCP_NODELAY
        int n = 1;
        if (setsockopt(newfd, IPPROTO_TCP,
                       TCP_NODELAY, (char *)&n, sizeof(n)) < 0) {
            PWarning("Failed to set TCP_NODELAY on TCP listening socket");
        }

        // Create a buffered event
        bev = bufferevent_socket_new(transport->libeventBase, newfd,
                                     BEV_OPT_CLOSE_ON_FREE);
@@ -553,8 +576,13 @@ TCPTransport::TCPReadableCallback(struct bufferevent *bev, void *arg)

    Debug("Readable on bufferevent %p", bev);
    

    while (evbuffer_get_length(evbuf) > 0) {
        uint32_t *magic;
        magic = (uint32_t *)evbuffer_pullup(evbuf, sizeof(*magic));
        if (magic == NULL) {
            return;
        }
        ASSERT(*magic == MAGIC);
    
        size_t *sz;
@@ -604,6 +632,7 @@ TCPTransport::TCPReadableCallback(struct bufferevent *bev, void *arg)
        info->receiver->ReceiveMessage(addr->second, msgType, msg);
        Debug("Done processing large %s message", msgType.c_str());
    }
}

void
TCPTransport::TCPIncomingEventCallback(struct bufferevent *bev,
Original line number Diff line number Diff line
/ -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
 *
 * udptransport.h:
 * tcptransport.h:
 *   message-passing network interface that uses UDP message delivery
 *   and libasync
 *
@@ -68,7 +68,7 @@ class TCPTransport : public TransportCommon<TCPTransportAddress>
{
public:
    TCPTransport(double dropRate = 0.0, double reogrderRate = 0.0,
                 int dscp = 0, event_base *evbase = nullptr);
                    int dscp = 0, bool handleSignals = true);
    virtual ~TCPTransport();
    void Register(TransportReceiver *receiver,
                  const transport::Configuration &config,
Original line number Diff line number Diff line
@@ -38,6 +38,7 @@
#include <event2/event.h>
#include <event2/thread.h>

#include <memory>
#include <random>

#include <arpa/inet.h>
@@ -178,9 +179,8 @@ BindToPort(int fd, const string &host, const string &port)
}

UDPTransport::UDPTransport(double dropRate, double reorderRate,
                           int dscp, event_base *evbase)
    : dropRate(dropRate), reorderRate(reorderRate),
      dscp(dscp)
        int dscp, bool handleSignals)
    : dropRate(dropRate), reorderRate(reorderRate), dscp(dscp)
{

    lastTimerId = 0;
@@ -197,20 +197,15 @@ UDPTransport::UDPTransport(double dropRate, double reorderRate,
    }
    
    // Set up libevent
    evthread_use_pthreads();
    event_set_log_callback(LogCallback);
    event_set_fatal_callback(FatalCallback);
    // XXX Hack for Naveen: allow the user to specify an existing
    // libevent base. This will probably not work exactly correctly
    // for error messages or signals, but that doesn't much matter...
    if (evbase) {
        libeventBase = evbase;
    } else {
        evthread_use_pthreads();

    libeventBase = event_base_new();
    evthread_make_base_notifiable(libeventBase);
    }

    // Set up signal handler
    if (handleSignals) {
        signalEvents.push_back(evsignal_new(libeventBase, SIGTERM,
                    SignalCallback, this));
        signalEvents.push_back(evsignal_new(libeventBase, SIGINT,
@@ -220,6 +215,7 @@ UDPTransport::UDPTransport(double dropRate, double reorderRate,
            event_add(x, NULL);
        }
    }
}

UDPTransport::~UDPTransport()
{
@@ -385,7 +381,8 @@ UDPTransport::Register(TransportReceiver *receiver,
}

static size_t
SerializeMessage(const ::google::protobuf::Message &m, char **out)
SerializeMessage(const ::google::protobuf::Message &m,
                 std::unique_ptr<char[]> *out)
{
    string data = m.SerializeAsString();
    string type = m.GetTypeName();
@@ -394,7 +391,8 @@ SerializeMessage(const ::google::protobuf::Message &m, char **out)
    ssize_t totalLen = (typeLen + sizeof(typeLen) +
                       dataLen + sizeof(dataLen));

    char *buf = new char[totalLen];
    std::unique_ptr<char[]> unique_buf(new char[totalLen]);
    char *buf = unique_buf.get();

    char *ptr = buf;
    *((size_t *) ptr) = typeLen;
@@ -410,7 +408,7 @@ SerializeMessage(const ::google::protobuf::Message &m, char **out)
    memcpy(ptr, data.c_str(), dataLen);
    ptr += dataLen;

    *out = buf;
    *out = std::move(unique_buf);
    return totalLen;
}

@@ -423,8 +421,9 @@ UDPTransport::SendMessageInternal(TransportReceiver *src,
    sockaddr_in sin = dynamic_cast<const UDPTransportAddress &>(dst).addr;

    // Serialize message
    char *buf;
    size_t msgLen = SerializeMessage(m, &buf);
    std::unique_ptr<char[]> unique_buf;
    size_t msgLen = SerializeMessage(m, &unique_buf);
    char *buf = unique_buf.get();

    int fd = fds[src];

@@ -435,7 +434,7 @@ UDPTransport::SendMessageInternal(TransportReceiver *src,
        if (sendto(fd, buf, msgLen, 0,
                   (sockaddr *)&sin, sizeof(sin)) < 0) {
            PWarning("Failed to send message");
            goto fail;
            return false;
        }
    } else {
        int numFrags = ((msgLen-1) / MAX_UDP_MESSAGE_SIZE) + 1;
@@ -463,17 +462,12 @@ UDPTransport::SendMessageInternal(TransportReceiver *src,
                       (sockaddr *)&sin, sizeof(sin)) < 0) {
                PWarning("Failed to send message fragment %ld",
                         fragStart);
                goto fail;
                return false;
            }
        }
    }

    delete [] buf;
    return true;

fail:
    delete [] buf;
    return false;
}

void
Original line number Diff line number Diff line
@@ -66,7 +66,7 @@ class UDPTransport : public TransportCommon<UDPTransportAddress>
{
public:
    UDPTransport(double dropRate = 0.0, double reorderRate = 0.0,
                 int dscp = 0, event_base *evbase = nullptr);
                    int dscp = 0, bool handleSignals = true);
    virtual ~UDPTransport();
    void Register(TransportReceiver *receiver,
                  const transport::Configuration &config,
Original line number Diff line number Diff line
d := $(dir $(lastword $(MAKEFILE_LIST)))

SRCS += $(addprefix $(d), server.cc client.cc)
SRCS += $(addprefix $(d), \
			server.cc client.cc server-main.cc client-main.cc \
			lockserver-repl.cc)

PROTOS += $(addprefix $(d), locks-proto.proto)

$(d)server: $(LIB-udptransport) $(OBJS-ir-replica) $(o)locks-proto.o $(o)server.o
$(d)server-main: $(o)server-main.o \
	$(o)locks-proto.o \
	$(o)server.o \
	$(LIB-udptransport) \
	$(OBJS-ir-replica)

$(d)client: $(LIB-udptransport) $(OBJS-ir-client) $(LIB-store-common) \
						$(o)locks-proto.o $(o)client.o
$(d)client-main: $(o)client-main.o \
	$(o)locks-proto.o \
	$(o)client.o \
	$(LIB-udptransport) \
   	$(OBJS-ir-client) \
   	$(LIB-store-common)

BINS += $(d)server $(d)client
$(d)lockserver-repl: $(o)lockserver-repl.o \
	$(o)locks-proto.o \
	$(o)server.o \
	$(o)client.o \
	$(OBJS-ir-replica) \
	$(OBJS-ir-client) \
	$(LIB-configuration) \
	$(LIB-repltransport) \
   	$(LIB-store-common) \
	$(GTEST_MAIN)

BINS += $(d)server-main $(d)client-main $(d)lockserver-repl

include $(d)tests/Rules.mk
+103 −0
Original line number Diff line number Diff line
#include <thread>

#include "lockserver/client.h"
#include "lib/udptransport.h"

namespace {

void
usage()
{
    printf("Unknown command.. Try again!\n");
    printf("Usage: exit | q | lock <key> | unlock <key>\n");
}

} // namespace

int
main(int argc, char **argv)
{
    const char *configPath = NULL;

    // Parse arguments
    int opt;
    while ((opt = getopt(argc, argv, "c:")) != -1) {
        switch (opt) {
        case 'c':
            configPath = optarg;
            break;

        default:
            fprintf(stderr, "Unknown argument %s\n", argv[optind]);
        }
    }

    if (!configPath) {
        fprintf(stderr, "option -c is required\n");
        return EXIT_FAILURE;
    }

    // Load configuration
    std::ifstream configStream(configPath);
    if (configStream.fail()) {
        Panic("Unable to read configuration file: %s\n", configPath);
    }
    transport::Configuration config(configStream);

    // Create lock client.
    UDPTransport transport(0.0, 0.0, 0);
    lockserver::LockClient locker(&transport, config);
    std::thread run_transport([&transport]() { transport.Run(); });

    char c, cmd[2048], *tok;
    int clen, status;
    string key, value;

    while (1) {
        printf(">> ");
        fflush(stdout);

        clen = 0;
        while ((c = getchar()) != '\n')
            cmd[clen++] = c;
        cmd[clen] = '\0';

        tok = strtok(cmd, " ,.-");
        if (tok == NULL) continue;

        if (strcasecmp(tok, "exit") == 0 || strcasecmp(tok, "q") == 0) {
            printf("Exiting..\n");
            break;
        } else if (strcasecmp(tok, "lock") == 0) {
            tok = strtok(NULL, " ,.-");
            if (tok == NULL) {
                usage();
                continue;
            }
            key = string(tok);
            status = locker.lock(key);

            if (status) {
                printf("Lock Successful\n");
            } else {
                printf("Failed to acquire lock..\n");
            }
        } else if (strcasecmp(tok, "unlock") == 0) {
            tok = strtok(NULL, " ,.-");
            if (tok == NULL) {
                usage();
                continue;
            }
            key = string(tok);
            locker.unlock(key);
            printf("Unlock Successful\n");
        } else {
            usage();
        }
        fflush(stdout);
    }

    transport.Stop();
    run_transport.join();
    return EXIT_SUCCESS;
}
Original line number Diff line number Diff line
@@ -30,88 +30,14 @@

#include "lockserver/client.h"

int
main(int argc, char **argv)
{
    const char *configPath = NULL;

    // Parse arguments
    int opt;
    while ((opt = getopt(argc, argv, "c:")) != -1) {
        switch (opt) {
        case 'c':
            configPath = optarg;
            break;

        default:
            fprintf(stderr, "Unknown argument %s\n", argv[optind]);
        }
    }

    if (!configPath) {
        fprintf(stderr, "option -c is required\n");
        return EXIT_FAILURE;
    }

    lockserver::LockClient locker(configPath);

    char c, cmd[2048], *tok;
    int clen, status;
    string key, value;

    while (1) {
        printf(">> ");
        fflush(stdout);

        clen = 0;
        while ((c = getchar()) != '\n')
            cmd[clen++] = c;
        cmd[clen] = '\0';

        if (clen == 0) continue;
        tok = strtok(cmd, " ,.-");

        if (strcasecmp(tok, "exit") == 0 || strcasecmp(tok, "q") == 0) {
            printf("Exiting..\n");
            break;
        } else if (strcasecmp(tok, "lock") == 0) {
            tok = strtok(NULL, " ,.-");
            key = string(tok);
            status = locker.lock(key);

            if (status) {
                printf("Lock Successful\n");
            } else {
                printf("Failed to acquire lock..\n");
            }
        } else if (strcasecmp(tok, "unlock") == 0) {
            tok = strtok(NULL, " ,.-");
            key = string(tok);
            locker.unlock(key);
            printf("Unlock Successful\n");
        } else {
            printf("Unknown command.. Try again!\n");
        }
        fflush(stdout);
    }

    return EXIT_SUCCESS;
}

namespace lockserver {

using namespace std;
using namespace proto;

LockClient::LockClient(const string &configPath) : transport(0.0, 0.0, 0)
{
    // Load configuration
    std::ifstream configStream(configPath);
    if (configStream.fail()) {
        Panic("Unable to read configuration file: %s\n", configPath.c_str());
    }
    transport::Configuration config(configStream);

LockClient::LockClient(Transport *transport,
                       const transport::Configuration &config)
    : transport(transport) {
    client_id = 0;
    while (client_id == 0) {
        random_device rd;
@@ -120,23 +46,14 @@ LockClient::LockClient(const string &configPath) : transport(0.0, 0.0, 0)
        client_id = dis(gen);
    }

    client = new replication::ir::IRClient(config, &transport, client_id);

    /* Run the transport in a new thread. */
    clientTransport = new thread(&LockClient::run_client, this);
    client = new replication::ir::IRClient(config, transport, client_id);
}

LockClient::~LockClient() { }

void
LockClient::run_client()
{
    transport.Run();
}

bool
LockClient::lock(const string &key)
{
LockClient::lock_async(const std::string &key) {
    ASSERT(waiting == nullptr);
    Debug("Sending LOCK");

    string request_str;
@@ -147,19 +64,29 @@ LockClient::lock(const string &key)
    request.SerializeToString(&request_str);

    waiting = new Promise(1000);
    transport.Timer(0, [=]() {
    transport->Timer(0, [=]() {
            client->InvokeConsensus(request_str,
                bind(&LockClient::Decide,
                    this,
                    placeholders::_1),
                bind(&LockClient::LockCallback,
                    this,
                    placeholders::_1,
                    placeholders::_2),
                bind(&LockClient::ErrorCallback,
                    this,
                    placeholders::_1,
                    placeholders::_2));
            });
}

bool
LockClient::lock_wait() {
    ASSERT(waiting != nullptr);

    int status = waiting->GetReply();
    delete waiting;
    waiting = nullptr;

    if (status == 0) {
        return true;
@@ -170,8 +97,8 @@ LockClient::lock(const string &key)
}

void
LockClient::unlock(const string &key)
{
LockClient::unlock_async(const std::string &key) {
    ASSERT(waiting == nullptr);
    Debug("Sending UNLOCK");

    string request_str;
@@ -182,31 +109,53 @@ LockClient::unlock(const string &key)
    request.SerializeToString(&request_str);

    waiting = new Promise(1000);
    transport.Timer(0, [=]() {
    transport->Timer(0, [=]() {
            client->InvokeInconsistent(request_str,
                bind(&LockClient::UnlockCallback,
                    this,
                    placeholders::_1,
                    placeholders::_2));
            });
}

void
LockClient::unlock_wait() {
    waiting->GetReply();
    delete waiting;
    waiting = nullptr;
}

bool
LockClient::lock(const string &key)
{
    lock_async(key);
    return lock_wait();
}

void
LockClient::unlock(const string &key)
{
    unlock_async(key);
    return unlock_wait();
}

string
LockClient::Decide(const set<string> &results)
LockClient::Decide(const map<string, std::size_t> &results)
{
    // If a majority say lock, we say lock.
    int success_count = 0;
    string key;
    for (string s : results) {
    for (const auto& string_and_count : results) {
        const string& s = string_and_count.first;
        const std::size_t count = string_and_count.second;

        Reply reply;
        reply.ParseFromString(s);
        key = reply.key();

        if (reply.status() == 0)
            success_count ++;
        if (reply.status() == 0) {
            success_count += count;
        }
    }

    string final_reply_str;
@@ -243,4 +192,15 @@ LockClient::UnlockCallback(const std::string &request_str, const std::string &re
    w->Reply(0);
}

void
LockClient::ErrorCallback(const std::string &request_str,
                          replication::ErrorCode err)
{
    Debug("Error Callback: %s %s", request_str.c_str(),
          replication::ErrorCodeToString(err).c_str());
    Promise *w = waiting;
    waiting = NULL;
    w->Reply(-3);  // Invalid command.
}

} // namespace lockserver
Original line number Diff line number Diff line
@@ -33,49 +33,65 @@

#include "lib/assert.h"
#include "lib/message.h"
#include "lib/udptransport.h"
#include "lib/transport.h"
#include "replication/ir/client.h"
#include "store/common/promise.h"
#include "lockserver/locks-proto.pb.h"

#include <map>
#include <set>
#include <string>
#include <thread>
#include <random>

namespace lockserver {

class LockClient
{
public:
    LockClient(const std::string &configPath);
    LockClient(Transport* transport, const transport::Configuration &config);
    ~LockClient();

    // Synchronously lock and unlock. Calling lock (or unlock) will block until
    // the lock (or unlock) request is fully processed.
    bool lock(const std::string &key);
    void unlock(const std::string &key);

    // Asynchronously lock and unlock. Calling lock_async or unlock_async will
    // not block. Calling lock_wait (or unlock_wait) will block for the
    // previous invocation of lock_async (or unlock_async) to complete.
    //
    // All async calls must be followed by a corresponding wait call. It is an
    // error to issue multiple async requests without waiting. It is also
    // erroneous to wait for a request which was never issued.
    void lock_async(const std::string &key);
    bool lock_wait();
    void unlock_async(const std::string &key);
    void unlock_wait();

private:
    /* Unique ID for this client. */
    uint64_t client_id;

    /* Transport layer and thread. */
    UDPTransport transport; 
    std::thread *clientTransport;
    Transport *transport;

    /* Function to run the transport thread. */
    void run_client();

    /* Decide function for a lock server. */
    string Decide(const std::set<string> &results);
    string Decide(const std::map<string, std::size_t> &results);

    /* IR client proxy. */
    replication::ir::IRClient *client;

    /* Promise to wait for pending operation. */
    Promise *waiting;
    Promise *waiting = nullptr;

    /* Callbacks for hearing back for an operation. */
    void LockCallback(const std::string &, const std::string &);
    void UnlockCallback(const std::string &, const std::string &);
    void ErrorCallback(const std::string &, replication::ErrorCode);
};

} // namespace lockserver
Original line number Diff line number Diff line
syntax = "proto2";

package lockserver.proto;

message Request {
+80 −0
Original line number Diff line number Diff line
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
 *
 * lockserver-repl.cc: Step-by-step lock server evaluation.
 *
 * Copyright 2013 Dan R. K. Ports  <drkp@cs.washington.edu>
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 **********************************************************************/
#include <thread>
#include <memory>

#include "lib/configuration.h"
#include "lib/repltransport.h"
#include "lockserver/client.h"
#include "lockserver/server.h"
#include "replication/ir/replica.h"

int main() {
    ReplTransport transport;
    std::vector<transport::ReplicaAddress> replica_addrs = {
        {"replica", "0"},
        {"replica", "1"},
        {"replica", "2"},
        {"replica", "3"},
        {"replica", "4"}};
    transport::Configuration config(5 /* n */, 2 /* f */, replica_addrs);

    // Clients.
    lockserver::LockClient client_a(&transport, config);
    lockserver::LockClient client_b(&transport, config);
    lockserver::LockClient client_c(&transport, config);
    client_a.lock_async("a");
    client_b.lock_async("b");
    client_c.lock_async("c");

    // Servers.
    std::vector<std::unique_ptr<lockserver::LockServer>> servers;
    std::vector<std::unique_ptr<replication::ir::IRReplica>> replicas;
    for (std::size_t i = 0; i < replica_addrs.size(); ++i) {
        auto server = std::unique_ptr<lockserver::LockServer>(
            new lockserver::LockServer());
        servers.push_back(std::move(server));
        auto replica = std::unique_ptr<replication::ir::IRReplica>(
            new replication::ir::IRReplica(config, i, &transport,
                                           servers[i].get()));
        replicas.push_back(std::move(replica));
    }

    // Launch REPL.
    transport.Run();

    // Remove persisted files.
    for (std::size_t i = 0; i < replica_addrs.size(); ++i) {
        const transport::ReplicaAddress &addr = replica_addrs[i];
        const std::string filename =
            addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
        int success = std::remove(filename.c_str());
        ASSERT(success == 0);
    }
}
+62 −0
Original line number Diff line number Diff line
#include "lockserver/server.h"

int
main(int argc, char **argv)
{
    int index = -1;
    const char *configPath = NULL;

    // Parse arguments
    int opt;
    char *strtolPtr;
    while ((opt = getopt(argc, argv, "c:i:")) != -1) {
        switch (opt) {
        case 'c':
            configPath = optarg;
            break;

        case 'i':
            index = strtol(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0') || (index < 0)) {
                fprintf(stderr, "option -i requires a numeric arg\n");
            }
            break;

        default:
            fprintf(stderr, "Unknown argument %s\n", argv[optind]);
        }
    }

    if (!configPath) {
        fprintf(stderr, "option -c is required\n");
        return EXIT_FAILURE;
    }

    if (index == -1) {
        fprintf(stderr, "option -i is required\n");
        return EXIT_FAILURE;
    }

    // Load configuration
    std::ifstream configStream(configPath);
    if (configStream.fail()) {
        fprintf(stderr, "unable to read configuration file: %s\n", configPath);
        return EXIT_FAILURE;
    }
    transport::Configuration config(configStream);

    if (index >= config.n) {
        fprintf(stderr, "replica index %d is out of bounds; "
                "only %d replicas defined\n", index, config.n);
        return EXIT_FAILURE;
    }

    UDPTransport transport(0.0, 0.0, 0);

    lockserver::LockServer server;
    replication::ir::IRReplica replica(config, index, &transport, &server);

    transport.Run();

    return EXIT_SUCCESS;
}
Original line number Diff line number Diff line
@@ -30,66 +30,9 @@

#include "lockserver/server.h"

int
main(int argc, char **argv)
{
    int index = -1;
    const char *configPath = NULL;

    // Parse arguments
    int opt;
    char *strtolPtr;
    while ((opt = getopt(argc, argv, "c:i:")) != -1) {
        switch (opt) {
        case 'c':
            configPath = optarg;
            break;
            
        case 'i':
            index = strtol(optarg, &strtolPtr, 10);
            if ((*optarg == '\0') || (*strtolPtr != '\0') || (index < 0)) {
                fprintf(stderr, "option -i requires a numeric arg\n");
            }
            break;
        
        default:
            fprintf(stderr, "Unknown argument %s\n", argv[optind]);
        }
    }

    if (!configPath) {
        fprintf(stderr, "option -c is required\n");
        return EXIT_FAILURE;
    }

    if (index == -1) {
        fprintf(stderr, "option -i is required\n");
        return EXIT_FAILURE;
    }

    // Load configuration
    std::ifstream configStream(configPath);
    if (configStream.fail()) {
        fprintf(stderr, "unable to read configuration file: %s\n", configPath);
        return EXIT_FAILURE;
    }
    transport::Configuration config(configStream);

    if (index >= config.n) {
        fprintf(stderr, "replica index %d is out of bounds; "
                "only %d replicas defined\n", index, config.n);
        return EXIT_FAILURE;
    }

    UDPTransport transport(0.0, 0.0, 0);

    lockserver::LockServer server;
    replication::ir::IRReplica replica(config, index, &transport, &server);
    
    transport.Run();

    return EXIT_SUCCESS;
}
#include <algorithm>
#include <iterator>
#include <unordered_set>

namespace lockserver {

@@ -173,4 +116,131 @@ LockServer::UnloggedUpcall(const string &str1, string &str2)
    Debug("Unlogged: %s\n", str1.c_str());
}

void
LockServer::Sync(const std::map<opid_t, RecordEntry>& record) {
    locks.clear();

    struct KeyLockInfo {
        std::unordered_set<uint64_t> locked;
        std::unordered_set<uint64_t> unlocked;
    };
    std::unordered_map<std::string, KeyLockInfo> key_lock_info;

    for (const std::pair<const opid_t, RecordEntry> &p : record) {
        const opid_t &opid = p.first;
        const RecordEntry &entry = p.second;

        Request request;
        request.ParseFromString(entry.request.op());
        Reply reply;
        reply.ParseFromString(entry.result);
        KeyLockInfo &info = key_lock_info[request.key()];

        Debug("Sync opid=(%lu, %lu), clientid=%lu, key=%s, type=%d, status=%d.",
              opid.first, opid.second, request.clientid(),
              request.key().c_str(), request.type(), reply.status());

        if (request.type() && reply.status() == 0) {
            // Lock.
            info.locked.insert(request.clientid());
        } else if (!request.type() && reply.status() == 0) {
            // Unlock.
            info.unlocked.insert(request.clientid());
        }
    }

    for (const std::pair<const std::string, KeyLockInfo> &p : key_lock_info) {
        const std::string &key = p.first;
        const KeyLockInfo &info = p.second;
        std::unordered_set<uint64_t> diff;
        std::set_difference(std::begin(info.locked), std::end(info.locked),
                            std::begin(info.unlocked), std::end(info.unlocked),
                            std::inserter(diff, diff.begin()));

        ASSERT(diff.size() == 0 || diff.size() == 1);
        if (diff.size() == 1) {
            uint64_t client_id = *std::begin(diff);
            Debug("Assigning lock %lu: %s", client_id, key.c_str());
            locks[key] = client_id;
        }
    }
}

std::map<opid_t, std::string>
LockServer::Merge(const std::map<opid_t, std::vector<RecordEntry>> &d,
                  const std::map<opid_t, std::vector<RecordEntry>> &u,
                  const std::map<opid_t, std::string> &majority_results_in_d) {
    // First note that d and u only contain consensus operations, and lock
    // requests are the only consensus operations (unlock is an inconsistent
    // operation), so d and u only contain lock requests. To merge, we grant
    // any majority successful lock request in d if it does not conflict with a
    // currently held lock. We do not grant any other lock request.

    std::map<opid_t, std::string> results;

    using EntryVec = std::vector<RecordEntry>;
    for (const std::pair<const opid_t, EntryVec>& p: d) {
        const opid_t &opid = p.first;
        const EntryVec &entries = p.second;

        // Get the request and reply.
        const RecordEntry &entry = *std::begin(entries);

        Request request;
        request.ParseFromString(entry.request.op());

        Reply reply;
        auto iter = majority_results_in_d.find(opid);
        ASSERT(iter != std::end(majority_results_in_d));
        reply.ParseFromString(iter->second);

        // Form the final result.
        const bool operation_successful = reply.status() == 0;
        if (operation_successful) {
            // If the lock was successful, then we acquire the lock so long as
            // it is not already held.
            const std::string &key = reply.key();
            if (locks.count(key) == 0) {
                Debug("Assigning lock %lu: %s", request.clientid(),
                      key.c_str());
                locks[key] = request.clientid();
                results[opid] = iter->second;
            } else {
                Debug("Rejecting lock %lu: %s", request.clientid(),
                      key.c_str());
                reply.set_status(-1);
                std::string s;
                reply.SerializeToString(&s);
                results[opid] = s;
            }
        } else {
            // If the lock was not successful, then we maintain this as the
            // majority result.
            results[opid] = iter->second;
        }
    }

    // We reject all lock requests in u. TODO: We could acquire a lock if
    // it is free, but it's simplest to just reject them unilaterally.
    for (const std::pair<const opid_t, EntryVec>& p: u) {
        const opid_t &opid = p.first;
        const EntryVec &entries = p.second;

        const RecordEntry &entry = *std::begin(entries);
        Request request;
        request.ParseFromString(entry.request.op());

        Debug("Rejecting lock %lu: %s", request.clientid(),
              request.key().c_str());
        Reply reply;
        reply.set_key(request.key());
        reply.set_status(-1);
        std::string s;
        reply.SerializeToString(&s);
        results[opid] = s;
    }

    return results;
}

} // namespace lockserver
Original line number Diff line number Diff line
@@ -31,15 +31,18 @@
#ifndef _IR_LOCK_SERVER_H_
#define _IR_LOCK_SERVER_H_

#include "lib/udptransport.h"
#include "replication/ir/replica.h"
#include "lockserver/locks-proto.pb.h"

#include <string>
#include <unordered_map>

#include "lib/transport.h"
#include "replication/ir/replica.h"
#include "lockserver/locks-proto.pb.h"

namespace lockserver {

using opid_t = replication::ir::opid_t;
using RecordEntry = replication::ir::RecordEntry;

class LockServer : public replication::ir::IRAppReplica
{
public:
@@ -47,13 +50,22 @@ public:
    ~LockServer();

    // Invoke inconsistent operation, no return value
    void ExecInconsistentUpcall(const string &str1);
    void ExecInconsistentUpcall(const string &str1) override;

    // Invoke consensus operation
    void ExecConsensusUpcall(const string &str1, string &str2);
    void ExecConsensusUpcall(const string &str1, string &str2) override;

    // Invoke unreplicated operation
    void UnloggedUpcall(const string &str1, string &str2);
    void UnloggedUpcall(const string &str1, string &str2) override;

    // Sync
    void Sync(const std::map<opid_t, RecordEntry>& record) override;

    // Merge
    std::map<opid_t, std::string> Merge(
        const std::map<opid_t, std::vector<RecordEntry>> &d,
        const std::map<opid_t, std::vector<RecordEntry>> &u,
        const std::map<opid_t, std::string> &majority_results_in_d) override;

private:
    std::unordered_map<std::string, uint64_t> locks;
+16 −0
Original line number Diff line number Diff line
d := $(dir $(lastword $(MAKEFILE_LIST)))

GTEST_SRCS += $(addprefix $(d), lockserver-test.cc)

$(d)lockserver-test: $(o)lockserver-test.o \
	$(o)../locks-proto.o \
	$(o)../server.o \
	$(o)../client.o \
	$(OBJS-ir-replica) \
	$(OBJS-ir-client) \
	$(LIB-configuration) \
	$(LIB-repltransport) \
   	$(LIB-store-common) \
	$(GTEST_MAIN)

TEST_BINS += $(d)lockserver-test
+343 −0
Original line number Diff line number Diff line
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
 *
 * lockserver_test.cc:
 *   test cases for lock server
 *
 * Copyright 2013 Dan R. K. Ports  <drkp@cs.washington.edu>
 *
 * Permission is hereby granted, free of charge, to any person
 * obtaining a copy of this software and associated documentation
 * files (the "Software"), to deal in the Software without
 * restriction, including without limitation the rights to use, copy,
 * modify, merge, publish, distribute, sublicense, and/or sell copies
 * of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be
 * included in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 **********************************************************************/
#include <fstream>
#include <memory>
#include <thread>

#include <gtest/gtest.h>

#include "lib/configuration.h"
#include "lib/repltransport.h"
#include "lockserver/client.h"
#include "lockserver/server.h"
#include "replication/ir/replica.h"

class LockServerTest : public testing::Test {
protected:
    std::vector<transport::ReplicaAddress> replica_addrs_;
    std::unique_ptr<transport::Configuration> config_;
    ReplTransport transport_;
    std::vector<std::unique_ptr<lockserver::LockClient>> clients_;
    std::vector<std::unique_ptr<lockserver::LockServer>> servers_;
    std::vector<std::unique_ptr<replication::ir::IRReplica>> replicas_;

    LockServerTest() {
        replica_addrs_ = {{"replica", "0"},
                          {"replica", "1"},
                          {"replica", "2"},
                          {"replica", "3"},
                          {"replica", "4"}};
        config_ = std::unique_ptr<transport::Configuration>(
            new transport::Configuration(5, 2, replica_addrs_));
        RemovePersistedFiles();

        for (std::size_t i = 0; i < 3; ++i) {
            auto client = std::unique_ptr<lockserver::LockClient>(
                new lockserver::LockClient(&transport_, *config_));
            client->lock_async(std::to_string(i));
            clients_.push_back(std::move(client));
        }

        for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
            auto server = std::unique_ptr<lockserver::LockServer>(
                new lockserver::LockServer());
            servers_.push_back(std::move(server));
            auto replica = std::unique_ptr<replication::ir::IRReplica>(
                new replication::ir::IRReplica(*config_, i, &transport_,
                                               servers_[i].get()));
            replicas_.push_back(std::move(replica));
        }
    }

    virtual void TearDown() {
        RemovePersistedFiles();
    }

    virtual void RemovePersistedFiles() {
        for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
            const transport::ReplicaAddress &addr = replica_addrs_[i];
            const std::string filename =
                addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
            std::ifstream f(filename);
            if (f.good()) {
                int success = std::remove(filename.c_str());
                ASSERT(success == 0);
            }
        }
    }
};

// Note that these tests are all white box smoke tests. They depend on the
// low-level details of knowing exactly which timeouts are registered and which
// messages are sent. If an implementation detail is changed to make some of
// these tests fail, you should cal transport_.Run() and walk through the
// execution to trigger the desired behavior. Also, they only check to make
// sure that nothing crashes, though you can read through the Debug prints to
// make sure everything looks right.
//
// TODO: Use a ReplTransport for tests like the ones in ir-test.cc to assert
// that the correct messages are being sent.

TEST_F(LockServerTest, SuccessfulFastPathLock) {
    // Send client 0's lock request.
    transport_.TriggerTimer(1);

    // Deliver lock request to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 0);
    }

    // Deliver lock reply to client.
    for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
        transport_.DeliverMessage({"client", "0"}, i);
    }

    // Deliver finalize to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 1);
    }

    // Deliver confirm to client.
    int j = replica_addrs_.size();
    for (std::size_t i = j; i < j + replica_addrs_.size(); ++i) {
        transport_.DeliverMessage({"client", "0"}, i);
    }
}

TEST_F(LockServerTest, SuccessfulSlowPathLock) {
    // Send client 0's lock request.
    transport_.TriggerTimer(1);

    // Transition to slow path.
    transport_.TriggerTimer(clients_.size() + replica_addrs_.size() + 1);

    // Deliver lock request to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 0);
    }

    // Deliver lock reply to client.
    for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
        transport_.DeliverMessage({"client", "0"}, i);
    }

    // Deliver finalize to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 1);
    }

    // Deliver confirm to client.
    int j = replica_addrs_.size();
    for (std::size_t i = j; i < j + replica_addrs_.size(); ++i) {
        transport_.DeliverMessage({"client", "0"}, i);
    }
}

TEST_F(LockServerTest, SuccessfulViewChange) {
    // Send client 0's lock request.
    transport_.TriggerTimer(1);

    // Deliver lock request to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 0);
    }

    // Initiate view changes on all replicas.
    const std::size_t nclients = clients_.size();
    const std::size_t nreplicas = replica_addrs_.size();
    for (std::size_t i = nclients + 1; i < nclients + nreplicas + 1; ++i) {
        transport_.TriggerTimer(i);
    }

    // Deliver DoViewChangeMessages to new primary.
    const transport::ReplicaAddress& primary = replica_addrs_[1];
    for (std::size_t i = 1; i < 1 + nreplicas - 1; ++i) {
        transport_.DeliverMessage({primary.host, primary.port}, i);
    }

    // Deliver StartViewMessage to all replicas.
    for (std::size_t i = 0; i < nreplicas; ++i) {
        if (i == 1) {
            continue;
        }
        const transport::ReplicaAddress& addr = replica_addrs_[i];
        transport_.DeliverMessage({addr.host, addr.port}, nreplicas);
    }
}

TEST_F(LockServerTest, SuccessfulViewChangeNonemptyRdu) {
    const std::size_t nclients = clients_.size();
    const std::size_t nreplicas = replica_addrs_.size();
    ASSERT_GE(nclients, 3);
    ASSERT_GE(nreplicas, 3);

    // Send client 0's lock request.
    transport_.TriggerTimer(1);

    // Deliver lock request to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 0);
    }

    // Deliver lock reply to client.
    for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
        transport_.DeliverMessage({"client", "0"}, i);
    }

    // Deliver finalize to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 1);
    }

    // Send client 1's lock request.
    transport_.TriggerTimer(2);

    // Deliver lock request to first three replicas.
    for (std::size_t i = 0; i < 3; ++i) {
        const transport::ReplicaAddress &addr = replica_addrs_[i];
        transport_.DeliverMessage({addr.host, addr.port}, 2);
    }

    // Send client 2's lock request.
    transport_.TriggerTimer(3);

    // Deliver lock request to first replica.
    const transport::ReplicaAddress &addr = replica_addrs_[0];
    transport_.DeliverMessage({addr.host, addr.port}, 3);

    // View change first three replicas.
    for (std::size_t i = nclients + 1; i < nclients + 1 + 3; ++i) {
        transport_.TriggerTimer(i);
    }

    // Deliver DoViewChangeMessages to new primary.
    const transport::ReplicaAddress& primary = replica_addrs_[1];
    for (std::size_t i = 4; i < 4 + 2; ++i) {
        transport_.DeliverMessage({primary.host, primary.port}, i);
    }

    // Deliver StartViewMessage to replica 0 and 2.
    const transport::ReplicaAddress& addr0 = replica_addrs_[0];
    const transport::ReplicaAddress& addr2 = replica_addrs_[2];
    transport_.DeliverMessage({addr0.host, addr0.port}, 6);
    transport_.DeliverMessage({addr2.host, addr2.port}, 6);
}

TEST_F(LockServerTest, FinalizeConsensusReply) {
    const std::size_t nclients = clients_.size();
    const std::size_t nreplicas = replica_addrs_.size();

    // Send client 0's lock request.
    transport_.TriggerTimer(1);

    // Deliver lock request to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 0);
    }

    // Trigger view change.
    for (std::size_t i = nclients + 1; i < nclients + 1 + nreplicas; ++i) {
        transport_.TriggerTimer(i);
    }

    // Deliver DoViewChangeMessages to new primary.
    const transport::ReplicaAddress& primary = replica_addrs_[1];
    for (std::size_t i = 1; i < 1 + nreplicas - 1; ++i) {
        transport_.DeliverMessage({primary.host, primary.port}, i);
    }

    // Deliver StartViewMessage to all replicas.
    for (std::size_t i = 0; i < nreplicas; ++i) {
        if (i == 1) {
            continue;
        }
        const transport::ReplicaAddress& addr = replica_addrs_[i];
        transport_.DeliverMessage({addr.host, addr.port}, nreplicas);
    }

    // Deliver lock request to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 0);
    }

    // Deliver finalized reply to client.
    transport_.DeliverMessage({"client", "0"}, nreplicas);
}

TEST_F(LockServerTest, MismatchedConsensus) {
    const std::size_t nclients = clients_.size();
    const std::size_t nreplicas = replica_addrs_.size();

    // Send client 0's lock request.
    transport_.TriggerTimer(1);

    // Transition to slow path.
    transport_.TriggerTimer(nclients + nreplicas + 1);

    // Deliver lock request to replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 0);
    }

    // Deliver lock reply to client.
    for (std::size_t i = 0; i < replica_addrs_.size(); ++i) {
        transport_.DeliverMessage({"client", "0"}, i);
    }

    // Trigger view change.
    for (std::size_t i = nclients + 1; i < nclients + 1 + nreplicas; ++i) {
        transport_.TriggerTimer(i);
    }

    // Deliver DoViewChangeMessages to new primary.
    const transport::ReplicaAddress& primary = replica_addrs_[1];
    for (std::size_t i = 2; i < 2 + nreplicas - 1; ++i) {
        transport_.DeliverMessage({primary.host, primary.port}, i);
    }

    // Deliver StartViewMessage to all replicas.
    for (std::size_t i = 0; i < nreplicas; ++i) {
        if (i == 1) {
            continue;
        }
        const transport::ReplicaAddress& addr = replica_addrs_[i];
        transport_.DeliverMessage({addr.host, addr.port}, 2 + nreplicas - 1);
    }

    // Deliver FinalizeConsensusMessage to all replicas.
    for (const auto &addr : replica_addrs_) {
        transport_.DeliverMessage({addr.host, addr.port}, 1);
    }

    // Deliver ConfirmMessages to client 0.
    for (std::size_t i = nreplicas; i < nreplicas + nreplicas; ++i) {
        transport_.DeliverMessage({"client", "0"}, i);
    }
}

proof/IR.tla

0 → 100644
+526 −0

File added.

Preview size limit exceeded, changes collapsed.

proof/IR_consensus.tla

0 → 100644
+833 −0

File added.

Preview size limit exceeded, changes collapsed.

proof/TAPIR.tla

0 → 100644
+272 −0
Original line number Diff line number Diff line
------------------------------- MODULE TAPIR -------------------------------

(***************************************************************************)
(* This is a TLA+ specification of the TAPIR algorithm.                    *)
(***************************************************************************)

EXTENDS FiniteSets, Naturals, TLC, TLAPS

Max(S) == IF S = {} THEN 0 ELSE CHOOSE i \in S: \A j \in S: j <= i

(***************************************************************************)
(* TAPIR constants:                                                        *)
(*    1. Shards: function from shard id to set of replica ids in the shard *)
(*    2. Transactions: set of all possible transactions                    *)
(*    3. nr_shards: number of shards                                       *)
(***************************************************************************)
CONSTANTS Shards, Transactions, NrShards
\* Note: assume unique number ids for replicas

(***************************************************************************)
(* IR constants & variables (description in the IR module)                 *)
(***************************************************************************)
CONSTANTS Clients, Quorums, SuperQuorums,
          max_vc, max_req, f

VARIABLES rState, rRecord, rCrtView, rLastView, rViewReplies, rNonce,
          rViewOnDisk,
          sentMsg, cCrtOp,
          cCrtOpToFinalize, cMsgCounter, cCrtOpReplies, cCrtOpConfirms,
          cState, aSuccessful, arRecord, aVisibility, gViewChangesNo
          
irReplicaVars == <<rState, rRecord, rCrtView, rViewOnDisk, rLastView,
                   rViewReplies, rNonce>>
irClientVars == <<cCrtOp,        \* current operation at a client
                  cCrtOpReplies, \* current operation replies
                  cMsgCounter,
                  cState,
                  cCrtOpToFinalize,
                  cCrtOpConfirms>>                  \* Client variables.
irAppVars == <<aSuccessful, arRecord, aVisibility>> \* Application variables.
irOtherVars == <<sentMsg, gViewChangesNo>>          \* Other variables.

IRMessageId == [cid: Clients, msgid: Nat]

(***************************************************************************)
(* TAPIR Variables/State:                                                  *)
(*        1. State at each replica:                                        *)
(*            rPrepareTxns = List of txns this replica is prepared         *)
(*                           to commit                                     *)
(*            rTxnsLog = Log of committed and aborted txns in ts order     *)
(*            rStore = Versioned store                                     *)
(*            rBkpTable = Table of txns for which this replica             *)
(*                        is the bkp coordinator                           *)
(*        2. State of communication medium:                                *)
(*            sentMsg = sent (and duplicate) messages                      *)
(*        3. State at client:                                              *)
(*            cCrtTxn = crt txn requested by the client                    *)
(*                                                                         *)
(***************************************************************************)

\* TAPIR variables & data structures
VARIABLES rPreparedTxns, rStore, rTxnsLogAborted, rTxnsLogCommitted,
          rClock, cCrtTxn, cClock

tapirReplicaVars == <<rPreparedTxns, rStore, rTxnsLogAborted,
                      rTxnsLogCommitted,
                      rClock>>
tapirClientVars == <<cCrtTxn, cClock>>

vars == <<irReplicaVars, irClientVars, irAppVars, irOtherVars,
         tapirReplicaVars, tapirClientVars>>

StoreEntry == [vs: Nat, val: Nat] \* vs = version
Store == [key: Nat,
          entries: SUBSET StoreEntry,
          latestVs: Nat,
          latestVal: Nat]

TransactionTs == [cid: Clients, clock: Nat] \* Timestamp
ReadSet == [key: Nat, val: Nat, vs: Nat]
WriteSet == [key: Nat, val: Nat]
Transaction == [rSet: SUBSET ReadSet,
                wSet: SUBSET WriteSet,
                shards: SUBSET Nat]

TypeOK == 
    /\ rStore \in [UNION {Shards[i]: i \in 1..NrShards} -> SUBSET Store]
    /\ rPreparedTxns \in [UNION {Shards[i]: i \in 1..NrShards} -> SUBSET Transaction]
    /\ rTxnsLogAborted \in [UNION {Shards[i]: i \in 1..NrShards} -> SUBSET Transaction]
    /\ rTxnsLogCommitted \in [UNION {Shards[i]: i \in 1..NrShards} -> SUBSET Transaction]
    

TAPIRResults == {"Prepare-OK", "Retry", "Prepare-Abstain", "Abort"}
TAPIROpType == {"Prepare", "ABORT", "COMMIT"}
TAPIROpBody == [opType : TAPIROpType, txn: Transaction]

TAPIRClientFail == TRUE \* state we lose at the app level
TAPIRReplicaFail == TRUE \* state we lose at the app level

\* TAPIR implementation of IR interface
TAPIRExecInconsistent(op) == TRUE
TAPIRExecConsensus(op) == IF op.type = "Consensus" THEN "Prepare-OK" ELSE "Abort"
TAPIRDecide(results) == "Prepare-OK"

TAPIRMerge(R, d, u) == R \cup d \cup
                       {[msgid |-> x.msgid, op |-> x.op, res |-> "Prepare-OK"]: x \in u}

TAPIRSync(records) == TRUE
TAPIRSuccessfulInconsistentOp(c, S, op) == TRUE
TAPIRSuccessfulConsensusOp(c, S, op, res) == TRUE


\* Initialize for all shards
InitIR ==
  /\ rState = [s \in 1..NrShards |-> [r \in Shards[s] |-> "NORMAL"]]
  /\ rRecord = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
  /\ rCrtView = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
  /\ rViewOnDisk = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
  /\ rLastView = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
  /\ rViewReplies = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
  /\ rNonce = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]
  /\ sentMsg = [s \in 1..NrShards |-> {}]
  /\ cCrtOp = [s \in 1..NrShards |-> [c \in Clients |-> <<>>]]
  /\ cCrtOpToFinalize = [s \in 1..NrShards |-> [c \in Clients |-> <<>>]]
  /\ cMsgCounter = [s \in 1..NrShards |-> [c \in Clients |-> 0]]
  /\ cCrtOpReplies = [s \in 1..NrShards |-> [c \in Clients |-> {}]]
  /\ cCrtOpConfirms = [s \in 1..NrShards |-> [c \in Clients |-> {}]]
  /\ cState = [c \in Clients |-> "NORMAL"]
  /\ aSuccessful = {}
  /\ arRecord = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
  /\ aVisibility = [s \in 1..NrShards |-> [o \in IRMessageId |-> {}]]
  /\ gViewChangesNo = [s \in 1..NrShards |-> 0]


\* IR instance per shard TODO: modify replica also
IR(s) == INSTANCE IR_consensus WITH AppClientFail <- TAPIRClientFail,
                                    AppReplicaFail <- TAPIRReplicaFail,
                                    OpBody <- TAPIROpBody,
                                    ExecInconsistent <- TAPIRExecInconsistent,
                                    ExecConsensus <- TAPIRExecConsensus,
                                    Merge <- TAPIRMerge,
                                    Sync <- TAPIRSync,
                                    SuccessfulInconsistentOp <- TAPIRSuccessfulInconsistentOp,
                                    SuccessfulConsensusOp <- TAPIRSuccessfulConsensusOp,
                                    Decide <- TAPIRDecide,
                                    Results <- TAPIRResults,
                                    Replicas <- Shards[s],
                                    Quorums <- Quorums[s],
                                    SuperQuorums <- SuperQuorums[s],
                                    S <- s

\* TAPIR messages
Message ==
       [type: {"READ"},
        key: Nat,
        dst: UNION Shards]
    \cup
       [type: {"READ-REPLY"},
        key: Nat,
        val: Nat,
        vs: Nat,      \* version
        dst: Clients]
  \cup
       [type: {"READ-VERSION"},
        key: Nat,
        vs: Nat,
        dst: UNION Shards]
  \cup 
       [type: {"READ-VERSION-REPLY"},
        key: Nat,
        vs: Nat,
        dst: Clients]

InitTAPIR == /\ cCrtTxn = [c \in Clients |-> <<>>]
             /\ cClock = [c \in Clients |-> 0]
             /\ rPreparedTxns =  [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
             /\ rStore = [r \in UNION {Shards[i]: i \in 1..NrShards} |-> {}]
             /\ rTxnsLogAborted = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
             /\ rTxnsLogCommitted = [s \in 1..NrShards |-> [r \in Shards[s] |-> {}]]
             /\ rClock = [s \in 1..NrShards |-> [r \in Shards[s] |-> 0]]

Init == InitIR /\ InitTAPIR

-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Tapir replica actions}} ^'                    *)
(***************************************************************************)
\*TAPIRReplicaReceiveRead(r) == TRUE


\*TAPIRReplicaAction(r) ==
\*    \/ /\ rState[r] = "NORMAL"
\*       /\ \/ TAPIRReplicaReceiveRead(r)


-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{Tapir client actions}} ^'                     *)
(***************************************************************************)

TAPIRClientExecuteTxn(c) ==
   \* first, resolve all reads (read from any replica and get the vs)
   \* then send prepares in all shard involved by seting the cCrtOp in the
   \*   respective IR shard instance
   \* TODO: for now just simulate this, pick a transaction from
   \*       transaction pool, get some versions from the replica
   \*       stores
   /\ cCrtTxn[c] = <<>>
   /\ \E t \in Transactions:
     LET rSet == {rse \in ReadSet:
                    /\ \E trse \in t.rSet : rse = trse
                    /\ LET
                         r == Max({r \in Shards[(rse.key % NrShards) + 1]:
                                 \E se \in rStore[r]: rse.key = se.key})
                       IN
                         /\ r /= 0
                         /\ \E se \in rStore[r]:
                           /\ rse.key = se.key
                           /\ rse.val = se.latestVal
                           /\ rse.vs = se.latestVs
                  }
         shards == {s \in 1..NrShards: 
                      \/ \E trse \in t.rSet: s = (trse.key % NrShards) + 1
                      \/ \E twse \in t.wSet: s = (twse.key % NrShards) + 1 }
     IN
       /\ Cardinality(rSet) = Cardinality(t.rSet) \* found all the reads
       /\ cCrtTxn' = [cCrtTxn EXCEPT ![c] = [rSet |-> rSet,
                                             wSet |-> t.wSet,
                                             shards |-> shards]]
   /\ UNCHANGED <<irReplicaVars, irClientVars, irOtherVars, irAppVars,
                  tapirReplicaVars, cClock>>

TAPIRClientPrepareTxn(c) ==
     /\ cCrtTxn[c] /= <<>>
     /\ \E s \in cCrtTxn[c].shards: \* prepare in shard s
                                    \* - ok if already prepared
       /\ IR(s)!ClientRequest(c, [type |-> "Consensus",
                                  body |-> [opType |-> "Prepare",
                                            txn    |-> cCrtTxn[c]]])
     /\ UNCHANGED <<irReplicaVars, irAppVars,
                    cCrtOpReplies,
                    cCrtOpConfirms,
                    cCrtOpToFinalize,
                    gViewChangesNo,
                    cState, tapirClientVars, tapirReplicaVars>>

TAPIRClientAction(c) == 
   \/ /\ cState[c] = "NORMAL"
      /\ \/ TAPIRClientExecuteTxn(c) \* for now just simulate this
                                     \* (don't send explicit READ messages)
         \/ TAPIRClientPrepareTxn(c)

-----------------------------------------------------------------------------
(***************************************************************************)
(* `^ \center \large{\textbf{High-Level Actions}} ^'                       *)
(***************************************************************************)

Next ==
     \/ \E c \in Clients: TAPIRClientAction(c)
     \/ /\ \E s \in 1..NrShards: IR(s)!Next
        /\ UNCHANGED <<tapirClientVars, tapirReplicaVars>>
     \/ \* Avoid deadlock by termination
       ((\A s \in 1..NrShards: 
          \A i \in 1..Cardinality(Shards[s]):
            rLastView[s][i] = max_vc) /\ UNCHANGED <<vars>>)

Inv == IR(1)!TypeOK /\ IR(1)!FaultTolerance

=============================================================================
\* Modification History
\* Last modified Mon Aug 31 12:55:38 PDT 2015 by aaasz
\* Created Sat Jan 31 18:31:52 PST 2015 by aaasz

proof/Test.tla

0 → 100644
+64 −0
Original line number Diff line number Diff line
--------------------------------- MODULE Test ---------------------------------
(***************************************************************************)
(* This is a TLA+ specification of the Inconsistent Replication algorithm. *)
(* (And a mechanically-checked proof of its correctness using TLAPS)       *)
(***************************************************************************)
EXTENDS Naturals, FiniteSets, TLC

VARIABLES rViewReplies, recoveredOps

OpType == {"Inconsistent", "Consensus"}
OpStatus == {"TENTATIVE", "FINALIZED"}
Operations == [type: OpType, body: Nat]

TypeOK ==
  /\ rViewReplies \in SUBSET ([lv: Nat,
                               r: SUBSET ([msgid: Nat,
                                           op: Operations,
                                           res: Nat,
                                           status: OpStatus]
                                     \cup [msgid: Nat,
                                           op: Operations,
                                           status: OpStatus]),
                               src: Nat])

A == 
  rViewReplies

B ==
  \* set of all records received in replies in A
  UNION {x.r: x \in A}

test_recoveredConensusOps_R ==
  \* any finalized consensus operation (in at least one record, in the maximum
  \* latest view)
  {x \in B:
     /\ x.op.type = "Consensus"
     /\ x.status = "FINALIZED"
     /\ LET most_updated_reply ==
             CHOOSE reply \in A:
                          /\ \E rec \in reply.r: /\ rec.msgid = x.msgid
                                                 /\ rec.status = "FINALIZED"
                          /\ \A rep \in A:
                            IF \E rec \in rep.r: /\ rec.msgid = x.msgid
                                                 /\ rec.status = "FINALIZED"
                            THEN rep.lv <= reply.lv
                            ELSE TRUE
                   IN
                     x \in most_updated_reply.r}

Init ==
  /\ rViewReplies = {[lv |-> 1, r |-> {[msgid |-> 1, op |-> [type |-> "Consensus", body |-> 1], res |-> 1, status |-> "FINALIZED"]}, src |-> 1],
                     [lv |-> 2, r |-> {[msgid |-> 1, op |-> [type |-> "Consensus", body |-> 1], res |-> 2, status |-> "FINALIZED"]}, src |-> 2],
                     [lv |-> 3, r |-> {[msgid |-> 1, op |-> [type |-> "Consensus", body |-> 1], res |-> 3, status |-> "FINALIZED"]}, src |-> 3]}
  /\ recoveredOps = {}

Next ==
  /\ recoveredOps' = test_recoveredConensusOps_R
  /\ Assert(Cardinality(recoveredOps) = 0, "Should fail")
  /\ UNCHANGED <<rViewReplies>>

=============================================================================
\* Modification History
\* Last modified Fri Apr 24 14:34:42 PDT 2015 by aaasz
\* Created Fri Dec 12 17:42:14 PST 2014 by aaasz

proof/VR.tla

0 → 100644
+763 −0

File added.

Preview size limit exceeded, changes collapsed.

proof/VR_MC_constants

0 → 100644
+8 −0
Original line number Diff line number Diff line
Quorums <- {{1,2}, {1,3}, {2,3}}
Clients <- {1}
max_req <- 1
f <- 1
max_vc <- 3
Operations <- {1}
Replicas <- {1,2,3}
max_c <- 3
Original line number Diff line number Diff line
@@ -37,6 +37,18 @@

namespace replication {

std::string ErrorCodeToString(ErrorCode err) {
    switch (err) {
        case ErrorCode::TIMEOUT:
            return "TIMEOUT";
        case ErrorCode::MISMATCHED_CONSENSUS_VIEWS:
            return "MISMATCHED_CONSENSUS_VIEWS";
        default:
            Assert(false);
            return "";
    }
}

Client::Client(const transport::Configuration &config, Transport *transport,
               uint64_t clientid)
    : config(config), transport(transport)
Original line number Diff line number Diff line
@@ -42,24 +42,46 @@

namespace replication {

// A client's request may fail for various reasons. For example, if enough
// replicas are down, a client's request may time out. An ErrorCode indicates
// the reason that a client's request failed.
enum class ErrorCode {
    // For whatever reason (failed replicas, slow network), the request took
    // too long and timed out.
    TIMEOUT,

    // For IR, if a client issues a consensus operation and receives a majority
    // of replies and confirms in different views, then the operation fails.
    MISMATCHED_CONSENSUS_VIEWS
};

std::string ErrorCodeToString(ErrorCode err);

class Client : public TransportReceiver
{
public:
    typedef std::function<void (const string &, const string &)> continuation_t;
    typedef std::function<void (const string &)> timeout_continuation_t;
    using continuation_t =
        std::function<void(const string &request, const string &reply)>;
    using error_continuation_t =
        std::function<void(const string &request, ErrorCode err)>;

    static const uint32_t DEFAULT_UNLOGGED_OP_TIMEOUT = 1000; // milliseconds

    Client(const transport::Configuration &config, Transport *transport,
           uint64_t clientid = 0);
    virtual ~Client();
    virtual void Invoke(const string &request,
                        continuation_t continuation) = 0;
    virtual void InvokeUnlogged(int replicaIdx,

    virtual void Invoke(
        const string &request,
        continuation_t continuation,
        error_continuation_t error_continuation = nullptr) = 0;
    virtual void InvokeUnlogged(
        int replicaIdx,
        const string &request,
        continuation_t continuation,
                                timeout_continuation_t timeoutContinuation = nullptr,
        error_continuation_t error_continuation = nullptr,
        uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) = 0;

    virtual void ReceiveMessage(const TransportAddress &remote,
                                const string &type,
                                const string &data);
Original line number Diff line number Diff line
@@ -83,6 +83,19 @@ public:
        }
    }

    const std::map<int, MSGTYPE> *
    CheckForQuorum()
    {
        for (const auto &p : messages) {
            const IDTYPE &vs = p.first;
            const std::map<int, MSGTYPE> *quorum = CheckForQuorum(vs);
            if (quorum != nullptr) {
                return quorum;
            }
        }
        return nullptr;
    }

    const std::map<int, MSGTYPE> *
    AddAndCheckForQuorum(IDTYPE vs, int replicaIdx, const MSGTYPE &msg)
    {
Original line number Diff line number Diff line
@@ -12,7 +12,7 @@ OBJS-ir-client := $(o)ir-proto.o $(o)client.o \

OBJS-ir-replica := $(o)record.o $(o)replica.o $(o)ir-proto.o \
                   $(OBJS-replica) $(LIB-message) \
                   $(LIB-configuration)
                   $(LIB-configuration) $(LIB-persistent_register)

include $(d)tests/Rules.mk
Original line number Diff line number Diff line
@@ -38,7 +38,10 @@
#include "replication/ir/ir-proto.pb.h"

#include <functional>
#include <map>
#include <memory>
#include <set>
#include <unordered_map>

namespace replication {
namespace ir {
@@ -46,59 +49,169 @@ namespace ir {
class IRClient : public Client
{
public:
    typedef std::function<string (const std::set<string> &)> decide_t; 
    using result_set_t = std::map<string, std::size_t>;
    using decide_t = std::function<string(const result_set_t &)>;

    IRClient(const transport::Configuration &config,
             Transport *transport,
             uint64_t clientid = 0);
    virtual ~IRClient();
    virtual void Invoke(const string &request,
                        continuation_t continuation);
    virtual void InvokeInconsistent(const string &request,
                                    continuation_t continuation);
    virtual void InvokeConsensus(const string &request,
                                 decide_t decide,
                                 continuation_t continuation);
    virtual void InvokeUnlogged(int replicaIdx,

    virtual void Invoke(
        const string &request,
        continuation_t continuation,
                                timeout_continuation_t timeoutContinuation = nullptr,
                                uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT);
    virtual void ReceiveMessage(const TransportAddress &remote,
                                const string &type, const string &data);
        error_continuation_t error_continuation = nullptr) override;
    virtual void InvokeUnlogged(
        int replicaIdx,
        const string &request,
        continuation_t continuation,
        error_continuation_t error_continuation = nullptr,
        uint32_t timeout = DEFAULT_UNLOGGED_OP_TIMEOUT) override;
    virtual void ReceiveMessage(
        const TransportAddress &remote,
        const string &type,
        const string &data) override;

protected:
    uint64_t view;
    uint64_t lastReqId;
    QuorumSet<viewstamp_t, proto::ReplyInconsistentMessage> inconsistentReplyQuorum;
    QuorumSet<viewstamp_t, proto::ReplyConsensusMessage> consensusReplyQuorum;
    QuorumSet<viewstamp_t, proto::ConfirmMessage> confirmQuorum;
    virtual void InvokeInconsistent(
        const string &request,
        continuation_t continuation,
        error_continuation_t error_continuation = nullptr);
    virtual void InvokeConsensus(
        const string &request,
        decide_t decide,
        continuation_t continuation,
        error_continuation_t error_continuation = nullptr);

    struct PendingRequest
    {
protected:
    struct PendingRequest {
        string request;
        uint64_t clientReqId;
        decide_t decide;
        continuation_t continuation;
        timeout_continuation_t timeoutContinuation;
        bool continuationInvoked = false;
        std::unique_ptr<Timeout> timer;
        QuorumSet<viewstamp_t, proto::ConfirmMessage> confirmQuorum;

        inline PendingRequest(string request, uint64_t clientReqId,
                              continuation_t continuation)
            : request(request), clientReqId(clientReqId),
              continuation(continuation) { }
                              continuation_t continuation,
                              std::unique_ptr<Timeout> timer, int quorumSize)
            : request(request),
              clientReqId(clientReqId),
              continuation(continuation),
              timer(std::move(timer)),
              confirmQuorum(quorumSize){};
        virtual ~PendingRequest(){};
    };

    struct PendingUnloggedRequest : public PendingRequest {
        error_continuation_t error_continuation;

        inline PendingUnloggedRequest(
            string request, uint64_t clientReqId, continuation_t continuation,
            error_continuation_t error_continuation,
            std::unique_ptr<Timeout> timer)
            : PendingRequest(request, clientReqId, continuation,
                             std::move(timer), 1),
              error_continuation(error_continuation){};
    };

    struct PendingInconsistentRequest : public PendingRequest {
        QuorumSet<viewstamp_t, proto::ReplyInconsistentMessage>
            inconsistentReplyQuorum;

        inline PendingInconsistentRequest(string request, uint64_t clientReqId,
                                          continuation_t continuation,
                                          std::unique_ptr<Timeout> timer,
                                          int quorumSize)
            : PendingRequest(request, clientReqId, continuation,
                             std::move(timer), quorumSize),
              inconsistentReplyQuorum(quorumSize){};
    };
    PendingRequest *pendingInconsistentRequest;
    PendingRequest *pendingConsensusRequest;
    PendingRequest *pendingUnloggedRequest;
    Timeout *inconsistentRequestTimeout;
    Timeout *consensusRequestTimeout;
    Timeout *confirmationTimeout;
    Timeout *unloggedRequestTimeout;

    void SendInconsistent();
    void ResendInconsistent();
    void ConsensusSlowPath();
    void ResendConfirmation();

    struct PendingConsensusRequest : public PendingRequest {
        QuorumSet<opnum_t, proto::ReplyConsensusMessage> consensusReplyQuorum;
        decide_t decide;
        string decideResult;
        const std::size_t quorumSize;
        const std::size_t superQuorumSize;
        bool on_slow_path;
        error_continuation_t error_continuation;

        // The timer to give up on the fast path and transition to the slow
        // path. After this timer is run for the first time, it is nulled.
        std::unique_ptr<Timeout> transition_to_slow_path_timer;

        // The view for which a majority result (or finalized result) was
        // found. The view of a majority of confirms must match this view.
        uint64_t reply_consensus_view = 0;

        // True when a consensus request has already received a quorum or super
        // quorum of replies and has already transitioned into the confirm
        // phase.
        bool sent_confirms = false;

        inline PendingConsensusRequest(
            string request, uint64_t clientReqId, continuation_t continuation,
            std::unique_ptr<Timeout> timer,
            std::unique_ptr<Timeout> transition_to_slow_path_timer,
            int quorumSize, int superQuorum, decide_t decide,
            error_continuation_t error_continuation)
            : PendingRequest(request, clientReqId, continuation,
                             std::move(timer), quorumSize),
              consensusReplyQuorum(quorumSize),
              decide(decide),
              quorumSize(quorumSize),
              superQuorumSize(superQuorum),
              on_slow_path(false),
              error_continuation(error_continuation),
              transition_to_slow_path_timer(
                  std::move(transition_to_slow_path_timer)){};
    };

    uint64_t lastReqId;
    std::unordered_map<uint64_t, PendingRequest *> pendingReqs;

    void SendInconsistent(const PendingInconsistentRequest *req);
    void ResendInconsistent(const uint64_t reqId);
    void SendConsensus(const PendingConsensusRequest *req);
    void ResendConsensus(const uint64_t reqId);

    // `TransitionToConsensusSlowPath` is called after a timeout to end the
    // possibility of taking the fast path and transition into taking the slow
    // path.
    void TransitionToConsensusSlowPath(const uint64_t reqId);

    // HandleSlowPathConsensus is called in one of two scenarios:
    //
    //   1. A finalized ReplyConsensusMessage was received. In this case, we
    //      immediately enter the slow path and use the finalized result. If
    //      finalized is true, req has already been populated with the
    //      finalized result.
    //   2. We're in the slow path and receive a majority of
    //      ReplyConsensusMessages in the same view. In this case, we call
    //      decide to determine the final result.
    //
    // In either case, HandleSlowPathConsensus intitiates the finalize phase of
    // a consensus request.
    void HandleSlowPathConsensus(
        const uint64_t reqid,
        const std::map<int, proto::ReplyConsensusMessage> &msgs,
        const bool finalized_result_found,
        PendingConsensusRequest *req);

    // HandleFastPathConsensus is called when we're on the fast path and
    // receive a super quorum of responses from the same view.
    // HandleFastPathConsensus will check to see if there is a superquorum of
    // matching responses. If there is, it will return to the user and
    // asynchronously intitiate the finalize phase of a consensus request.
    // Otherwise, it transitions into the slow path which will also initiate
    // the finalize phase of a consensus request, but not yet return to the
    // user.
    void HandleFastPathConsensus(
        const uint64_t reqid,
        const std::map<int, proto::ReplyConsensusMessage> &msgs,
        PendingConsensusRequest *req);

    void ResendConfirmation(const uint64_t reqId, bool isConsensus);
    void HandleInconsistentReply(const TransportAddress &remote,
                                 const proto::ReplyInconsistentMessage &msg);
    void HandleConsensusReply(const TransportAddress &remote,
@@ -107,7 +220,7 @@ protected:
                       const proto::ConfirmMessage &msg);
    void HandleUnloggedReply(const TransportAddress &remote,
                             const proto::UnloggedReplyMessage &msg);
    void UnloggedRequestTimeoutCallback();
    void UnloggedRequestTimeoutCallback(const uint64_t reqId);
};

} // namespace replication::ir
Original line number Diff line number Diff line
syntax = "proto2";

import "replication/common/request.proto";

package replication.ir.proto;
@@ -7,6 +9,40 @@ message OpID {
    required uint64 clientreqid = 2;
}

// For the view change and recovery protocol, a replica stores two things on
// disk: (1) its current view and (2) the latest view during which it was
// NORMAL. Replicas pack this information into this proto buf and serialize it
// to disk.
message PersistedViewInfo {
  required uint64 view = 1;
  required uint64 latest_normal_view = 2;
}

enum RecordEntryState {
    RECORD_STATE_TENTATIVE = 0;
    RECORD_STATE_FINALIZED = 1;
}

enum RecordEntryType {
    RECORD_TYPE_INCONSISTENT = 0;
    RECORD_TYPE_CONSENSUS = 1;
}

message RecordEntryProto {
  required uint64 view = 1;
  required OpID opid = 2;
  required RecordEntryState state = 3;
  required RecordEntryType type = 4;
  required bytes op = 5;
  required bytes result = 6;
}

// TODO: Currently, replicas send entire records to one another. Figure out if
// there is a more efficient way to exchange records.
message RecordProto {
  repeated RecordEntryProto entry = 1;
}

message ProposeInconsistentMessage {
    required replication.Request req = 1;
}
@@ -15,6 +51,7 @@ message ReplyInconsistentMessage {
    required uint64 view = 1;
    required uint32 replicaIdx = 2;
    required OpID opid = 3;
    required bool finalized = 4;
}

message FinalizeInconsistentMessage {
@@ -36,6 +73,7 @@ message ReplyConsensusMessage {
    required uint32 replicaIdx = 2;
    required OpID opid = 3;
    required bytes result = 4;
    required bool finalized = 5;
}

message FinalizeConsensusMessage {
@@ -43,10 +81,24 @@ message FinalizeConsensusMessage {
    required bytes result = 2;
}

message DoViewChangeMessage {
    required uint32 replicaIdx = 1;
    // record is optional because a replica only sends its record to the
    // leader of the new view.
    optional RecordProto record = 2;
    required uint64 new_view = 3;
    required uint64 latest_normal_view = 4;
}
message StartViewMessage {
    required RecordProto record = 1;
    required uint64 new_view = 2;
}

message UnloggedRequestMessage {
    required replication.UnloggedRequest req = 1;
}

message UnloggedReplyMessage {
    required bytes reply = 1;
    required uint64 clientreqid = 2;
}
Original line number Diff line number Diff line
@@ -29,35 +29,52 @@
 **********************************************************************/

#include "replication/ir/record.h"

#include <utility>

#include "lib/assert.h"

namespace replication {
namespace ir {

RecordEntry &
Record::Add(view_t view, opid_t opid, const Request &request,
            RecordEntryState state)
{
    RecordEntry entry;
    entry.view = view;
    entry.opid = opid;
    entry.request = request;
    entry.state = state;
Record::Record(const proto::RecordProto &record_proto) {
    for (const proto::RecordEntryProto &entry_proto : record_proto.entry()) {
        const view_t view = entry_proto.view();
        const opid_t opid = std::make_pair(entry_proto.opid().clientid(),
                                     entry_proto.opid().clientreqid());
        Request request;
        request.set_op(entry_proto.op());
        request.set_clientid(entry_proto.opid().clientid());
        request.set_clientreqid(entry_proto.opid().clientreqid());
        proto::RecordEntryState state = entry_proto.state();
        proto::RecordEntryType type = entry_proto.type();
        const std::string& result = entry_proto.result();
        Add(view, opid, request, state, type, result);
    }
}

RecordEntry &
Record::Add(const RecordEntry& entry) {
    // Make sure this isn't a duplicate
    ASSERT(entries.count(opid) == 0);
    ASSERT(entries.count(entry.opid) == 0);
    entries[entry.opid] = entry;
    return entries[entry.opid];
}

    entries[opid] = entry;
    return entries[opid];
RecordEntry &
Record::Add(view_t view, opid_t opid, const Request &request,
            proto::RecordEntryState state, proto::RecordEntryType type)
{
    return Add(RecordEntry(view, opid, state, type, request, ""));
}

RecordEntry &
Record::Add(view_t view, opid_t opid, const Request &request,
            RecordEntryState state, const string &result)
            proto::RecordEntryState state, proto::RecordEntryType type,
            const string &result)
{
    RecordEntry entry = Add(view, opid, request, state);
    RecordEntry &entry = Add(view, opid, request, state, type);
    entry.result = result;

    return entries[opid];
}

@@ -76,7 +93,7 @@ Record::Find(opid_t opid)


bool
Record::SetStatus(opid_t op, RecordEntryState state)
Record::SetStatus(opid_t op, proto::RecordEntryState state)
{
    RecordEntry *entry = Find(op);
    if (entry == NULL) {
@@ -123,5 +140,26 @@ Record::Empty() const
    return entries.empty();
}

void
Record::ToProto(proto::RecordProto *proto) const
{
    for (const std::pair<const opid_t, RecordEntry> &p : entries) {
        const RecordEntry &entry = p.second;
        proto::RecordEntryProto *entry_proto = proto->add_entry();

        entry_proto->set_view(entry.view);
        entry_proto->mutable_opid()->set_clientid(entry.opid.first);
        entry_proto->mutable_opid()->set_clientreqid(entry.opid.second);
        entry_proto->set_state(entry.state);
        entry_proto->set_type(entry.type);
        entry_proto->set_op(entry.request.op());
        entry_proto->set_result(entry.result);
    }
}

const std::map<opid_t, RecordEntry> &Record::Entries() const {
    return entries;
}

} // namespace ir
} // namespace replication
Original line number Diff line number Diff line
@@ -31,41 +31,47 @@
#ifndef _IR_RECORD_H_
#define _IR_RECORD_H_

#include "replication/common/request.pb.h"
#include <map>
#include <string>
#include <utility>

#include "lib/assert.h"
#include "lib/message.h"
#include "lib/transport.h"
#include "replication/common/request.pb.h"
#include "replication/common/viewstamp.h"

#include <map>
#include <string>
#include <utility>
#include "replication/ir/ir-proto.pb.h"

namespace replication {
namespace ir {

enum RecordEntryState {
    RECORD_STATE_TENTATIVE,
    RECORD_STATE_FINALIZED
};

typedef std::pair<uint64_t, uint64_t> opid_t;

struct RecordEntry
{
    view_t view;
    opid_t opid;
    RecordEntryState state;
    proto::RecordEntryState state;
    proto::RecordEntryType type;
    Request request;
    std::string result;

    RecordEntry() { result = ""; }
    RecordEntry(const RecordEntry &x)
        : view(x.view), opid(x.opid), state(x.state), request(x.request),
        : view(x.view),
          opid(x.opid),
          state(x.state),
          type(x.type),
          request(x.request),
          result(x.result) {}
    RecordEntry(view_t view, opid_t opid, RecordEntryState state,
                const Request &request, const std::string &result)
        : view(view), opid(opid), state(state), request(request),
    RecordEntry(view_t view, opid_t opid, proto::RecordEntryState state,
                proto::RecordEntryType type, const Request &request,
                const std::string &result)
        : view(view),
          opid(opid),
          state(state),
          type(type),
          request(request),
          result(result) {}
    virtual ~RecordEntry() {}
};
@@ -73,19 +79,41 @@ struct RecordEntry
class Record
{
public:
    // Use the copy-and-swap idiom to make Record moveable but not copyable
    // [1]. We make it non-copyable to avoid unnecessary copies.
    //
    // [1]: https://stackoverflow.com/a/3279550/3187068
    Record(){};
    RecordEntry & Add(view_t view, opid_t opid, const Request &request, RecordEntryState state);
    RecordEntry & Add(view_t view, opid_t opid, const Request &request, RecordEntryState state, const std::string &result);
    Record(const proto::RecordProto &record_proto);
    Record(Record &&other) : Record() { swap(*this, other); }
    Record(const Record &) = delete;
    Record &operator=(const Record &) = delete;
    Record &operator=(Record &&other) {
        swap(*this, other);
        return *this;
    }
    friend void swap(Record &x, Record &y) {
        std::swap(x.entries, y.entries);
    }

    RecordEntry &Add(const RecordEntry& entry);
    RecordEntry &Add(view_t view, opid_t opid, const Request &request,
                     proto::RecordEntryState state,
                     proto::RecordEntryType type);
    RecordEntry &Add(view_t view, opid_t opid, const Request &request,
                     proto::RecordEntryState state, proto::RecordEntryType type,
                     const std::string &result);
    RecordEntry *Find(opid_t opid);
    bool SetStatus(opid_t opid, RecordEntryState state);
    bool SetStatus(opid_t opid, proto::RecordEntryState state);
    bool SetResult(opid_t opid, const std::string &result);
    bool SetRequest(opid_t opid, const Request &req);
    void Remove(opid_t opid);
    bool Empty() const;
    void ToProto(proto::RecordProto *proto) const;
    const std::map<opid_t, RecordEntry> &Entries() const;

private:
    std::map<opid_t, RecordEntry> entries;

};

}      // namespace ir
Original line number Diff line number Diff line
@@ -349,16 +349,16 @@ void
VRReplica::ReceiveMessage(const TransportAddress &remote,
                          const string &type, const string &data)
{
    static RequestMessage request;
    static UnloggedRequestMessage unloggedRequest;
    static PrepareMessage prepare;
    static PrepareOKMessage prepareOK;
    static CommitMessage commit;
    static RequestStateTransferMessage requestStateTransfer;
    static StateTransferMessage stateTransfer;
    static StartViewChangeMessage startViewChange;
    static DoViewChangeMessage doViewChange;
    static StartViewMessage startView;
    RequestMessage request;
    UnloggedRequestMessage unloggedRequest;
    PrepareMessage prepare;
    PrepareOKMessage prepareOK;
    CommitMessage commit;
    RequestStateTransferMessage requestStateTransfer;
    StateTransferMessage stateTransfer;
    StartViewChangeMessage startViewChange;
    DoViewChangeMessage doViewChange;
    StartViewMessage startView;
    
    if (type == request.GetTypeName()) {
        request.ParseFromString(data);
@@ -513,6 +513,7 @@ VRReplica::HandleUnloggedRequest(const TransportAddress &remote,
    Debug("Received unlogged request %s", (char *)msg.req().op().c_str());

    ExecuteUnlogged(msg.req(), reply);
    reply.set_clientreqid(msg.req().clientreqid());

    if (!(transport->SendMessage(this, remote, reply)))
        Warning("Failed to send reply message");
Original line number Diff line number Diff line
syntax = "proto2";

import "replication/common/request.proto";

package replication.vr.proto;
@@ -19,6 +21,7 @@ message UnloggedRequestMessage {

message UnloggedReplyMessage {
    required bytes reply = 1;
    required uint64 clientreqid = 2;
}

message PrepareMessage {
Original line number Diff line number Diff line
@@ -28,7 +28,7 @@
 *
 **********************************************************************/

#include "kvstore.h"
#include "store/common/backend/kvstore.h"

using namespace std;

File changed.

Preview size limit exceeded, changes collapsed.

+118 −0

File added.

Preview size limit exceeded, changes collapsed.

+4 −0
Original line number Diff line number Diff line
f 1
replica breakout:51735
replica pitfall:51735
replica qbert:51735

ycsb-t/README.md

0 → 100644
+10 −0

File added.

Preview size limit exceeded, changes collapsed.

ycsb-t/checkstyle.xml

0 → 100644
+188 −0

File added.

Preview size limit exceeded, changes collapsed.

ycsb-t/run-tapir.sh

0 → 100755
+20 −0

File added.

Preview size limit exceeded, changes collapsed.

ycsb-t/tapir.sh

deleted100755 → 0
+0 −3

File deleted.

Preview size limit exceeded, changes collapsed.