Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • syslab/tapir
  • aaasz/tapir
  • ashmrtnz/tapir
3 results
Show changes
Commits on Source (22)
  • Irene Zhang's avatar
    Merge pull request #7 from maximecaron/master · 177425b0
    Irene Zhang authored
    fix building on Apple clang
    177425b0
  • Michael Whittaker's avatar
    A couple small lockserver client improvements. · 679fcb9f
    Michael Whittaker authored
    1. Previously, typing ` `, `lock`, or `unlock` into a lockserver
       client's REPL would cause the client to crash because of some
       `strtok` calls that were returning `NULL`. Now, this bug is fixed;
       `strtok` returns are checked to be `NULL`, and no input should crash
       the client.
    2. Previously, typing in an unrecognized command into the lockserver
       client's repl would print `Unknown command.. Try again!`. Now, it
       also prints out the set of legal commands `Usage: exit | q | lock
       <key> | unlock <key>`. This makes it a bit easier for someone
       tinkering around with TAPIR for the first time (like me) to know what
       to type.
    679fcb9f
  • Irene Zhang's avatar
    Merge pull request #8 from mwhittaker/lockserver_client_fixes · 46488d1b
    Irene Zhang authored
    A couple small lockserver client improvements.
    Unverified
    46488d1b
  • Naveen Kr. Sharma's avatar
    bug with nkeys and client->stats() · b4238459
    Naveen Kr. Sharma authored
    b4238459
  • Naveen Kr. Sharma's avatar
  • irene's avatar
    c56a1c7d
  • Michael Whittaker's avatar
    Added a gitignore to the project. · f113e42e
    Michael Whittaker authored
    Previously, directories like `.obj/` and files like `libtapir.so`
    weren't being ignored by git. Now they are! This .gitignore ignores vim,
    c++, and project specific stuff.
    f113e42e
  • Michael Whittaker's avatar
    Added debugging rule to Makefile. · 48807ff4
    Michael Whittaker authored
    Added a debugging Makefile rule stolen from [1]. With this rule, you can
    run make print-<VAR> to print out the contents of <VAR>. For example,
    
            make print-OBJS
            make print-PROTOOBJS
            make print-BINS
    
    [1]: https://blog.melski.net/2010/11/30/makefile-hacks-print-the-value-of-any-variable/
    48807ff4
  • Michael Whittaker's avatar
    Removed `goto fail` code with `unique_ptr`. · 4ceab25d
    Michael Whittaker authored
    Previously, `UDPTransport::SendMessageInternal` dynamically allocated a
    `char[]` and used a `goto fail` to make sure that it was properly
    deleted. Something like:
    
    ```c++
        char *buf = new char[100];
        if (...) {
            ...
            goto fail;
        } else if (...) {
            ...
            goto fail;
        } else {
            ...
        }
    
    fail:
         delete [] buf;
         return false;
    ```
    
    Now, the array is stored in a `unique_ptr` so that it's properly
    deallocated when the function returns, without needing the goto fail.
    4ceab25d
  • Irene Y Zhang's avatar
    Merge branch 'no_goto_fail' into 'master' · 26dbe6c2
    Irene Y Zhang authored
    Removed `goto fail` code with `unique_ptr`.
    
    See merge request !3
    26dbe6c2
  • Irene Y Zhang's avatar
    Merge branch 'makefile-print' into 'master' · 806863e6
    Irene Y Zhang authored
    Added debugging rule to Makefile.
    
    See merge request !2
    806863e6
  • Irene Y Zhang's avatar
    Merge branch 'gitignore' into 'master' · c6572926
    Irene Y Zhang authored
    Added a gitignore to the project.
    
    See merge request !1
    c6572926
  • Irene Y Zhang's avatar
    46667e69
  • Irene Y Zhang's avatar
    Update README.md with more dependencies · 0f9069e6
    Irene Y Zhang authored
    0f9069e6
  • Michael Whittaker's avatar
    Recovery & view change implementation + bug fixes. · 0cd92291
    Michael Whittaker authored
    Overview
    ========
    This PR implements the view change and recovery portion of IR. It also
    fixes a couple of important bugs and introduces a new mechanism with
    with to unit test IR.
    
    What's New?
    ===========
    This PR includes the following, in roughly descending order of
    importance:
    
    - The main contribution of this PR is the implementation of view changes
      and recovery. The implementation required some of the following
      things:
        - I introduce DoViewChangeMessage and StartViewMessage protobufs as
          well as protobufs for serializing records.
        - IR replicas have logic to periodically initiate a view change or
          initiate a view change upon recovery. They also have logic to
          handle DoViewChangeMessage and StartViewMessage messages.
        - IR replicas persist their view information, which is needed for
          recovery, using PeristentRegisters which persist data on disk.
        - When a client issues a PropseInconsistentMessage or
          ProposeConsistentMessage, the reply indicating whether the request
          has already been finalized. If it has, the clients respond
          appropriately.
        - For conensus requests, clients make sure that the majority of
          replies and majority of confirms come from the same view.
    - I fixed a slow path/fast path bug in IR clients. Previously, after a
      timeout, clients would transition from the fast path into the slow
      path. But, they would not wait for a majority of responses from
      replicas. Instead, they would call decide on whatever responses they
      had at the moment.
    - Previously, decide functions took in a set of replies, but they needed
      to take in a multiset of replies so that the decide function could do
      things like count the frequency of each reply.
    - I introduced a new simulated transport ReplTransport. It's pretty
      neat. It launches a command line REPL that you can use to manually
      control the execution of the system. You decide what timers to trigger
      and what messages to deliver. Then, you can save your execution as a
      unit test to run for later. I used the ReplTransport a lot to debug
      the view change and recovery algorithms since a lot of the corner
      cases are hard to trigger with the existing transport layers. I added
      some ReplTransport unit tests for the lock server.
    - I implemented Sync and Merge for the lock server. The lock server is
      now fully functional.
    - I generalized timeout callbacks to error callbacks which are invoked
      whenever an error (not just a timeout) is encountered. This was
      necessary to nicely handle some of the failure scenarios introduced by
      view changes (e.g. a client gets majority replies and majority
      confirms in different views for a consensus request).
    - I performed some miscellaneous cleanup here and there, fixing
      whitespace, changing raw pointers to smart pointers, stuff like that.
    
    What's Left?
    ============
    There are still some things left that this PR doesn't implement:
    
    - Currently during a view change, replicas send their entire records to
      one another. I'm guessing there are more efficient ways to transfer
      records between replicas, similar to how VR has some optimization
      tricks for log shipping.
    - I have not implemented Sync or Merge for TAPIR, only for the lock
      server.
    - Clients do not notify replicas of view changes when they receive
      replies from older views. Similarly, a replica never detects that its
      in a stale view or requests a master record from replica in a higher
      view. It just eventually does a view change to stay up-to-date.
    - None of the code has been profiled or optimized or anything like that.
      This PR focuses only on correctness, not performance.
    - There could be more unit tests.
    0cd92291
  • Irene Y Zhang's avatar
    Merge branch 'ir_recovery_squashed' into 'master' · 2d7aec71
    Irene Y Zhang authored
    Recovery & view change implementation + bug fixes.
    
    See merge request !7
    2d7aec71
  • Adriana Szekeres's avatar
    Fixed computation of fast quorum · 369d523d
    Adriana Szekeres authored
    369d523d
  • Adriana Szekeres's avatar
    2a91edde
  • Irene Y Zhang's avatar
  • Naveen Kr. Sharma's avatar
    99ptile latency · 72ced4e0
    Naveen Kr. Sharma authored
    72ced4e0
  • Irene Y Zhang's avatar
    adding OSS License · 35120562
    Irene Y Zhang authored
    35120562
  • Irene Y Zhang's avatar
    adding co authors · 656d1711
    Irene Y Zhang authored
    656d1711
Showing with 1396 additions and 199 deletions
.obj/
lockserver/client-main
lockserver/server-main
lockserver/lockserver-repl
store/benchmark/benchClient
store/benchmark/retwisClient
store/benchmark/terminalClient
store/strongstore/server
store/tapirstore/server
store/weakstore/server
timeserver/timeserver
lib/tests/configuration-test
lib/tests/simtransport-test
lockserver/tests/lockserver-test
replication/ir/tests/ir-test
replication/vr/tests/vr-test
store/common/backend/tests/kvstore-test
store/common/backend/tests/lockserver-test
store/common/backend/tests/versionstore-test
################################################################################
# vim
################################################################################
# Swap
[._]*.s[a-v][a-z]
[._]*.sw[a-p]
[._]s[a-v][a-z]
[._]sw[a-p]
# Session
Session.vim
# Temporary
.netrwhist
*~
# Auto-generated tag files
tags
################################################################################
# c++
################################################################################
# Prerequisites
*.d
# Compiled Object files
*.slo
*.lo
*.o
*.obj
# Precompiled Headers
*.gch
*.pch
# Compiled Dynamic libraries
*.so
*.dylib
*.dll
# Fortran module files
*.mod
*.smod
# Compiled Static libraries
*.lai
*.la
*.a
*.lib
# Executables
*.exe
*.out
*.app
The MIT License (MIT)
Copyright (c) 2019 Irene Zhang, Dan Ports, Naveen Kr. Sharma
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
......@@ -7,8 +7,8 @@ CXX = g++
LD = g++
EXPAND = lib/tmpl/expand
#CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized -O2 -DNASSERT
CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized
CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized -O2 -DNASSERT
#CFLAGS := -g -Wall -pthread -iquote.obj/gen -Wno-uninitialized
CXXFLAGS := -g -std=c++0x
LDFLAGS := -levent_pthreads
## Debian package: check
......@@ -237,6 +237,12 @@ clean:
$(call trace,RM,binaries,rm -f $(BINS) $(TEST_BINS))
$(call trace,RM,objects,rm -rf .obj)
#
# Debugging
#
print-%:
@echo '$*=$($*)'
##################################################################
# Targets
#
......
......@@ -52,7 +52,13 @@ The repo is structured as follows:
## Compiling & Running
You can compile all of the TAPIR executables by running make in the root directory
TAPIR depends on protobufs and libevent, so you will need those development libraries installed on your machine. On Linux, this can be done through apt.
TAPIR depends on protobufs, libevent and openssl, so you will need the following development libraries:
- libprotobuf-dev
- libevent-openssl
- libevent-pthreads
- libevent-dev
- libssl-dev
- protobuf-compiler
## Contact and Questions
Please email Irene at iyzhang@cs.washington.edu, Dan at drkp@cs.washington.edu and Naveen at naveenks@cs.washington.edu
......@@ -3,7 +3,8 @@ d := $(dir $(lastword $(MAKEFILE_LIST)))
SRCS += $(addprefix $(d), \
lookup3.cc message.cc memory.cc \
latency.cc configuration.cc transport.cc \
udptransport.cc tcptransport.cc simtransport.cc)
udptransport.cc tcptransport.cc simtransport.cc repltransport.cc \
persistent_register.cc)
PROTOS += $(addprefix $(d), \
latency-format.proto)
......@@ -24,8 +25,13 @@ LIB-transport := $(o)transport.o $(LIB-message) $(LIB-configuration)
LIB-simtransport := $(o)simtransport.o $(LIB-transport)
LIB-repltransport := $(o)repltransport.o $(LIB-transport)
LIB-udptransport := $(o)udptransport.o $(LIB-transport)
LIB-tcptransport := $(o)tcptransport.o $(LIB-transport)
LIB-persistent_register := $(o)persistent_register.o $(LIB-message)
include $(d)tests/Rules.mk
......@@ -35,6 +35,7 @@
#include <cstring>
#include <stdexcept>
#include <tuple>
namespace transport {
......@@ -50,6 +51,12 @@ ReplicaAddress::operator==(const ReplicaAddress &other) const {
(port == other.port));
}
bool
ReplicaAddress::operator<(const ReplicaAddress &other) const {
auto this_t = std::forward_as_tuple(host, port);
auto other_t = std::forward_as_tuple(other.host, other.port);
return this_t < other_t;
}
Configuration::Configuration(const Configuration &c)
: n(c.n), f(c.f), replicas(c.replicas), hasMulticast(c.hasMulticast)
......@@ -59,7 +66,7 @@ Configuration::Configuration(const Configuration &c)
multicastAddress = new ReplicaAddress(*c.multicastAddress);
}
}
Configuration::Configuration(int n, int f,
std::vector<ReplicaAddress> replicas,
ReplicaAddress *multicastAddress)
......@@ -80,7 +87,7 @@ Configuration::Configuration(std::ifstream &file)
f = -1;
hasMulticast = false;
multicastAddress = NULL;
while (!file.eof()) {
// Read a line
string line;
......@@ -108,7 +115,7 @@ Configuration::Configuration(std::ifstream &file)
}
} else if (strcasecmp(cmd.c_str(), "replica") == 0) {
unsigned int t2 = line.find_first_not_of(" \t", t1);
if (t2 == string::npos) {
if (t2 == string::npos) {
Panic ("'replica' configuration line requires an argument");
}
......@@ -208,8 +215,27 @@ Configuration::operator==(const Configuration &other) const
return false;
}
}
return true;
}
bool
Configuration::operator<(const Configuration &other) const {
auto this_t = std::forward_as_tuple(n, f, replicas, hasMulticast);
auto other_t = std::forward_as_tuple(other.n, other.f, other.replicas,
other.hasMulticast);
if (this_t < other_t) {
return true;
} else if (this_t == other_t) {
if (hasMulticast) {
return *multicastAddress < *other.multicastAddress;
} else {
return false;
}
} else {
// this_t > other_t
return false;
}
}
} // namespace transport
......@@ -52,6 +52,16 @@ struct ReplicaAddress
inline bool operator!=(const ReplicaAddress &other) const {
return !(*this == other);
}
bool operator<(const ReplicaAddress &other) const;
bool operator<=(const ReplicaAddress &other) const {
return *this < other || *this == other;
}
bool operator>(const ReplicaAddress &other) const {
return !(*this <= other);
}
bool operator>=(const ReplicaAddress &other) const {
return !(*this < other);
}
};
......@@ -69,10 +79,20 @@ public:
int QuorumSize() const;
int FastQuorumSize() const;
bool operator==(const Configuration &other) const;
inline bool operator!= (const Configuration &other) const {
inline bool operator!=(const Configuration &other) const {
return !(*this == other);
}
bool operator<(const Configuration &other) const;
bool operator<=(const Configuration &other) const {
return *this < other || *this == other;
}
bool operator>(const Configuration &other) const {
return !(*this <= other);
}
bool operator>=(const Configuration &other) const {
return !(*this < other);
}
public:
int n; // number of replicas
int f; // number of failures tolerated
......
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* persistent_register.cc: A disk-backed persistent register.
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "lib/persistent_register.h"
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <fstream>
#include <memory>
#include "lib/message.h"
bool PersistentRegister::Initialized() const
{
// Check to see if the file exists. If it doesn't, then we default to
// returning an empty string. Refer to [1] for some ways to check if a file
// exists in C++.
//
// [1]: https://stackoverflow.com/a/12774387/3187068
std::ifstream f(filename_.c_str());
return f.good();
}
std::string PersistentRegister::Read() const
{
if (!Initialized()) {
return "";
}
std::FILE *file = OpenFile(filename_, "rb");
// Seek to the end of the file and get it's size.
int success = std::fseek(file, 0, SEEK_END);
if (success != 0) {
Panic("Unable to fseek file %s", filename_.c_str());
}
long length = ftell(file);
if (length == -1) {
Panic("%s", std::strerror(errno));
}
// Seek back to the beginning of the file and read its contents. Now that
// we know the size, we can allocate an appropriately sized buffer.
success = std::fseek(file, 0, SEEK_SET);
if (success != 0) {
Panic("Unable to fseek file %s", filename_.c_str());
}
std::unique_ptr<char[]> buffer(new char[length]);
std::size_t num_read = std::fread(buffer.get(), length, 1, file);
if (num_read != 1) {
Panic("Unable to read file %s", filename_.c_str());
}
CloseFile(file);
return std::string(buffer.get(), length);
}
void PersistentRegister::Write(const std::string &s)
{
// Perform the write.
std::FILE *file = OpenFile(filename_, "wb");
std::size_t num_written =
std::fwrite(s.c_str(), sizeof(char), s.size(), file);
if (num_written != s.size()) {
Panic("Unable to write to file %s", filename_.c_str());
}
// Persist the write.
int fd = fileno(file);
if (fd == -1) {
Panic("%s", std::strerror(errno));
}
int success = fsync(fd);
if (success != 0) {
Panic("%s", std::strerror(errno));
}
CloseFile(file);
}
std::string PersistentRegister::Filename() { return filename_; }
std::FILE *PersistentRegister::OpenFile(const std::string &filename,
const std::string &mode)
{
std::FILE *file = std::fopen(filename.c_str(), mode.c_str());
if (file == nullptr) {
Panic("%s", std::strerror(errno));
}
return file;
}
void PersistentRegister::CloseFile(std::FILE *file)
{
int success = std::fclose(file);
if (success != 0) {
Panic("Unable to close file.");
}
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* persistent_register.h: A disk-backed persistent register.
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#ifndef _LIB_PERSISTENT_REGISTER_H_
#define _LIB_PERSISTENT_REGISTER_H_
#include <cstdio>
#include <string>
// A PersistentRegister is used to read and write a string that is persisted to
// disk. It's like a database for a single string value. Here's how you might
// use it.
//
// // Persist x to the file "x.bin".
// PersistentRegister x("x.bin");
//
// if (!x.Initialized()) {
// // If x has not yet been written, write "Hello, World!".
// x.Write("Hello, World!");
// } else {
// // If x has been written, read and print the value of x.
// std::cout << x.Read() << std::endl;
// }
//
// The first time this program is called, it will detect that x has not been
// written and will write "Hello, World!". The second time it's called, it will
// read and print "Hello, World!".
class PersistentRegister {
public:
PersistentRegister(const std::string &filename) : filename_(filename) {}
// Returns whether a PersistentRegister is initalized (i.e. the file into
// which the register is persisted exists).
bool Initialized() const;
// Read a value from the register, or return an empty string if the
// register is not initalized. Read panics on error.
std::string Read() const;
// Write a value to the register. Write panics on error.
void Write(const std::string &s);
// Return the filename in which the register is persisted.
std::string Filename();
private:
// Note that using C++ file IO, there is not really a way to ensure that
// data has been forced to disk [1]. Thus, our implementation of
// PersistentRegister uses C file IO so that it can use primitives like
// fsync.
//
// [1]: https://stackoverflow.com/q/676787/3187068
// `OpenFile(f, m)` calls `std::fopen(f, m)` but calls `Panic` on error.
static std::FILE *OpenFile(const std::string &filename,
const std::string &mode);
// `CloseFile(f, m)` calls `std::fclose(f)` but calls `Panic` on error.
static void CloseFile(std::FILE *file);
// The filename of the file that contains the persisted data.
const std::string filename_;
};
#endif // _LIB_PERSISTENT_REGISTER_H_
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* repltransport.cc: REPL-driven step-by-step simulated transport.
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include "lib/repltransport.h"
#include <iostream>
#include <iterator>
#include <sstream>
#include <string>
#include <tuple>
#include <utility>
namespace {
// https://stackoverflow.com/a/236803/3187068
template <typename Out>
void split(const std::string &s, char delim, Out result) {
std::stringstream ss(s);
std::string item;
while (std::getline(ss, item, delim)) {
*(result++) = item;
}
}
// https://stackoverflow.com/a/236803/3187068
std::vector<std::string> split(const std::string &s, char delim) {
std::vector<std::string> elems;
split(s, delim, std::back_inserter(elems));
return elems;
}
// https://stackoverflow.com/a/4654718/3187068
bool is_number(const std::string &s) {
std::string::const_iterator it = s.begin();
while (it != s.end() && std::isdigit(*it)) ++it;
return !s.empty() && it == s.end();
}
// https://stackoverflow.com/a/1494435/3187068
void string_replace(std::string *str, const std::string &oldStr,
const std::string &newStr) {
std::string::size_type pos = 0u;
while ((pos = str->find(oldStr, pos)) != std::string::npos) {
str->replace(pos, oldStr.length(), newStr);
pos += newStr.length();
}
}
} // namespace
void ReplTransport::Register(TransportReceiver *receiver,
const transport::Configuration &config,
int replicaIdx) {
// If replicaIdx is -1, then the registering receiver is a client.
// Otherwise, replicaIdx is in the range [0, config.n), and the registering
// receiver is a replica.
bool is_client = replicaIdx == -1;
if (is_client) {
// Create the client's address.
std::string port = std::to_string(client_id_);
auto repl_addr = new ReplTransportAddress("client", std::move(port));
receiver->SetAddress(repl_addr);
client_id_++;
// Register receiver.
receivers_[*repl_addr].receiver = receiver;
} else {
// Set the receiver's address.
transport::ReplicaAddress addr = config.replica(replicaIdx);
auto repl_addr = new ReplTransportAddress(addr.host, addr.port);
receiver->SetAddress(repl_addr);
// Register receiver.
receivers_[*repl_addr].receiver = receiver;
}
// Register with superclass.
RegisterConfiguration(receiver, config, replicaIdx);
}
int ReplTransport::Timer(uint64_t ms, timer_callback_t cb) {
timer_id_++;
ASSERT(timers_.count(timer_id_) == 0);
timers_[timer_id_] = cb;
return timer_id_;
}
bool ReplTransport::CancelTimer(int id) {
if (timers_.count(id) == 0) {
return false;
} else {
timers_.erase(id);
return true;
}
}
void ReplTransport::CancelAllTimers() {
timers_.clear();
}
bool ReplTransport::DeliverMessage(const ReplTransportAddress &addr,
int index) {
history_.push_back("transport.DeliverMessage({\"" + addr.Host() + "\", \"" +
addr.Port() + "\"}, " + std::to_string(index) + ");");
ASSERT(receivers_.count(addr) != 0);
TransportReceiverState &state = receivers_[addr];
// If the recipient of this address hasn't yet been registered, then
// state.receiver is null.
if (state.receiver == nullptr) {
return false;
}
// Deliver the message.
const QueuedMessage &m = state.msgs.at(index);
string data;
m.msg->SerializeToString(&data);
state.receiver->ReceiveMessage(m.src, m.msg->GetTypeName(), data);
return true;
}
void ReplTransport::TriggerTimer(int timer_id) {
history_.push_back("transport.TriggerTimer(" + std::to_string(timer_id) +
");");
ASSERT(timers_.count(timer_id) != 0);
timers_[timer_id]();
}
void ReplTransport::PrintState() const {
// Show the history.
std::cout << "- History" << std::endl;
for (const std::string &command : history_) {
std::cout << " " << command << std::endl;
}
// Show the timers.
std::cout << "- Timers" << std::endl;
for (const std::pair<const int, timer_callback_t> &p : timers_) {
std::cout << " - [" << p.first << "]" << std::endl;
}
// Show the message buffers.
for (const std::pair<const ReplTransportAddress, TransportReceiverState>
&p : receivers_) {
const ReplTransportAddress &addr = p.first;
const TransportReceiverState &state = p.second;
std::cout << "- " << addr;
if (state.receiver == nullptr) {
std::cout << " [not registered]";
}
std::cout << std::endl;
for (std::size_t i = 0; i < state.msgs.size(); ++i) {
const Message *msg = state.msgs[i].msg.get();
std::string debug = msg->DebugString();
string_replace(&debug, "\n", "\n ");
std::cout << " - [" << i << "] " << msg->GetTypeName() << std::endl
<< " " << debug << std::endl;
}
}
}
bool ReplTransport::RunOne() {
// Parse response.
while (true) {
// Prompt user and read response.
std::cout << "> " << std::flush;
std::string line;
std::getline(std::cin, line);
if (std::cin.fail() || std::cin.eof()) {
return true;
}
std::vector<std::string> words = split(line, ' ');
const std::string usage =
"Usage: quit | show | <timer_id> | <host> <port> <index>";
if (words.size() == 1) {
if (words[0] == "quit") {
return true;
}
if (words[0] == "show") {
PrintState();
return false;
}
if (is_number(words[0])) {
int timer_id = std::stoi(words[0]);
TriggerTimer(timer_id);
return false;
} else {
std::cout << usage << std::endl;
}
} else if (words.size() == 3) {
if (!is_number(words[2])) {
std::cout << usage << std::endl;
} else {
ReplTransportAddress addr(words[0], words[1]);
int index = std::stoi(words[2]);
if (receivers_.count(addr) == 0) {
std::cout << "Receiver not found." << std::endl;
} else {
DeliverMessage(addr, index);
return false;
}
}
} else {
std::cout << usage << std::endl;
}
}
}
void ReplTransport::Run() {
bool done = false;
while (!done) {
done = RunOne();
}
}
bool ReplTransport::SendMessageInternal(TransportReceiver *src,
const ReplTransportAddress &dst,
const Message &m,
bool multicast) {
// Multicast is not supported.
ASSERT(!multicast);
const ReplTransportAddress &repl_addr =
dynamic_cast<const ReplTransportAddress &>(src->GetAddress());
std::unique_ptr<Message> msg(m.New());
msg->CheckTypeAndMergeFrom(m);
receivers_[dst].msgs.push_back(QueuedMessage(repl_addr, std::move(msg)));
return true;
}
ReplTransportAddress ReplTransport::LookupAddress(
const transport::Configuration &cfg, int replicaIdx) {
transport::ReplicaAddress addr = cfg.replica(replicaIdx);
return ReplTransportAddress(addr.host, addr.port);
}
const ReplTransportAddress *ReplTransport::LookupMulticastAddress(
const transport::Configuration *cfg) {
return nullptr;
}
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* repltransport.h: REPL-driven step-by-step simulated transport.
*
* Copyright 2013-2015 Irene Zhang <iyzhang@cs.washington.edu>
* Naveen Kr. Sharma <naveenks@cs.washington.edu>
* Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
// Distributed algorithms have to handle arbitrary message delays, message
// loss, message reordering, node failure, network partitions, etc. However,
// these failure scenarios are rare, which can make it difficult to flesh out
// all the corner cases of a distributed algorithm.
//
// Take IR for example. If we want to trigger the IR-MERGE-RECORDS function to
// run with a non-empty d and a non-empty u, we have to
// 1. propose two separate messages,
// 2. deliver one to a supermajority,
// 3. deliver the other to a simple majority,
// 4. prevent both messages from being finalized, and
// 5. trigger a view change.
//
// ReplTransport is a simulated transport, like SimulatedTransport, that lets
// users manipulate every aspect of the execution of a distributed system. When
// run, a ReplTransport opens up a REPL with which users can use to trigger an
// arbitrary timeout or deliver an arbitrary message to a receiver.
//
// For example, imagine a simple distributed system with two nodes: ping
// (localhost:8000) and pong (localhost:9000). Initially, ping sends a message
// to pong, pong replies with a message, ping replies with another message, and
// so on. If a node hasn't heard from the other after some timeout, it resends
// its message. An interaction with a ReplTransport would look something like
// this (comments inline):
//
// $ ./ping_pong
// > show # show the state
// - History # A history of all commands (empty at first)
// - Timers # A list of all timer ids
// - [1] # ping's timeout
// - [2] # pong's timeout
// - localhost:8000 # ping (no pending messages)
// - localhost:9000 # pong
// - [0] PingMessage # pong's pending message from ping
//
// > localhost 9000 0 # Deliver the 0th message to pong
// > show
// - History # A history of all executed commands
// transport.DeliverMessage({"localhost", "9000"}, 0);
// - Timers
// - [1]
// - [2]
// - localhost:8000
// - [0] PongMessage # pings's pending message from pong
// - localhost:9000
// - [0] PingMessage # Notice that this message wasn't removed. We
// # can deliver the same message multiple times.
//
// > localhost 8000 0 # Deliver the 0th message to ping
// > show
// - History
// transport.DeliverMessage({"localhost", "9000"}, 0);
// transport.DeliverMessage({"localhost", "8000"}, 0);
// - Timers
// - [1]
// - [2]
// - localhost:8000
// - [0] PongMessage
// - localhost:9000
// - [0] PingMessage
// - [1] PingMessage
//
// > 1 # Trigger ping's timeout
// > show
// - History
// transport.DeliverMessage({"localhost", "9000"}, 0);
// transport.DeliverMessage({"localhost", "8000"}, 0);
// transport.TriggerTimer(1);
// - Timers
// - [1]
// - [2]
// - localhost:8000
// - [0] PongMessage
// - localhost:9000
// - [0] PingMessage
// - [1] PingMessage
// - [2] PingMessage # ping resent its message to pong
//
// > quit
//
// Also notice that the ReplTransport prints out a history of the executed
// commands. You can copy and paste these commands into your code to replay
// your interaction with the REPL.
#ifndef _LIB_REPLTRANSPORT_H_
#define _LIB_REPLTRANSPORT_H_
#include <functional>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <tuple>
#include "lib/configuration.h"
#include "lib/transport.h"
#include "lib/transportcommon.h"
class ReplTransportAddress : public TransportAddress {
public:
// Constructors.
ReplTransportAddress() {}
ReplTransportAddress(std::string host, std::string port)
: host_(std::move(host)), port_(std::move(port)) {}
ReplTransportAddress(const ReplTransportAddress &other)
: ReplTransportAddress(other.host_, other.port_) {}
ReplTransportAddress(ReplTransportAddress &&other)
: ReplTransportAddress() {
swap(*this, other);
}
ReplTransportAddress &operator=(ReplTransportAddress other) {
swap(*this, other);
return *this;
}
friend void swap(ReplTransportAddress &x, ReplTransportAddress &y) {
std::swap(x.host_, y.host_);
std::swap(x.port_, y.port_);
}
// Comparators.
bool operator==(const ReplTransportAddress &other) const {
return Key() == other.Key();
}
bool operator!=(const ReplTransportAddress &other) const {
return Key() != other.Key();
}
bool operator<(const ReplTransportAddress &other) const {
return Key() < other.Key();
}
bool operator<=(const ReplTransportAddress &other) const {
return Key() <= other.Key();
}
bool operator>(const ReplTransportAddress &other) const {
return Key() > other.Key();
}
bool operator>=(const ReplTransportAddress &other) const {
return Key() >= other.Key();
}
// Getters.
const std::string& Host() const {
return host_;
}
const std::string& Port() const {
return port_;
}
ReplTransportAddress *clone() const override {
return new ReplTransportAddress(host_, port_);
}
friend std::ostream &operator<<(std::ostream &out,
const ReplTransportAddress &addr) {
out << addr.host_ << ":" << addr.port_;
return out;
}
private:
std::tuple<const std::string&, const std::string&> Key() const {
return std::forward_as_tuple(host_, port_);
}
std::string host_;
std::string port_;
};
class ReplTransport : public TransportCommon<ReplTransportAddress> {
public:
void Register(TransportReceiver *receiver,
const transport::Configuration &config,
int replicaIdx) override;
int Timer(uint64_t ms, timer_callback_t cb) override;
bool CancelTimer(int id) override;
void CancelAllTimers() override;
// DeliverMessage(addr, i) delivers the ith queued inbound message to the
// receiver with address addr. It's possible to send a message to the
// address of a receiver that hasn't yet registered. In this case,
// DeliverMessage returns false. Otherwise, it returns true.
bool DeliverMessage(const ReplTransportAddress& addr, int index);
// Run timer with id timer_id.
void TriggerTimer(int timer_id);
// Launch the REPL.
void Run();
protected:
bool SendMessageInternal(TransportReceiver *src,
const ReplTransportAddress &dst, const Message &m,
bool multicast = false) override;
ReplTransportAddress LookupAddress(const transport::Configuration &cfg,
int replicaIdx) override;
const ReplTransportAddress *LookupMulticastAddress(
const transport::Configuration *cfg) override;
private:
// Prompt the user for input and either (1) trigger a timer, (2) deliver a
// message, or (3) quit. RunOne returns true if the user decides to quit.
bool RunOne();
// Pretty print the current state of the system. For example, PrintState
// prints the queued messages for every node in the system.
void PrintState() const;
struct QueuedMessage {
ReplTransportAddress src;
std::unique_ptr<Message> msg;
QueuedMessage(ReplTransportAddress src, std::unique_ptr<Message> msg)
: src(std::move(src)), msg(std::move(msg)) {}
};
struct TransportReceiverState {
// receiver can be null if it has queued messages but hasn't yet been
// registered with a ReplTransport.
TransportReceiver *receiver;
// Queued inbound messages.
std::vector<QueuedMessage> msgs;
};
// receivers_ maps a receiver r's address to r and r's queued messages.
std::map<ReplTransportAddress, TransportReceiverState> receivers_;
// timer_id_ is an incrementing counter used to assign timer ids.
int timer_id_ = 0;
// timers_ maps timer ids to timers.
std::map<int, timer_callback_t> timers_;
// client_id_ is an incrementing counter used to assign addresses to
// clients. The first client gets address client:0, the next client gets
// address client:1, etc.
int client_id_ = 0;
// A history of all the command issued to this ReplTransport.
std::vector<std::string> history_;
};
#endif // _LIB_REPLTRANSPORT_H_
......@@ -38,7 +38,7 @@
SimulatedTransportAddress::SimulatedTransportAddress(int addr)
: addr(addr)
{
}
int
......@@ -70,7 +70,7 @@ SimulatedTransport::SimulatedTransport()
SimulatedTransport::~SimulatedTransport()
{
}
void
......@@ -99,12 +99,12 @@ SimulatedTransport::SendMessageInternal(TransportReceiver *src,
bool multicast)
{
ASSERT(!multicast);
int dst = dstAddr.addr;
Message *msg = m.New();
msg->CheckTypeAndMergeFrom(m);
int srcAddr =
dynamic_cast<const SimulatedTransportAddress &>(src->GetAddress()).addr;
......@@ -119,11 +119,11 @@ SimulatedTransport::SendMessageInternal(TransportReceiver *src,
return true;
}
}
string msgData;
msg->SerializeToString(&msgData);
delete msg;
QueuedMessage q(dst, srcAddr, m.GetTypeName(), msgData);
if (delay == 0) {
......@@ -133,7 +133,7 @@ SimulatedTransport::SendMessageInternal(TransportReceiver *src,
queue.push_back(q);
});
}
return true;
return true;
}
SimulatedTransportAddress
......@@ -144,7 +144,7 @@ SimulatedTransport::LookupAddress(const transport::Configuration &cfg,
// idx match. This is the least efficient possible way to do this,
// but that's why this is the simulated transport not the real
// one... (And we only have to do this once at runtime.)
for (auto & kv : configurations) {
if (*(kv.second) == cfg) {
......@@ -157,7 +157,7 @@ SimulatedTransport::LookupAddress(const transport::Configuration &cfg,
}
}
}
Panic("No replica %d was registered", idx);
}
......@@ -171,7 +171,7 @@ void
SimulatedTransport::Run()
{
LookupAddresses();
do {
// Process queue
while (!queue.empty()) {
......@@ -190,7 +190,7 @@ SimulatedTransport::Run()
timers.erase(iter);
cb();
}
// ...then retry to see if there are more queued messages to
// deliver first
} while (!queue.empty() || (processTimers && !timers.empty()));
......@@ -234,7 +234,7 @@ SimulatedTransport::CancelTimer(int id)
iter++;
}
}
return found;
}
......
......@@ -85,6 +85,7 @@ TCPTransport::LookupAddress(const transport::ReplicaAddress &addr)
{
int res;
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = 0;
......@@ -196,6 +197,13 @@ TCPTransport::ConnectTCP(TransportReceiver *src, const TCPTransportAddress &dst)
PWarning("Failed to set O_NONBLOCK on outgoing TCP socket");
}
// Set TCP_NODELAY
int n = 1;
if (setsockopt(fd, IPPROTO_TCP,
TCP_NODELAY, (char *)&n, sizeof(n)) < 0) {
PWarning("Failed to set TCP_NODELAY on TCP listening socket");
}
TCPTransportTCPListener *info = new TCPTransportTCPListener();
info->transport = this;
info->acceptFd = 0;
......@@ -222,6 +230,15 @@ TCPTransport::ConnectTCP(TransportReceiver *src, const TCPTransportAddress &dst)
Panic("Failed to enable bufferevent");
}
// Tell the receiver its address
struct sockaddr_in sin;
socklen_t sinsize = sizeof(sin);
if (getsockname(fd, (sockaddr *) &sin, &sinsize) < 0) {
PPanic("Failed to get socket name");
}
TCPTransportAddress *addr = new TCPTransportAddress(sin);
src->SetAddress(addr);
tcpOutgoing[dst] = bev;
tcpAddresses.insert(pair<struct bufferevent*, TCPTransportAddress>(bev,dst));
......@@ -525,6 +542,13 @@ TCPTransport::TCPAcceptCallback(evutil_socket_t fd, short what, void *arg)
PWarning("Failed to set O_NONBLOCK");
}
// Set TCP_NODELAY
int n = 1;
if (setsockopt(newfd, IPPROTO_TCP,
TCP_NODELAY, (char *)&n, sizeof(n)) < 0) {
PWarning("Failed to set TCP_NODELAY on TCP listening socket");
}
// Create a buffered event
bev = bufferevent_socket_new(transport->libeventBase, newfd,
BEV_OPT_CLOSE_ON_FREE);
......@@ -552,59 +576,62 @@ TCPTransport::TCPReadableCallback(struct bufferevent *bev, void *arg)
Debug("Readable on bufferevent %p", bev);
uint32_t *magic;
magic = (uint32_t *)evbuffer_pullup(evbuf, sizeof(*magic));
if (magic == NULL) {
return;
}
ASSERT(*magic == MAGIC);
size_t *sz;
unsigned char *x = evbuffer_pullup(evbuf, sizeof(*magic) + sizeof(*sz));
sz = (size_t *) (x + sizeof(*magic));
if (x == NULL) {
return;
}
size_t totalSize = *sz;
ASSERT(totalSize < 1073741826);
if (evbuffer_get_length(evbuf) < totalSize) {
Debug("Don't have %ld bytes for a message yet, only %ld",
totalSize, evbuffer_get_length(evbuf));
return;
}
Debug("Receiving %ld byte message", totalSize);
char buf[totalSize];
size_t copied = evbuffer_remove(evbuf, buf, totalSize);
ASSERT(copied == totalSize);
// Parse message
char *ptr = buf + sizeof(*sz) + sizeof(*magic);
size_t typeLen = *((size_t *)ptr);
ptr += sizeof(size_t);
ASSERT((size_t)(ptr-buf) < totalSize);
ASSERT((size_t)(ptr+typeLen-buf) < totalSize);
string msgType(ptr, typeLen);
ptr += typeLen;
size_t msgLen = *((size_t *)ptr);
ptr += sizeof(size_t);
ASSERT((size_t)(ptr-buf) < totalSize);
while (evbuffer_get_length(evbuf) > 0) {
uint32_t *magic;
magic = (uint32_t *)evbuffer_pullup(evbuf, sizeof(*magic));
if (magic == NULL) {
return;
}
ASSERT(*magic == MAGIC);
ASSERT((size_t)(ptr+msgLen-buf) <= totalSize);
string msg(ptr, msgLen);
ptr += msgLen;
auto addr = transport->tcpAddresses.find(bev);
ASSERT(addr != transport->tcpAddresses.end());
// Dispatch
info->receiver->ReceiveMessage(addr->second, msgType, msg);
Debug("Done processing large %s message", msgType.c_str());
size_t *sz;
unsigned char *x = evbuffer_pullup(evbuf, sizeof(*magic) + sizeof(*sz));
sz = (size_t *) (x + sizeof(*magic));
if (x == NULL) {
return;
}
size_t totalSize = *sz;
ASSERT(totalSize < 1073741826);
if (evbuffer_get_length(evbuf) < totalSize) {
Debug("Don't have %ld bytes for a message yet, only %ld",
totalSize, evbuffer_get_length(evbuf));
return;
}
Debug("Receiving %ld byte message", totalSize);
char buf[totalSize];
size_t copied = evbuffer_remove(evbuf, buf, totalSize);
ASSERT(copied == totalSize);
// Parse message
char *ptr = buf + sizeof(*sz) + sizeof(*magic);
size_t typeLen = *((size_t *)ptr);
ptr += sizeof(size_t);
ASSERT((size_t)(ptr-buf) < totalSize);
ASSERT((size_t)(ptr+typeLen-buf) < totalSize);
string msgType(ptr, typeLen);
ptr += typeLen;
size_t msgLen = *((size_t *)ptr);
ptr += sizeof(size_t);
ASSERT((size_t)(ptr-buf) < totalSize);
ASSERT((size_t)(ptr+msgLen-buf) <= totalSize);
string msg(ptr, msgLen);
ptr += msgLen;
auto addr = transport->tcpAddresses.find(bev);
ASSERT(addr != transport->tcpAddresses.end());
// Dispatch
info->receiver->ReceiveMessage(addr->second, msgType, msg);
Debug("Done processing large %s message", msgType.c_str());
}
}
void
......
......@@ -38,6 +38,7 @@
#include <event2/event.h>
#include <event2/thread.h>
#include <memory>
#include <random>
#include <arpa/inet.h>
......@@ -380,7 +381,8 @@ UDPTransport::Register(TransportReceiver *receiver,
}
static size_t
SerializeMessage(const ::google::protobuf::Message &m, char **out)
SerializeMessage(const ::google::protobuf::Message &m,
std::unique_ptr<char[]> *out)
{
string data = m.SerializeAsString();
string type = m.GetTypeName();
......@@ -389,8 +391,9 @@ SerializeMessage(const ::google::protobuf::Message &m, char **out)
ssize_t totalLen = (typeLen + sizeof(typeLen) +
dataLen + sizeof(dataLen));
char *buf = new char[totalLen];
std::unique_ptr<char[]> unique_buf(new char[totalLen]);
char *buf = unique_buf.get();
char *ptr = buf;
*((size_t *) ptr) = typeLen;
ptr += sizeof(size_t);
......@@ -404,8 +407,8 @@ SerializeMessage(const ::google::protobuf::Message &m, char **out)
ASSERT(ptr+dataLen-buf == totalLen);
memcpy(ptr, data.c_str(), dataLen);
ptr += dataLen;
*out = buf;
*out = std::move(unique_buf);
return totalLen;
}
......@@ -418,11 +421,12 @@ UDPTransport::SendMessageInternal(TransportReceiver *src,
sockaddr_in sin = dynamic_cast<const UDPTransportAddress &>(dst).addr;
// Serialize message
char *buf;
size_t msgLen = SerializeMessage(m, &buf);
std::unique_ptr<char[]> unique_buf;
size_t msgLen = SerializeMessage(m, &unique_buf);
char *buf = unique_buf.get();
int fd = fds[src];
// XXX All of this assumes that the socket is going to be
// available for writing, which since it's a UDP socket it ought
// to be.
......@@ -430,7 +434,7 @@ UDPTransport::SendMessageInternal(TransportReceiver *src,
if (sendto(fd, buf, msgLen, 0,
(sockaddr *)&sin, sizeof(sin)) < 0) {
PWarning("Failed to send message");
goto fail;
return false;
}
} else {
int numFrags = ((msgLen-1) / MAX_UDP_MESSAGE_SIZE) + 1;
......@@ -453,22 +457,17 @@ UDPTransport::SendMessageInternal(TransportReceiver *src,
*((size_t *)ptr) = msgLen;
ptr += sizeof(size_t);
memcpy(ptr, &buf[fragStart], fragLen);
if (sendto(fd, fragBuf, fragLen + fragHeaderLen, 0,
(sockaddr *)&sin, sizeof(sin)) < 0) {
PWarning("Failed to send message fragment %ld",
fragStart);
goto fail;
return false;
}
}
}
}
delete [] buf;
return true;
fail:
delete [] buf;
return false;
}
void
......
d := $(dir $(lastword $(MAKEFILE_LIST)))
SRCS += $(addprefix $(d), server.cc client.cc)
SRCS += $(addprefix $(d), \
server.cc client.cc server-main.cc client-main.cc \
lockserver-repl.cc)
PROTOS += $(addprefix $(d), locks-proto.proto)
$(d)server: $(LIB-udptransport) $(OBJS-ir-replica) $(o)locks-proto.o $(o)server.o
$(d)server-main: $(o)server-main.o \
$(o)locks-proto.o \
$(o)server.o \
$(LIB-udptransport) \
$(OBJS-ir-replica)
$(d)client: $(LIB-udptransport) $(OBJS-ir-client) $(LIB-store-common) \
$(o)locks-proto.o $(o)client.o
$(d)client-main: $(o)client-main.o \
$(o)locks-proto.o \
$(o)client.o \
$(LIB-udptransport) \
$(OBJS-ir-client) \
$(LIB-store-common)
BINS += $(d)server $(d)client
$(d)lockserver-repl: $(o)lockserver-repl.o \
$(o)locks-proto.o \
$(o)server.o \
$(o)client.o \
$(OBJS-ir-replica) \
$(OBJS-ir-client) \
$(LIB-configuration) \
$(LIB-repltransport) \
$(LIB-store-common) \
$(GTEST_MAIN)
BINS += $(d)server-main $(d)client-main $(d)lockserver-repl
include $(d)tests/Rules.mk
#include <thread>
#include "lockserver/client.h"
#include "lib/udptransport.h"
namespace {
void
usage()
{
printf("Unknown command.. Try again!\n");
printf("Usage: exit | q | lock <key> | unlock <key>\n");
}
} // namespace
int
main(int argc, char **argv)
{
const char *configPath = NULL;
// Parse arguments
int opt;
while ((opt = getopt(argc, argv, "c:")) != -1) {
switch (opt) {
case 'c':
configPath = optarg;
break;
default:
fprintf(stderr, "Unknown argument %s\n", argv[optind]);
}
}
if (!configPath) {
fprintf(stderr, "option -c is required\n");
return EXIT_FAILURE;
}
// Load configuration
std::ifstream configStream(configPath);
if (configStream.fail()) {
Panic("Unable to read configuration file: %s\n", configPath);
}
transport::Configuration config(configStream);
// Create lock client.
UDPTransport transport(0.0, 0.0, 0);
lockserver::LockClient locker(&transport, config);
std::thread run_transport([&transport]() { transport.Run(); });
char c, cmd[2048], *tok;
int clen, status;
string key, value;
while (1) {
printf(">> ");
fflush(stdout);
clen = 0;
while ((c = getchar()) != '\n')
cmd[clen++] = c;
cmd[clen] = '\0';
tok = strtok(cmd, " ,.-");
if (tok == NULL) continue;
if (strcasecmp(tok, "exit") == 0 || strcasecmp(tok, "q") == 0) {
printf("Exiting..\n");
break;
} else if (strcasecmp(tok, "lock") == 0) {
tok = strtok(NULL, " ,.-");
if (tok == NULL) {
usage();
continue;
}
key = string(tok);
status = locker.lock(key);
if (status) {
printf("Lock Successful\n");
} else {
printf("Failed to acquire lock..\n");
}
} else if (strcasecmp(tok, "unlock") == 0) {
tok = strtok(NULL, " ,.-");
if (tok == NULL) {
usage();
continue;
}
key = string(tok);
locker.unlock(key);
printf("Unlock Successful\n");
} else {
usage();
}
fflush(stdout);
}
transport.Stop();
run_transport.join();
return EXIT_SUCCESS;
}
......@@ -30,88 +30,14 @@
#include "lockserver/client.h"
int
main(int argc, char **argv)
{
const char *configPath = NULL;
// Parse arguments
int opt;
while ((opt = getopt(argc, argv, "c:")) != -1) {
switch (opt) {
case 'c':
configPath = optarg;
break;
default:
fprintf(stderr, "Unknown argument %s\n", argv[optind]);
}
}
if (!configPath) {
fprintf(stderr, "option -c is required\n");
return EXIT_FAILURE;
}
lockserver::LockClient locker(configPath);
char c, cmd[2048], *tok;
int clen, status;
string key, value;
while (1) {
printf(">> ");
fflush(stdout);
clen = 0;
while ((c = getchar()) != '\n')
cmd[clen++] = c;
cmd[clen] = '\0';
if (clen == 0) continue;
tok = strtok(cmd, " ,.-");
if (strcasecmp(tok, "exit") == 0 || strcasecmp(tok, "q") == 0) {
printf("Exiting..\n");
break;
} else if (strcasecmp(tok, "lock") == 0) {
tok = strtok(NULL, " ,.-");
key = string(tok);
status = locker.lock(key);
if (status) {
printf("Lock Successful\n");
} else {
printf("Failed to acquire lock..\n");
}
} else if (strcasecmp(tok, "unlock") == 0) {
tok = strtok(NULL, " ,.-");
key = string(tok);
locker.unlock(key);
printf("Unlock Successful\n");
} else {
printf("Unknown command.. Try again!\n");
}
fflush(stdout);
}
return EXIT_SUCCESS;
}
namespace lockserver {
using namespace std;
using namespace proto;
LockClient::LockClient(const string &configPath) : transport(0.0, 0.0, 0)
{
// Load configuration
std::ifstream configStream(configPath);
if (configStream.fail()) {
Panic("Unable to read configuration file: %s\n", configPath.c_str());
}
transport::Configuration config(configStream);
LockClient::LockClient(Transport *transport,
const transport::Configuration &config)
: transport(transport) {
client_id = 0;
while (client_id == 0) {
random_device rd;
......@@ -120,23 +46,14 @@ LockClient::LockClient(const string &configPath) : transport(0.0, 0.0, 0)
client_id = dis(gen);
}
client = new replication::ir::IRClient(config, &transport, client_id);
/* Run the transport in a new thread. */
clientTransport = new thread(&LockClient::run_client, this);
client = new replication::ir::IRClient(config, transport, client_id);
}
LockClient::~LockClient() { }
void
LockClient::run_client()
{
transport.Run();
}
bool
LockClient::lock(const string &key)
{
LockClient::lock_async(const std::string &key) {
ASSERT(waiting == nullptr);
Debug("Sending LOCK");
string request_str;
......@@ -147,19 +64,29 @@ LockClient::lock(const string &key)
request.SerializeToString(&request_str);
waiting = new Promise(1000);
transport.Timer(0, [=]() {
transport->Timer(0, [=]() {
client->InvokeConsensus(request_str,
bind(&LockClient::Decide,
this,
placeholders::_1),
bind(&LockClient::LockCallback,
this,
placeholders::_1,
placeholders::_2),
bind(&LockClient::ErrorCallback,
this,
placeholders::_1,
placeholders::_2));
});
}
bool
LockClient::lock_wait() {
ASSERT(waiting != nullptr);
int status = waiting->GetReply();
delete waiting;
waiting = nullptr;
if (status == 0) {
return true;
......@@ -170,8 +97,8 @@ LockClient::lock(const string &key)
}
void
LockClient::unlock(const string &key)
{
LockClient::unlock_async(const std::string &key) {
ASSERT(waiting == nullptr);
Debug("Sending UNLOCK");
string request_str;
......@@ -182,31 +109,53 @@ LockClient::unlock(const string &key)
request.SerializeToString(&request_str);
waiting = new Promise(1000);
transport.Timer(0, [=]() {
transport->Timer(0, [=]() {
client->InvokeInconsistent(request_str,
bind(&LockClient::UnlockCallback,
this,
placeholders::_1,
placeholders::_2));
});
}
void
LockClient::unlock_wait() {
waiting->GetReply();
delete waiting;
waiting = nullptr;
}
bool
LockClient::lock(const string &key)
{
lock_async(key);
return lock_wait();
}
void
LockClient::unlock(const string &key)
{
unlock_async(key);
return unlock_wait();
}
string
LockClient::Decide(const set<string> &results)
LockClient::Decide(const map<string, std::size_t> &results)
{
// If a majority say lock, we say lock.
int success_count = 0;
string key;
for (string s : results) {
for (const auto& string_and_count : results) {
const string& s = string_and_count.first;
const std::size_t count = string_and_count.second;
Reply reply;
reply.ParseFromString(s);
key = reply.key();
if (reply.status() == 0)
success_count ++;
if (reply.status() == 0) {
success_count += count;
}
}
string final_reply_str;
......@@ -243,4 +192,15 @@ LockClient::UnlockCallback(const std::string &request_str, const std::string &re
w->Reply(0);
}
void
LockClient::ErrorCallback(const std::string &request_str,
replication::ErrorCode err)
{
Debug("Error Callback: %s %s", request_str.c_str(),
replication::ErrorCodeToString(err).c_str());
Promise *w = waiting;
waiting = NULL;
w->Reply(-3); // Invalid command.
}
} // namespace lockserver
......@@ -33,49 +33,65 @@
#include "lib/assert.h"
#include "lib/message.h"
#include "lib/udptransport.h"
#include "lib/transport.h"
#include "replication/ir/client.h"
#include "store/common/promise.h"
#include "lockserver/locks-proto.pb.h"
#include <map>
#include <set>
#include <string>
#include <thread>
#include <random>
namespace lockserver {
class LockClient
{
public:
LockClient(const std::string &configPath);
LockClient(Transport* transport, const transport::Configuration &config);
~LockClient();
// Synchronously lock and unlock. Calling lock (or unlock) will block until
// the lock (or unlock) request is fully processed.
bool lock(const std::string &key);
void unlock(const std::string &key);
// Asynchronously lock and unlock. Calling lock_async or unlock_async will
// not block. Calling lock_wait (or unlock_wait) will block for the
// previous invocation of lock_async (or unlock_async) to complete.
//
// All async calls must be followed by a corresponding wait call. It is an
// error to issue multiple async requests without waiting. It is also
// erroneous to wait for a request which was never issued.
void lock_async(const std::string &key);
bool lock_wait();
void unlock_async(const std::string &key);
void unlock_wait();
private:
/* Unique ID for this client. */
uint64_t client_id;
/* Transport layer and thread. */
UDPTransport transport;
std::thread *clientTransport;
Transport *transport;
/* Function to run the transport thread. */
void run_client();
/* Decide function for a lock server. */
string Decide(const std::set<string> &results);
string Decide(const std::map<string, std::size_t> &results);
/* IR client proxy. */
replication::ir::IRClient *client;
/* Promise to wait for pending operation. */
Promise *waiting;
Promise *waiting = nullptr;
/* Callbacks for hearing back for an operation. */
void LockCallback(const std::string &, const std::string &);
void UnlockCallback(const std::string &, const std::string &);
void ErrorCallback(const std::string &, replication::ErrorCode);
};
} // namespace lockserver
......
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
/***********************************************************************
*
* lockserver-repl.cc: Step-by-step lock server evaluation.
*
* Copyright 2013 Dan R. K. Ports <drkp@cs.washington.edu>
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
**********************************************************************/
#include <thread>
#include <memory>
#include "lib/configuration.h"
#include "lib/repltransport.h"
#include "lockserver/client.h"
#include "lockserver/server.h"
#include "replication/ir/replica.h"
int main() {
ReplTransport transport;
std::vector<transport::ReplicaAddress> replica_addrs = {
{"replica", "0"},
{"replica", "1"},
{"replica", "2"},
{"replica", "3"},
{"replica", "4"}};
transport::Configuration config(5 /* n */, 2 /* f */, replica_addrs);
// Clients.
lockserver::LockClient client_a(&transport, config);
lockserver::LockClient client_b(&transport, config);
lockserver::LockClient client_c(&transport, config);
client_a.lock_async("a");
client_b.lock_async("b");
client_c.lock_async("c");
// Servers.
std::vector<std::unique_ptr<lockserver::LockServer>> servers;
std::vector<std::unique_ptr<replication::ir::IRReplica>> replicas;
for (std::size_t i = 0; i < replica_addrs.size(); ++i) {
auto server = std::unique_ptr<lockserver::LockServer>(
new lockserver::LockServer());
servers.push_back(std::move(server));
auto replica = std::unique_ptr<replication::ir::IRReplica>(
new replication::ir::IRReplica(config, i, &transport,
servers[i].get()));
replicas.push_back(std::move(replica));
}
// Launch REPL.
transport.Run();
// Remove persisted files.
for (std::size_t i = 0; i < replica_addrs.size(); ++i) {
const transport::ReplicaAddress &addr = replica_addrs[i];
const std::string filename =
addr.host + ":" + addr.port + "_" + std::to_string(i) + ".bin";
int success = std::remove(filename.c_str());
ASSERT(success == 0);
}
}
#include "lockserver/server.h"
int
main(int argc, char **argv)
{
int index = -1;
const char *configPath = NULL;
// Parse arguments
int opt;
char *strtolPtr;
while ((opt = getopt(argc, argv, "c:i:")) != -1) {
switch (opt) {
case 'c':
configPath = optarg;
break;
case 'i':
index = strtol(optarg, &strtolPtr, 10);
if ((*optarg == '\0') || (*strtolPtr != '\0') || (index < 0)) {
fprintf(stderr, "option -i requires a numeric arg\n");
}
break;
default:
fprintf(stderr, "Unknown argument %s\n", argv[optind]);
}
}
if (!configPath) {
fprintf(stderr, "option -c is required\n");
return EXIT_FAILURE;
}
if (index == -1) {
fprintf(stderr, "option -i is required\n");
return EXIT_FAILURE;
}
// Load configuration
std::ifstream configStream(configPath);
if (configStream.fail()) {
fprintf(stderr, "unable to read configuration file: %s\n", configPath);
return EXIT_FAILURE;
}
transport::Configuration config(configStream);
if (index >= config.n) {
fprintf(stderr, "replica index %d is out of bounds; "
"only %d replicas defined\n", index, config.n);
return EXIT_FAILURE;
}
UDPTransport transport(0.0, 0.0, 0);
lockserver::LockServer server;
replication::ir::IRReplica replica(config, index, &transport, &server);
transport.Run();
return EXIT_SUCCESS;
}