From a4afd29f42fd03a6d7492a0e8827195660d55a4b Mon Sep 17 00:00:00 2001
From: Irene Y Zhang <iyzhang@cs.washington.edu>
Date: Wed, 24 Jun 2015 08:53:27 -0700
Subject: [PATCH] adding some backend storage code

---
 store/backend/lockserver.cc | 283 ++++++++++++++++++++++++++++++++++++
 store/backend/lockserver.h  |  83 +++++++++++
 store/backend/lockstore.cc  | 135 +++++++++++++++++
 store/backend/lockstore.h   |  61 ++++++++
 store/backend/occstore.cc   | 164 +++++++++++++++++++++
 store/backend/occstore.h    |  53 +++++++
 6 files changed, 779 insertions(+)
 create mode 100644 store/backend/lockserver.cc
 create mode 100644 store/backend/lockserver.h
 create mode 100644 store/backend/lockstore.cc
 create mode 100644 store/backend/lockstore.h
 create mode 100644 store/backend/occstore.cc
 create mode 100644 store/backend/occstore.h

diff --git a/store/backend/lockserver.cc b/store/backend/lockserver.cc
new file mode 100644
index 0000000..b22285c
--- /dev/null
+++ b/store/backend/lockserver.cc
@@ -0,0 +1,283 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/lockserver.cc:
+ *   Simple multi-reader, single-writer lock server
+ *
+ **********************************************************************/
+
+#include "spanstore/lockserver.h"
+
+using namespace std;
+
+namespace spanstore {
+
+// All bookkeeping counters start at zero; locks map starts empty.
+LockServer::LockServer()
+    : readers(0),
+      writers(0)
+{
+}
+
+LockServer::~LockServer() { }
+
+// Returns true once this waiter has been queued for longer than
+// LOCK_WAIT_TIMEOUT microseconds and may be pruned from the queue.
+//
+// Fix: compute the full elapsed time.  The previous sec/usec
+// comparison expired any wait that merely crossed a whole-second
+// boundary (e.g. 0.9s -> 1.0s), and its ASSERT could fire when the
+// two timestamps shared the same tv_sec and tv_usec.
+bool
+LockServer::Waiter::checkTimeout(const struct timeval &now)
+{
+    // Time must not run backwards relative to when the wait began.
+    ASSERT(now.tv_sec > waitTime.tv_sec ||
+           (now.tv_sec == waitTime.tv_sec && now.tv_usec >= waitTime.tv_usec));
+
+    int64_t elapsed = (int64_t)(now.tv_sec - waitTime.tv_sec) * 1000000 +
+        (now.tv_usec - waitTime.tv_usec);
+    return elapsed > LOCK_WAIT_TIMEOUT;
+}
+    
+// Queue `requester` behind the current holders/waiters of this lock.
+// A requester that is already queued keeps its original position.
+void
+LockServer::Lock::waitForLock(uint64_t requester, bool write)
+{
+    if (waiters.count(requester) > 0) {
+        // Already waiting; nothing to do.
+        return;
+    }
+
+    Debug("[%lu] Adding me to the queue ...", requester);
+    // Remember what kind of lock is wanted, then join the FIFO queue.
+    waiters[requester] = Waiter(write);
+    waitQ.push(requester);
+}
+
+// Returns true iff `requester` may take the lock right now: either
+// nobody is waiting, or (after pruning timed-out waiters) the
+// requester is at the head of the wait queue.  A requester that is
+// not next is added to the queue and false is returned.
+bool
+LockServer::Lock::tryAcquireLock(uint64_t requester, bool write)
+{
+    // Fast path: no waiters, the lock is free for the taking.
+    if (waitQ.size() == 0) {
+        return true;
+    }
+
+    Debug("[%lu] Trying to get lock for %d", requester, (int)write);
+    struct timeval now;
+    uint64_t w = waitQ.front();
+    
+    gettimeofday(&now, NULL);
+    // prune old requests out of the wait queue
+    while (waiters[w].checkTimeout(now)) {
+        waiters.erase(w);
+        waitQ.pop();
+
+        // if everyone else was old ...
+        if (waitQ.size() == 0) {
+            return true;
+        }
+
+        w = waitQ.front();
+        ASSERT(waiters.find(w) != waiters.end());
+    }
+
+    if (waitQ.front() == requester) {
+        // this lock is being reserved for the requester
+        waitQ.pop();
+        ASSERT(waiters.find(requester) != waiters.end());
+        ASSERT(waiters[requester].write == write);
+        waiters.erase(requester);
+        return true;
+    } else {
+        // otherwise, add me to the list
+        waitForLock(requester, write);
+        return false;
+    }
+}
+
+// True iff the head of the (pruned) wait queue wants a write lock.
+// lockForRead uses this to turn new readers away while a writer is
+// queued, so the writer is not starved by a stream of readers.
+bool
+LockServer::Lock::isWriteNext()
+{
+    if (waitQ.size() == 0) return false;
+
+    struct timeval now;
+    uint64_t w = waitQ.front();
+    
+    gettimeofday(&now, NULL);
+    // prune old requests out of the wait queue
+    while (waiters[w].checkTimeout(now)) {
+        waiters.erase(w);
+        waitQ.pop();
+
+        // if everyone else was old ...
+        if (waitQ.size() == 0) {
+            return false;
+        }
+
+        w = waitQ.front();
+        ASSERT(waiters.find(w) != waiters.end());
+    }
+
+    ASSERT(waiters.find(waitQ.front()) != waiters.end());
+    return waiters[waitQ.front()].write;
+}
+
+// Acquire (or re-acquire) a read lock on `lock` for `requester`.
+// Returns true on success; otherwise queues the requester and
+// returns false (caller is expected to retry).  Readers share the
+// lock unless a writer is waiting at the head of the queue.
+bool
+LockServer::lockForRead(const string &lock, uint64_t requester)
+{
+    // operator[] creates an UNLOCKED entry on first use of this key.
+    Lock &l = locks[lock];
+    Debug("Lock for Read: %s [%lu %lu %lu %lu]", lock.c_str(),
+            readers, writers, l.holders.size(), l.waiters.size());
+
+    switch (l.state) {
+    case UNLOCKED:
+        // if you are next in the queue
+        if (l.tryAcquireLock(requester, false)) {
+            Debug("[%lu] I have acquired the read lock!", requester);
+            l.state = LOCKED_FOR_READ;
+            ASSERT(l.holders.size() == 0);
+            l.holders.insert(requester);
+            readers++;
+            return true;
+        }
+        return false;
+    case LOCKED_FOR_READ:
+        // if you already hold this lock
+        if (l.holders.find(requester) != l.holders.end()) {
+            return true;
+        }
+
+        // There is a write waiting, let's give up the lock
+        if (l.isWriteNext()) {
+	    Debug("[%lu] Waiting on lock because there is a pending write request", requester);
+            l.waitForLock(requester, false);
+            return false;
+        }
+
+        // No writer queued: join the current readers.
+        l.holders.insert(requester);
+        readers++;
+        return true;
+    case LOCKED_FOR_WRITE:
+    case LOCKED_FOR_READ_WRITE:
+        // The writer itself may add a read lock (upgrade to READ_WRITE).
+        if (l.holders.count(requester) > 0) {
+            l.state = LOCKED_FOR_READ_WRITE;
+            readers++;
+            return true;
+        }
+        // A write lock has exactly one holder; everyone else queues.
+        ASSERT(l.holders.size() == 1);
+        Debug("Locked for write, held by %lu", *(l.holders.begin())); 
+        l.waitForLock(requester, false);
+        return false;
+    }
+    NOT_REACHABLE();
+    return false;
+}
+
+// Acquire (or re-acquire) a write lock on `lock` for `requester`.
+// A sole reader may upgrade its read lock to READ_WRITE.  Returns
+// false (and queues the requester) when the lock is contended.
+bool
+LockServer::lockForWrite(const string &lock, uint64_t requester)
+{
+    Lock &l = locks[lock];
+
+    Debug("Lock for Write: %s [%lu %lu %lu %lu]", lock.c_str(),
+    readers, writers, l.holders.size(), l.waiters.size());
+
+    switch (l.state) {
+    case UNLOCKED:
+        // Got it!
+        if (l.tryAcquireLock(requester, true)) {
+            Debug("[%lu] I have acquired the write lock!", requester);
+            l.state = LOCKED_FOR_WRITE;
+            ASSERT(l.holders.size() == 0);
+            l.holders.insert(requester);
+            writers++;
+            return true;
+        }
+        return false;
+    case LOCKED_FOR_READ:
+        if (l.holders.size() == 1 && l.holders.count(requester) > 0) {
+            // if there is one holder of this read lock and it is the
+            // requester, then upgrade the lock
+            l.state = LOCKED_FOR_READ_WRITE;
+            writers++;
+            return true;
+        }
+
+        Debug("Locked for read by%s%lu other people", l.holders.count(requester) > 0 ? "you" : "", l.holders.size());
+        l.waitForLock(requester, true);
+        return false;
+    case LOCKED_FOR_WRITE:
+    case LOCKED_FOR_READ_WRITE:
+        // Write locks are exclusive: re-entrant for the single holder,
+        // queued for everyone else.
+        ASSERT(l.holders.size() == 1);
+        if (l.holders.count(requester) > 0) {
+            return true;
+        }
+
+        Debug("Held by %lu for %s", *(l.holders.begin()), (l.state == LOCKED_FOR_WRITE) ? "write" : "read-write" );
+        l.waitForLock(requester, true);
+        return false;
+    }
+    NOT_REACHABLE();
+    return false;
+}
+
+// Release a read lock on `lock` held by `holder`.  Warns and returns
+// if the lock or the holder is unknown.  A READ_WRITE lock downgrades
+// to WRITE; the holder keeps its write lock.
+void
+LockServer::releaseForRead(const string &lock, uint64_t holder)
+{
+    if (locks.find(lock) == locks.end()) {
+        return;
+    }
+    
+    Lock &l = locks[lock];
+
+    if (l.holders.count(holder) == 0) {
+        Warning("[%ld] Releasing unheld read lock: %s", holder, lock.c_str());
+        return;
+    }
+
+    switch (l.state) {
+    case UNLOCKED:
+    case LOCKED_FOR_WRITE:
+        // No read lock outstanding in these states; nothing to do.
+        return;
+    case LOCKED_FOR_READ:
+        readers--;
+        if (l.holders.erase(holder) < 1) {
+            Warning("[%ld] Releasing unheld read lock: %s", holder, lock.c_str()); 
+        }
+        if (l.holders.empty()) {
+            l.state = UNLOCKED;
+        }
+	return;
+    case LOCKED_FOR_READ_WRITE:
+        // Drop only the read half; the holder remains the sole writer.
+        readers--;
+        l.state = LOCKED_FOR_WRITE;
+        return;
+    }
+}
+
+// Release a write lock on `lock` held by `holder`.  Warns and returns
+// if the lock or the holder is unknown.  A READ_WRITE lock downgrades
+// to READ; the holder keeps its read lock.
+void
+LockServer::releaseForWrite(const string &lock, uint64_t holder)
+{
+    if (locks.find(lock) == locks.end()) {
+        return;
+    }
+
+    Lock &l = locks[lock];
+
+    if (l.holders.count(holder) == 0) {
+        Warning("[%ld] Releasing unheld write lock: %s", holder, lock.c_str());
+        return;
+    }
+
+    switch (l.state) {
+    case UNLOCKED:
+    case LOCKED_FOR_READ:
+        // No write lock outstanding in these states; nothing to do.
+        return;
+    case LOCKED_FOR_WRITE:
+        writers--;
+        l.holders.erase(holder);
+        ASSERT(l.holders.size() == 0);
+        l.state = UNLOCKED;
+        return;
+    case LOCKED_FOR_READ_WRITE:
+        // Drop only the write half; the holder remains as a reader.
+        writers--;
+        l.state = LOCKED_FOR_READ;
+        ASSERT(l.holders.size() == 1);
+        return;
+    }
+}
+
+} // namespace spanstore
diff --git a/store/backend/lockserver.h b/store/backend/lockserver.h
new file mode 100644
index 0000000..45cb4b7
--- /dev/null
+++ b/store/backend/lockserver.h
@@ -0,0 +1,83 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/lockserver.h:
+ *   Simple multi-reader, single-writer lock server.
+ *
+ **********************************************************************/
+
+#ifndef _LOCK_SERVER_H_
+#define _LOCK_SERVER_H_
+
+#include "paxos-lib/lib/assert.h"
+#include "paxos-lib/lib/message.h"
+#include <sys/time.h>
+#include <map>
+#include <queue>
+#include <string>
+#include <vector>
+#include <unordered_map>
+#include <unordered_set>
+
+namespace spanstore {
+
+#define LOCK_WAIT_TIMEOUT 5000
+
+// Multi-reader, single-writer lock manager keyed by string.  Lock
+// requests that cannot be granted immediately queue the requester;
+// waiters expire after LOCK_WAIT_TIMEOUT (see Waiter::checkTimeout).
+class LockServer
+{
+
+public:
+    LockServer();
+    ~LockServer();
+
+    // Each call either grants the lock (returns true) or queues the
+    // caller (returns false); callers are expected to retry.  Release
+    // calls warn but tolerate locks that are not actually held.
+    bool lockForRead(const std::string &lock, uint64_t requester);
+    bool lockForWrite(const std::string &lock, uint64_t requester);
+    void releaseForRead(const std::string &lock, uint64_t holder);
+    void releaseForWrite(const std::string &lock, uint64_t holder);
+
+private:
+    enum LockState {
+        UNLOCKED,
+        LOCKED_FOR_READ,        // held by one or more readers
+        LOCKED_FOR_WRITE,       // held by exactly one writer
+        LOCKED_FOR_READ_WRITE   // one holder owns both read and write
+    };
+
+    // Bookkeeping for one queued lock request.
+    struct Waiter {
+        bool write;                 // true if waiting for a write lock
+        struct timeval waitTime;    // when the wait began
+
+        Waiter() {write = false;}
+        Waiter(bool w) {
+            gettimeofday(&waitTime, NULL);
+            write = w;
+        }
+
+        // True once the waiter has waited past LOCK_WAIT_TIMEOUT and
+        // may be pruned from the queue.
+        bool checkTimeout(const struct timeval &now);
+    };
+
+    // State of one named lock: current holders plus FIFO wait queue.
+    struct Lock {
+        LockState state;
+        std::unordered_set<uint64_t> holders;  // ids currently holding the lock
+        std::queue<uint64_t> waitQ;            // FIFO order of waiting ids
+        std::map<uint64_t, Waiter> waiters;    // per-id wait metadata
+
+        Lock() {
+            state = UNLOCKED;
+        };
+        void waitForLock(uint64_t requester, bool write);
+        bool tryAcquireLock(uint64_t requester, bool write);
+        bool isWriteNext();
+    };
+
+    /* Map from key to the state of its lock. */
+    std::unordered_map<std::string, Lock> locks;
+
+    uint64_t readers;   // total read-lock grants currently outstanding
+    uint64_t writers;   // total write-lock grants currently outstanding
+};
+
+} // namespace spanstore
+
+#endif /* _LOCK_SERVER_H_ */
diff --git a/store/backend/lockstore.cc b/store/backend/lockstore.cc
new file mode 100644
index 0000000..cb5782e
--- /dev/null
+++ b/store/backend/lockstore.cc
@@ -0,0 +1,135 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/lockstore.cc:
+ *   Key-value store with support for S2PL
+ *
+ **********************************************************************/
+
+#include "spanstore/lockstore.h"
+
+using namespace std;
+
+namespace spanstore {
+
+// Construct an empty locking store; lock server starts with no locks.
+LockStore::LockStore() : TxnStore(), store() { }
+LockStore::~LockStore() { }
+
+// Read `key` under a read lock held by transaction `id`.  Returns
+// REPLY_FAIL if the key does not exist, REPLY_RETRY if the read lock
+// could not be granted (the transaction is queued for it), or
+// REPLY_OK with the value tagged with a default Timestamp.
+int
+LockStore::Get(uint64_t id, const string &key, pair<Timestamp, string> &value)
+{
+    Debug("[%lu] GET %s", id, key.c_str());
+    string val;
+
+    if (!store.get(key, val)) {
+        // couldn't find the key
+        return REPLY_FAIL;
+    }
+
+    // grab the lock (ok, if we already have it)
+    // NOTE(review): the value is read before the lock is granted;
+    // safe only if Get runs atomically w.r.t. commits — confirm the
+    // server is single-threaded.
+    if (locks.lockForRead(key, id)) {
+        value = make_pair(Timestamp(), val);
+        return REPLY_OK;
+    } else {
+        Debug("[%lu] Could not acquire read lock", id);
+        return REPLY_RETRY;
+    }
+}
+
+// Timestamped reads are not supported by the locking store; the
+// timestamp is ignored and the latest version is returned.
+int
+LockStore::Get(uint64_t id, const string &key, const Timestamp &timestamp, pair<Timestamp, string> &value)
+{
+    return Get(id, key, value);
+}
+
+// First phase of 2PC: try to take every lock txn needs.  REPLY_OK if
+// all locks were granted (txn is saved in `prepared`); REPLY_RETRY if
+// any lock is contended.  Idempotent for an already-prepared id.
+int
+LockStore::Prepare(uint64_t id, const Transaction &txn)
+{    
+    Debug("[%lu] START PREPARE", id);
+
+    if (prepared.size() > 100) {
+        Warning("Lots of prepared transactions! %lu", prepared.size());
+    }
+
+    if (prepared.find(id) != prepared.end()) {
+        Debug("[%lu] Already prepared", id);
+        return REPLY_OK;
+    }
+
+    if (getLocks(id, txn)) {
+        prepared[id] = txn;
+        Debug("[%lu] PREPARED TO COMMIT", id);
+        return REPLY_OK;
+    } else {
+        // NOTE(review): locks granted so far remain held/queued here;
+        // they are only dropped via Abort or waiter timeout — confirm
+        // the coordinator always aborts on RETRY exhaustion.
+        Debug("[%lu] Could not acquire write locks", id);
+        return REPLY_RETRY;
+    }
+}
+
+// Second phase of 2PC: apply the prepared transaction's write set to
+// the store, then release all of its locks (second phase of S2PL).
+// `timestamp` is unused by the locking store.
+void
+LockStore::Commit(uint64_t id, uint64_t timestamp)
+{
+    Debug("[%lu] COMMIT", id);
+
+    // Look the transaction up once.  The previous prepared[id] access
+    // would silently default-insert an empty Transaction in release
+    // builds (where ASSERT compiles out) if the id was never prepared,
+    // and then copied the whole Transaction.
+    auto it = prepared.find(id);
+    ASSERT(it != prepared.end());
+    if (it == prepared.end()) {
+        Warning("[%lu] COMMIT without successful prepare", id);
+        return;
+    }
+
+    const Transaction &txn = it->second;
+
+    for (auto &write : txn.getWriteSet()) {
+        bool ret = store.put(write.first, // key
+                             write.second); // value
+        ASSERT(ret);
+    }
+
+    //drop locks
+    dropLocks(id, txn);
+
+    prepared.erase(it);
+}
+
+// Abort a transaction: release any locks it acquired and drop its
+// prepared state.  Both steps tolerate a transaction that never
+// locked or prepared anything.
+void
+LockStore::Abort(uint64_t id, const Transaction &txn)
+{
+    Debug("[%lu] ABORT", id);
+    dropLocks(id, txn);
+    prepared.erase(id);
+}
+
+// Insert a value directly, bypassing locking; the timestamp is
+// discarded.  NOTE(review): presumably only used to populate the
+// store before transactions start — confirm against callers.
+void
+LockStore::Load(const string &key, const string &value, const Timestamp &timestamp)
+{
+    store.put(key, value);
+}
+
+/* Used on commit and abort for second phase of 2PL. */
+// Releases every lock named in txn's write and read sets on behalf
+// of transaction `id`; the lock server warns about (but tolerates)
+// releases of locks that are not actually held.
+void
+LockStore::dropLocks(uint64_t id, const Transaction &txn)
+{
+    for (auto &write : txn.getWriteSet()) {
+        locks.releaseForWrite(write.first, id);
+    }
+
+    for (auto &read : txn.getReadSet()) {
+        locks.releaseForRead(read.first, id);
+    }
+}
+
+// Try to take every lock txn needs: read locks for the read set and
+// write locks for the write set.  Deliberately keeps going after a
+// failure so `id` is queued on each contended lock; returns true only
+// if every lock was granted.
+bool
+LockStore::getLocks(uint64_t id, const Transaction &txn)
+{
+    bool ret = true;
+    // if we don't have read locks, get read locks
+    for (auto &read : txn.getReadSet()) {
+        if (!locks.lockForRead(read.first, id)) {
+            ret = false;
+        }
+    }
+    for (auto &write : txn.getWriteSet()) {
+        if (!locks.lockForWrite(write.first, id)) {
+            ret = false;
+        }
+    }
+    return ret;
+}
+
+} // namespace spanstore
diff --git a/store/backend/lockstore.h b/store/backend/lockstore.h
new file mode 100644
index 0000000..6c68e73
--- /dev/null
+++ b/store/backend/lockstore.h
@@ -0,0 +1,61 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/lockstore.h:
+ *
+ * Single-node Key-value store with support for 2PC locking-based
+ * transactions using S2PL
+ *
+ **********************************************************************/
+
+#ifndef _LOCK_STORE_H_
+#define _LOCK_STORE_H_
+
+#include "paxos-lib/lib/assert.h"
+#include "paxos-lib/lib/message.h"
+#include "common/kvstore.h"
+#include "common/txnstore.h"
+#include "common/transaction.h"
+#include "spanstore/lockserver.h"
+#include <vector>
+#include <unordered_map>
+#include <set>
+#include <map>
+#include <list>
+
+namespace spanstore {
+
+// Single-node key-value store with strict two-phase-locking (S2PL)
+// transactions: Prepare takes all locks, Commit/Abort release them.
+class LockStore : public TxnStore
+{
+public:
+    LockStore();
+    ~LockStore();
+
+    // Overriding from TxnStore.
+    int Get(uint64_t id, const std::string &key,
+        std::pair<Timestamp, std::string> &value);
+    int Get(uint64_t id, const std::string &key, const Timestamp &timestamp,
+        std::pair<Timestamp, std::string> &value);
+    int Prepare(uint64_t id, const Transaction &txn);
+    void Commit(uint64_t id, uint64_t timestamp);
+    void Abort(uint64_t id, const Transaction &txn);
+    void Load(const std::string &key, const std::string &value,
+        const Timestamp &timestamp);
+
+private:
+    // Data store.
+    KVStore store;
+
+    // Locks manager.
+    LockServer locks;
+
+    // Transactions that passed Prepare, keyed by transaction id;
+    // they hold all of their locks until Commit or Abort.
+    std::map<uint64_t, Transaction> prepared;
+
+    // Second phase of 2PL: release all of txn's locks.
+    void dropLocks(uint64_t id, const Transaction &txn);
+    // First phase of 2PL: acquire all of txn's locks.
+    bool getLocks(uint64_t id, const Transaction &txn);
+};
+
+} // namespace spanstore
+
+#endif /* _LOCK_STORE_H_ */
diff --git a/store/backend/occstore.cc b/store/backend/occstore.cc
new file mode 100644
index 0000000..5317562
--- /dev/null
+++ b/store/backend/occstore.cc
@@ -0,0 +1,164 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/occstore.cc:
+ *   Key-value store with support for OCC
+ *
+ **********************************************************************/
+
+#include "spanstore/occstore.h"
+
+using namespace std;
+
+namespace spanstore {
+
+// Construct an empty OCC store with no prepared transactions.
+OCCStore::OCCStore() : store() { }
+OCCStore::~OCCStore() { }
+
+// Read the most recent version of `key`; on success the versioned
+// (timestamp, value) pair is returned through `value`.  No locks are
+// taken — conflicts are detected later in Prepare.
+int
+OCCStore::Get(uint64_t id, const string &key, pair<Timestamp, string> &value)
+{
+    Debug("[%lu] GET %s", id, key.c_str());
+
+    // Get latest from store
+    if (store.get(key, value)) {
+        Debug("[%lu] GET %s %lu", id, key.c_str(), value.first.getTimestamp());
+        return REPLY_OK;
+    } else {
+        return REPLY_FAIL;
+    }
+}
+
+// Read the version of `key` that was current at `timestamp`;
+// REPLY_FAIL if no such version exists in the versioned store.
+int
+OCCStore::Get(uint64_t id, const string &key, const Timestamp &timestamp, pair<Timestamp, string> &value)
+{
+    Debug("[%lu] GET %s", id, key.c_str());
+    
+    // Get version at timestamp from store
+    if (store.get(key, timestamp, value)) {
+        return REPLY_OK;
+    } else {
+        return REPLY_FAIL;
+    }
+}
+
+// OCC validation: the transaction may commit only if (a) nothing it
+// read has been overwritten since, and (b) nothing it reads or writes
+// collides with a transaction that is already prepared.  On any
+// conflict the transaction is aborted and REPLY_FAIL returned.
+int
+OCCStore::Prepare(uint64_t id, const Transaction &txn)
+{    
+    Debug("[%lu] START PREPARE", id);
+
+    if (prepared.find(id) != prepared.end()) {
+        Debug("[%lu] Already prepared!", id);
+        return REPLY_OK;
+    }
+
+    // Do OCC checks.
+    set<string> pWrites = getPreparedWrites();
+    set<string> pRW = getPreparedReadWrites();
+
+    // Check for conflicts with the read set.
+    for (auto &read : txn.getReadSet()) {
+        pair<Timestamp, string> cur;
+        bool ret = store.get(read.first, cur);
+
+	    // ASSERT(ret);
+        // Key may have been read at a timestamp before it existed
+        // here; skip it rather than fail validation.
+        if (!ret)
+            continue;
+
+        // If this key has been written since we read it, abort.
+        if (cur.first > read.second) {
+            Debug("[%lu] ABORT rw conflict key:%s %lu %lu",
+                  id, read.first.c_str(), cur.first.getTimestamp(),
+                  read.second.getTimestamp());
+            
+            Abort(id);
+            return REPLY_FAIL;
+        }
+
+        // If there is a pending write for this key, abort.
+        if (pWrites.find(read.first) != pWrites.end()) {
+            Debug("[%lu] ABORT rw conflict w/ prepared key:%s",
+                  id, read.first.c_str());
+            Abort(id);
+            return REPLY_FAIL;
+        }
+    }
+
+    // Check for conflicts with the write set.
+    for (auto &write : txn.getWriteSet()) {
+        // If there is a pending read or write for this key, abort.
+        if (pRW.find(write.first) != pRW.end()) {
+            Debug("[%lu] ABORT ww conflict w/ prepared key:%s", id,
+                    write.first.c_str());
+            Abort(id);
+            return REPLY_FAIL;
+        }
+    }
+
+    // Otherwise, prepare this transaction for commit
+    prepared[id] = txn;
+    Debug("[%lu] PREPARED TO COMMIT", id);
+    return REPLY_OK;
+}
+
+// Apply the prepared transaction's write set to the versioned store,
+// stamping every written version with `timestamp`.
+void
+OCCStore::Commit(uint64_t id, uint64_t timestamp)
+{
+    Debug("[%lu] COMMIT", id);
+
+    // Look the transaction up once.  The previous prepared[id] access
+    // would silently default-insert an empty Transaction in release
+    // builds (where ASSERT compiles out) if the id was never prepared,
+    // and then copied the whole Transaction.
+    auto it = prepared.find(id);
+    ASSERT(it != prepared.end());
+    if (it == prepared.end()) {
+        Warning("[%lu] COMMIT without successful prepare", id);
+        return;
+    }
+
+    const Transaction &txn = it->second;
+
+    for (auto &write : txn.getWriteSet()) {
+        store.put(write.first, // key
+                    write.second, // value
+                    Timestamp(timestamp)); // timestamp
+    }
+
+    prepared.erase(it);
+}
+
+// Forget a prepared transaction.  `txn` is unused here (OCC holds no
+// locks to release); erase is a no-op if the id was never prepared.
+void
+OCCStore::Abort(uint64_t id, const Transaction &txn)
+{
+    Debug("[%lu] ABORT", id);
+    prepared.erase(id);
+}
+
+// Insert a versioned value directly, bypassing OCC validation.
+// NOTE(review): presumably only used to populate the store before
+// transactions start — confirm against callers.
+void
+OCCStore::Load(const string &key, const string &value, const Timestamp &timestamp)
+{
+    store.put(key, value, timestamp);
+}
+
+// Union of the write sets of every currently-prepared transaction;
+// used by Prepare to detect read-write conflicts.
+set<string>
+OCCStore::getPreparedWrites()
+{
+    set<string> writes;
+    for (const auto &entry : prepared) {
+        const Transaction &txn = entry.second;
+        for (const auto &w : txn.getWriteSet()) {
+            writes.insert(w.first);
+        }
+    }
+    return writes;
+}
+
+// Union of the read AND write sets of every currently-prepared
+// transaction; used by Prepare to detect write-write conflicts.
+set<string>
+OCCStore::getPreparedReadWrites()
+{
+    set<string> readwrites;
+    for (const auto &entry : prepared) {
+        const Transaction &txn = entry.second;
+        for (const auto &w : txn.getWriteSet()) {
+            readwrites.insert(w.first);
+        }
+        for (const auto &r : txn.getReadSet()) {
+            readwrites.insert(r.first);
+        }
+    }
+    return readwrites;
+}
+
+} // namespace spanstore
diff --git a/store/backend/occstore.h b/store/backend/occstore.h
new file mode 100644
index 0000000..b7c0057
--- /dev/null
+++ b/store/backend/occstore.h
@@ -0,0 +1,53 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/occstore.h:
+ *   Key-value store with support for transactions using OCC.
+ *
+ **********************************************************************/
+
+#ifndef _OCC_STORE_H_
+#define _OCC_STORE_H_
+
+#include "paxos-lib/lib/assert.h"
+#include "paxos-lib/lib/message.h"
+#include "common/versionedKVStore.h"
+#include "common/txnstore.h"
+#include "common/transaction.h"
+
+#include <vector>
+#include <unordered_map>
+#include <set>
+#include <map>
+#include <list>
+
+namespace spanstore {
+
+// Single-node versioned key-value store with optimistic concurrency
+// control: reads take no locks; Prepare validates against committed
+// versions and other prepared transactions.
+class OCCStore : public TxnStore
+{
+public:
+    OCCStore();
+    ~OCCStore();
+
+    // Overriding from TxnStore.
+    int Get(uint64_t id, const std::string &key, std::pair<Timestamp, std::string> &value);
+    int Get(uint64_t id, const std::string &key, const Timestamp &timestamp, std::pair<Timestamp, std::string> &value);
+    int Prepare(uint64_t id, const Transaction &txn);
+    void Commit(uint64_t id, uint64_t timestamp);
+    void Abort(uint64_t id, const Transaction &txn = Transaction());
+    void Load(const std::string &key, const std::string &value, const Timestamp &timestamp);
+
+private:
+    // Data store.
+    VersionedKVStore store;
+
+    // Transactions that passed OCC validation, keyed by id.
+    std::map<uint64_t, Transaction> prepared;
+
+    // Conflict sets derived from `prepared` (see occstore.cc).
+    std::set<std::string> getPreparedWrites();
+    std::set<std::string> getPreparedReadWrites();
+};
+
+} // namespace spanstore
+
+#endif /* _OCC_STORE_H_ */
-- 
GitLab