diff --git a/store/backend/lockserver.cc b/store/backend/lockserver.cc
new file mode 100644
index 0000000000000000000000000000000000000000..b22285c1b11cf080ab3a23d10b5ad016f5090000
--- /dev/null
+++ b/store/backend/lockserver.cc
@@ -0,0 +1,283 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/lockserver.cc:
+ *   Simple multi-reader, single-writer lock server
+ *
+ **********************************************************************/
+
+#include "spanstore/lockserver.h"
+
+using namespace std;
+
+namespace spanstore {
+
+LockServer::LockServer()
+{
+    readers = 0;
+    writers = 0;
+}
+
+LockServer::~LockServer() { }
+
+bool
+LockServer::Waiter::checkTimeout(const struct timeval &now)
+{
+    if (now.tv_sec > waitTime.tv_sec) {
+        return true;
+    } else {
+        ASSERT(now.tv_usec > waitTime.tv_usec && now.tv_sec == waitTime.tv_sec);
+
+        if (now.tv_usec - waitTime.tv_usec > LOCK_WAIT_TIMEOUT)
+            return true;
+    }
+    return false;
+}
+
+void
+LockServer::Lock::waitForLock(uint64_t requester, bool write)
+{
+    if (waiters.find(requester) != waiters.end()) {
+        // Already waiting
+        return;
+    }
+
+    Debug("[%lu] Adding me to the queue ...", requester);
+    // Otherwise
+    waiters[requester] = Waiter(write);
+    waitQ.push(requester);
+}
+
+bool
+LockServer::Lock::tryAcquireLock(uint64_t requester, bool write)
+{
+    if (waitQ.size() == 0) {
+        return true;
+    }
+
+    Debug("[%lu] Trying to get lock for %d", requester, (int)write);
+    struct timeval now;
+    uint64_t w = waitQ.front();
+
+    gettimeofday(&now, NULL);
+    // prune old requests out of the wait queue
+    while (waiters[w].checkTimeout(now)) {
+        waiters.erase(w);
+        waitQ.pop();
+
+        // if everyone else was old ...
+        if (waitQ.size() == 0) {
+            return true;
+        }
+
+        w = waitQ.front();
+        ASSERT(waiters.find(w) != waiters.end());
+    }
+
+    if (waitQ.front() == requester) {
+        // this lock is being reserved for the requester
+        waitQ.pop();
+        ASSERT(waiters.find(requester) != waiters.end());
+        ASSERT(waiters[requester].write == write);
+        waiters.erase(requester);
+        return true;
+    } else {
+        // otherwise, add me to the list
+        waitForLock(requester, write);
+        return false;
+    }
+}
+
+bool
+LockServer::Lock::isWriteNext()
+{
+    if (waitQ.size() == 0) return false;
+
+    struct timeval now;
+    uint64_t w = waitQ.front();
+
+    gettimeofday(&now, NULL);
+    // prune old requests out of the wait queue
+    while (waiters[w].checkTimeout(now)) {
+        waiters.erase(w);
+        waitQ.pop();
+
+        // if everyone else was old ...
+        if (waitQ.size() == 0) {
+            return false;
+        }
+
+        w = waitQ.front();
+        ASSERT(waiters.find(w) != waiters.end());
+    }
+
+    ASSERT(waiters.find(waitQ.front()) != waiters.end());
+    return waiters[waitQ.front()].write;
+}
+
+bool
+LockServer::lockForRead(const string &lock, uint64_t requester)
+{
+    Lock &l = locks[lock];
+    Debug("Lock for Read: %s [%lu %lu %lu %lu]", lock.c_str(),
+          readers, writers, l.holders.size(), l.waiters.size());
+
+    switch (l.state) {
+    case UNLOCKED:
+        // if you are next in the queue
+        if (l.tryAcquireLock(requester, false)) {
+            Debug("[%lu] I have acquired the read lock!", requester);
+            l.state = LOCKED_FOR_READ;
+            ASSERT(l.holders.size() == 0);
+            l.holders.insert(requester);
+            readers++;
+            return true;
+        }
+        return false;
+    case LOCKED_FOR_READ:
+        // if you already hold this lock
+        if (l.holders.find(requester) != l.holders.end()) {
+            return true;
+        }
+
+        // There is a write waiting, let's give up the lock
+        if (l.isWriteNext()) {
+            Debug("[%lu] Waiting on lock because there is a pending write request", requester);
+            l.waitForLock(requester, false);
+            return false;
+        }
+
+        l.holders.insert(requester);
+        readers++;
+        return true;
+    case LOCKED_FOR_WRITE:
+    case LOCKED_FOR_READ_WRITE:
+        if (l.holders.count(requester) > 0) {
+            l.state = LOCKED_FOR_READ_WRITE;
+            readers++;
+            return true;
+        }
+        ASSERT(l.holders.size() == 1);
+        Debug("Locked for write, held by %lu", *(l.holders.begin()));
+        l.waitForLock(requester, false);
+        return false;
+    }
+    NOT_REACHABLE();
+    return false;
+}
+
+bool
+LockServer::lockForWrite(const string &lock, uint64_t requester)
+{
+    Lock &l = locks[lock];
+
+    Debug("Lock for Write: %s [%lu %lu %lu %lu]", lock.c_str(),
+          readers, writers, l.holders.size(), l.waiters.size());
+
+    switch (l.state) {
+    case UNLOCKED:
+        // Got it!
+        if (l.tryAcquireLock(requester, true)) {
+            Debug("[%lu] I have acquired the write lock!", requester);
+            l.state = LOCKED_FOR_WRITE;
+            ASSERT(l.holders.size() == 0);
+            l.holders.insert(requester);
+            writers++;
+            return true;
+        }
+        return false;
+    case LOCKED_FOR_READ:
+        if (l.holders.size() == 1 && l.holders.count(requester) > 0) {
+            // if there is one holder of this read lock and it is the
+            // requester, then upgrade the lock
+            l.state = LOCKED_FOR_READ_WRITE;
+            writers++;
+            return true;
+        }
+
+        Debug("Locked for read by %s%lu other people",
+              l.holders.count(requester) > 0 ? "you and " : "", l.holders.size());
+        l.waitForLock(requester, true);
+        return false;
+    case LOCKED_FOR_WRITE:
+    case LOCKED_FOR_READ_WRITE:
+        ASSERT(l.holders.size() == 1);
+        if (l.holders.count(requester) > 0) {
+            return true;
+        }
+
+        Debug("Held by %lu for %s", *(l.holders.begin()), (l.state == LOCKED_FOR_WRITE) ?
"write" : "read-write" ); + l.waitForLock(requester, true); + return false; + } + NOT_REACHABLE(); + return false; +} + +void +LockServer::releaseForRead(const string &lock, uint64_t holder) +{ + if (locks.find(lock) == locks.end()) { + return; + } + + Lock &l = locks[lock]; + + if (l.holders.count(holder) == 0) { + Warning("[%ld] Releasing unheld read lock: %s", holder, lock.c_str()); + return; + } + + switch (l.state) { + case UNLOCKED: + case LOCKED_FOR_WRITE: + return; + case LOCKED_FOR_READ: + readers--; + if (l.holders.erase(holder) < 1) { + Warning("[%ld] Releasing unheld read lock: %s", holder, lock.c_str()); + } + if (l.holders.empty()) { + l.state = UNLOCKED; + } + return; + case LOCKED_FOR_READ_WRITE: + readers--; + l.state = LOCKED_FOR_WRITE; + return; + } +} + +void +LockServer::releaseForWrite(const string &lock, uint64_t holder) +{ + if (locks.find(lock) == locks.end()) { + return; + } + + Lock &l = locks[lock]; + + if (l.holders.count(holder) == 0) { + Warning("[%ld] Releasing unheld write lock: %s", holder, lock.c_str()); + return; + } + + switch (l.state) { + case UNLOCKED: + case LOCKED_FOR_READ: + return; + case LOCKED_FOR_WRITE: + writers--; + l.holders.erase(holder); + ASSERT(l.holders.size() == 0); + l.state = UNLOCKED; + return; + case LOCKED_FOR_READ_WRITE: + writers--; + l.state = LOCKED_FOR_READ; + ASSERT(l.holders.size() == 1); + return; + } +} + +} // namespace spanstore diff --git a/store/backend/lockserver.h b/store/backend/lockserver.h new file mode 100644 index 0000000000000000000000000000000000000000..45cb4b73e1e272da1e72c55952a0e919be969f44 --- /dev/null +++ b/store/backend/lockserver.h @@ -0,0 +1,83 @@ +// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*- +// vim: set ts=4 sw=4: +/*********************************************************************** + * + * spanstore/lockserver.h: + * Simple multi-reader, single-writer lock server. + * + **********************************************************************/ + +#ifndef _LOCK_SERVER_H_ +#define _LOCK_SERVER_H_ + +#include "paxos-lib/lib/assert.h" +#include "paxos-lib/lib/message.h" +#include <sys/time.h> +#include <map> +#include <queue> +#include <string> +#include <vector> +#include <unordered_map> +#include <unordered_set> + +namespace spanstore { + +#define LOCK_WAIT_TIMEOUT 5000 + +class LockServer +{ + +public: + LockServer(); + ~LockServer(); + + bool lockForRead(const std::string &lock, uint64_t requester); + bool lockForWrite(const std::string &lock, uint64_t requester); + void releaseForRead(const std::string &lock, uint64_t holder); + void releaseForWrite(const std::string &lock, uint64_t holder); + +private: + enum LockState { + UNLOCKED, + LOCKED_FOR_READ, + LOCKED_FOR_WRITE, + LOCKED_FOR_READ_WRITE + }; + + struct Waiter { + bool write; + struct timeval waitTime; + + Waiter() {write = false;} + Waiter(bool w) { + gettimeofday(&waitTime, NULL); + write = w; + } + + bool checkTimeout(const struct timeval &now); + }; + + struct Lock { + LockState state; + std::unordered_set<uint64_t> holders; + std::queue<uint64_t> waitQ; + std::map<uint64_t, Waiter> waiters; + + Lock() { + state = UNLOCKED; + }; + void waitForLock(uint64_t requester, bool write); + bool tryAcquireLock(uint64_t requester, bool write); + bool isWriteNext(); + }; + + /* Global store which keep key -> (timestamp, value) list. 
+     */
+    std::unordered_map<std::string, Lock> locks;
+
+    uint64_t readers;
+    uint64_t writers;
+};
+
+} // namespace spanstore
+
+#endif /* _LOCK_SERVER_H_ */
diff --git a/store/backend/lockstore.cc b/store/backend/lockstore.cc
new file mode 100644
index 0000000000000000000000000000000000000000..cb5782e6562653567878b4bde87023913eb556a6
--- /dev/null
+++ b/store/backend/lockstore.cc
@@ -0,0 +1,135 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/lockstore.cc:
+ *   Key-value store with support for S2PL
+ *
+ **********************************************************************/
+
+#include "spanstore/lockstore.h"
+
+using namespace std;
+
+namespace spanstore {
+
+LockStore::LockStore() : TxnStore(), store() { }
+LockStore::~LockStore() { }
+
+int
+LockStore::Get(uint64_t id, const string &key, pair<Timestamp, string> &value)
+{
+    Debug("[%lu] GET %s", id, key.c_str());
+    string val;
+
+    if (!store.get(key, val)) {
+        // couldn't find the key
+        return REPLY_FAIL;
+    }
+
+    // grab the lock (ok, if we already have it)
+    if (locks.lockForRead(key, id)) {
+        value = make_pair(Timestamp(), val);
+        return REPLY_OK;
+    } else {
+        Debug("[%lu] Could not acquire read lock", id);
+        return REPLY_RETRY;
+    }
+}
+
+int
+LockStore::Get(uint64_t id, const string &key, const Timestamp &timestamp, pair<Timestamp, string> &value)
+{
+    return Get(id, key, value);
+}
+
+int
+LockStore::Prepare(uint64_t id, const Transaction &txn)
+{
+    Debug("[%lu] START PREPARE", id);
+
+    if (prepared.size() > 100) {
+        Warning("Lots of prepared transactions! %lu", prepared.size());
+    }
+
+    if (prepared.find(id) != prepared.end()) {
+        Debug("[%lu] Already prepared", id);
+        return REPLY_OK;
+    }
+
+    if (getLocks(id, txn)) {
+        prepared[id] = txn;
+        Debug("[%lu] PREPARED TO COMMIT", id);
+        return REPLY_OK;
+    } else {
+        Debug("[%lu] Could not acquire write locks", id);
+        return REPLY_RETRY;
+    }
+}
+
+void
+LockStore::Commit(uint64_t id, uint64_t timestamp)
+{
+    Debug("[%lu] COMMIT", id);
+    ASSERT(prepared.find(id) != prepared.end());
+
+    Transaction txn = prepared[id];
+
+    for (auto &write : txn.getWriteSet()) {
+        bool ret = store.put(write.first,   // key
+                             write.second); // value
+        ASSERT(ret);
+    }
+
+    // drop locks
+    dropLocks(id, txn);
+
+    prepared.erase(id);
+}
+
+void
+LockStore::Abort(uint64_t id, const Transaction &txn)
+{
+    Debug("[%lu] ABORT", id);
+    dropLocks(id, txn);
+    prepared.erase(id);
+}
+
+void
+LockStore::Load(const string &key, const string &value, const Timestamp &timestamp)
+{
+    store.put(key, value);
+}
+
+/* Used on commit and abort for second phase of 2PL.
+ */
+void
+LockStore::dropLocks(uint64_t id, const Transaction &txn)
+{
+    for (auto &write : txn.getWriteSet()) {
+        locks.releaseForWrite(write.first, id);
+    }
+
+    for (auto &read : txn.getReadSet()) {
+        locks.releaseForRead(read.first, id);
+    }
+}
+
+bool
+LockStore::getLocks(uint64_t id, const Transaction &txn)
+{
+    bool ret = true;
+    // if we don't have read locks, get read locks
+    for (auto &read : txn.getReadSet()) {
+        if (!locks.lockForRead(read.first, id)) {
+            ret = false;
+        }
+    }
+    for (auto &write : txn.getWriteSet()) {
+        if (!locks.lockForWrite(write.first, id)) {
+            ret = false;
+        }
+    }
+    return ret;
+}
+
+} // namespace spanstore
diff --git a/store/backend/lockstore.h b/store/backend/lockstore.h
new file mode 100644
index 0000000000000000000000000000000000000000..6c68e73cba4802b343c1e6515bdc0932d1794e34
--- /dev/null
+++ b/store/backend/lockstore.h
@@ -0,0 +1,61 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/lockstore.h:
+ *   Single-node key-value store with support for 2PC locking-based
+ *   transactions using S2PL
+ *
+ **********************************************************************/
+
+#ifndef _LOCK_STORE_H_
+#define _LOCK_STORE_H_
+
+#include "paxos-lib/lib/assert.h"
+#include "paxos-lib/lib/message.h"
+#include "common/kvstore.h"
+#include "common/txnstore.h"
+#include "common/transaction.h"
+#include "spanstore/lockserver.h"
+#include <vector>
+#include <unordered_map>
+#include <set>
+#include <map>
+#include <list>
+
+namespace spanstore {
+
+class LockStore : public TxnStore
+{
+public:
+    LockStore();
+    ~LockStore();
+
+    // Overriding from TxnStore.
+    int Get(uint64_t id, const std::string &key,
+            std::pair<Timestamp, std::string> &value);
+    int Get(uint64_t id, const std::string &key, const Timestamp &timestamp,
+            std::pair<Timestamp, std::string> &value);
+    int Prepare(uint64_t id, const Transaction &txn);
+    void Commit(uint64_t id, uint64_t timestamp);
+    void Abort(uint64_t id, const Transaction &txn);
+    void Load(const std::string &key, const std::string &value,
+              const Timestamp &timestamp);
+
+private:
+    // Data store.
+    KVStore store;
+
+    // Lock manager.
+    LockServer locks;
+
+    std::map<uint64_t, Transaction> prepared;
+
+    void dropLocks(uint64_t id, const Transaction &txn);
+    bool getLocks(uint64_t id, const Transaction &txn);
+};
+
+} // namespace spanstore
+
+#endif /* _LOCK_STORE_H_ */
diff --git a/store/backend/occstore.cc b/store/backend/occstore.cc
new file mode 100644
index 0000000000000000000000000000000000000000..53175626413083876654e53d3ee3b5aadb5db289
--- /dev/null
+++ b/store/backend/occstore.cc
@@ -0,0 +1,164 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/occstore.cc:
+ *   Key-value store with support for OCC
+ *
+ **********************************************************************/
+
+#include "spanstore/occstore.h"
+
+using namespace std;
+
+namespace spanstore {
+
+OCCStore::OCCStore() : store() { }
+OCCStore::~OCCStore() { }
+
+int
+OCCStore::Get(uint64_t id, const string &key, pair<Timestamp, string> &value)
+{
+    Debug("[%lu] GET %s", id, key.c_str());
+
+    // Get latest from store
+    if (store.get(key, value)) {
+        Debug("[%lu] GET %s %lu", id, key.c_str(), value.first.getTimestamp());
+        return REPLY_OK;
+    } else {
+        return REPLY_FAIL;
+    }
+}
+
+int
+OCCStore::Get(uint64_t id, const string &key, const Timestamp &timestamp, pair<Timestamp, string> &value)
+{
+    Debug("[%lu] GET %s", id, key.c_str());
+
+    // Get version at timestamp from store
+    if (store.get(key, timestamp, value)) {
+        return REPLY_OK;
+    } else {
+        return REPLY_FAIL;
+    }
+}
+
+int
+OCCStore::Prepare(uint64_t id, const Transaction &txn)
+{
+    Debug("[%lu] START PREPARE", id);
+
+    if (prepared.find(id) != prepared.end()) {
+        Debug("[%lu] Already prepared!", id);
+        return REPLY_OK;
+    }
+
+    // Do OCC checks.
+    set<string> pWrites = getPreparedWrites();
+    set<string> pRW = getPreparedReadWrites();
+
+    // Check for conflicts with the read set.
+    for (auto &read : txn.getReadSet()) {
+        pair<Timestamp, string> cur;
+        bool ret = store.get(read.first, cur);
+
+        // ASSERT(ret);
+        if (!ret)
+            continue;
+
+        // If this key has been written since we read it, abort.
+        if (cur.first > read.second) {
+            Debug("[%lu] ABORT rw conflict key:%s %lu %lu",
+                  id, read.first.c_str(), cur.first.getTimestamp(),
+                  read.second.getTimestamp());
+
+            Abort(id);
+            return REPLY_FAIL;
+        }
+
+        // If there is a pending write for this key, abort.
+        if (pWrites.find(read.first) != pWrites.end()) {
+            Debug("[%lu] ABORT rw conflict w/ prepared key:%s",
+                  id, read.first.c_str());
+            Abort(id);
+            return REPLY_FAIL;
+        }
+    }
+
+    // Check for conflicts with the write set.
+    for (auto &write : txn.getWriteSet()) {
+        // If there is a pending read or write for this key, abort.
+        if (pRW.find(write.first) != pRW.end()) {
+            Debug("[%lu] ABORT ww conflict w/ prepared key:%s", id,
+                  write.first.c_str());
+            Abort(id);
+            return REPLY_FAIL;
+        }
+    }
+
+    // Otherwise, prepare this transaction for commit
+    prepared[id] = txn;
+    Debug("[%lu] PREPARED TO COMMIT", id);
+    return REPLY_OK;
+}
+
+void
+OCCStore::Commit(uint64_t id, uint64_t timestamp)
+{
+    Debug("[%lu] COMMIT", id);
+    ASSERT(prepared.find(id) != prepared.end());
+
+    Transaction txn = prepared[id];
+
+    for (auto &write : txn.getWriteSet()) {
+        store.put(write.first,           // key
+                  write.second,          // value
+                  Timestamp(timestamp)); // timestamp
+    }
+
+    prepared.erase(id);
+}
+
+void
+OCCStore::Abort(uint64_t id, const Transaction &txn)
+{
+    Debug("[%lu] ABORT", id);
+    prepared.erase(id);
+}
+
+void
+OCCStore::Load(const string &key, const string &value, const Timestamp &timestamp)
+{
+    store.put(key, value, timestamp);
+}
+
+set<string>
+OCCStore::getPreparedWrites()
+{
+    // gather up the set of all writes that we are currently prepared for
+    set<string> writes;
+    for (auto &t : prepared) {
+        for (auto &write : t.second.getWriteSet()) {
+            writes.insert(write.first);
+        }
+    }
+    return writes;
+}
+
+set<string>
+OCCStore::getPreparedReadWrites()
+{
+    // gather up the set of all reads and writes that we are currently prepared for
+    set<string> readwrites;
+    for (auto &t : prepared) {
+        for (auto &write : t.second.getWriteSet()) {
+            readwrites.insert(write.first);
+        }
+        for (auto &read : t.second.getReadSet()) {
+            readwrites.insert(read.first);
+        }
+    }
+    return readwrites;
+}
+
+} // namespace spanstore
diff --git a/store/backend/occstore.h b/store/backend/occstore.h
new file mode 100644
index 0000000000000000000000000000000000000000..b7c0057c81b5b7cc23cc7632fcb4c3fdce941c57
--- /dev/null
+++ b/store/backend/occstore.h
@@ -0,0 +1,53 @@
+// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
+// vim: set ts=4 sw=4:
+/***********************************************************************
+ *
+ * spanstore/occstore.h:
+ *   Key-value store with support for transactions using OCC.
+ *
+ **********************************************************************/
+
+#ifndef _OCC_STORE_H_
+#define _OCC_STORE_H_
+
+#include "paxos-lib/lib/assert.h"
+#include "paxos-lib/lib/message.h"
+#include "common/versionedKVStore.h"
+#include "common/txnstore.h"
+#include "common/transaction.h"
+
+#include <vector>
+#include <unordered_map>
+#include <set>
+#include <map>
+#include <list>
+
+namespace spanstore {
+
+class OCCStore : public TxnStore
+{
+public:
+    OCCStore();
+    ~OCCStore();
+
+    // Overriding from TxnStore.
+    int Get(uint64_t id, const std::string &key, std::pair<Timestamp, std::string> &value);
+    int Get(uint64_t id, const std::string &key, const Timestamp &timestamp, std::pair<Timestamp, std::string> &value);
+    int Prepare(uint64_t id, const Transaction &txn);
+    void Commit(uint64_t id, uint64_t timestamp);
+    void Abort(uint64_t id, const Transaction &txn = Transaction());
+    void Load(const std::string &key, const std::string &value, const Timestamp &timestamp);
+
+private:
+    // Data store.
+    VersionedKVStore store;
+
+    std::map<uint64_t, Transaction> prepared;
+
+    std::set<std::string> getPreparedWrites();
+    std::set<std::string> getPreparedReadWrites();
+};
+
+} // namespace spanstore
+
+#endif /* _OCC_STORE_H_ */
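Usage note (editorial, not part of the patch): the minimal sketch below shows how a coordinator might drive LockStore through the strict two-phase-locking path implemented above. It assumes the REPLY_* codes and the Timestamp/Transaction types referenced in the diff; the addReadSet()/addWriteSet() mutators, the transaction id, and the key names are illustrative assumptions not confirmed by this patch.

    // Minimal S2PL driver sketch (illustrative only; helper names marked
    // "assumed" are not shown in this patch).
    #include "spanstore/lockstore.h"

    using namespace spanstore;

    int main()
    {
        LockStore store;
        store.Load("x", "1", Timestamp());      // preload one key

        uint64_t txn_id = 42;                   // hypothetical transaction id
        std::pair<Timestamp, std::string> value;

        // Execution phase: Get() acquires the read lock as a side effect.
        if (store.Get(txn_id, "x", value) != REPLY_OK) {
            return 1;                           // REPLY_RETRY or REPLY_FAIL
        }

        // Writes are buffered in a Transaction; Prepare() acquires the write
        // locks (growing phase), Commit()/Abort() release them (shrinking phase).
        Transaction txn;
        txn.addReadSet("x", value.first);       // assumed mutator
        txn.addWriteSet("x", "2");              // assumed mutator

        if (store.Prepare(txn_id, txn) == REPLY_OK) {
            store.Commit(txn_id, 0);            // timestamp is unused by LockStore
        } else {
            store.Abort(txn_id, txn);           // drops any locks already acquired
        }
        return 0;
    }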