Skip to content
Snippets Groups Projects
Commit a4afd29f authored by Irene Y Zhang's avatar Irene Y Zhang
Browse files

adding some backend storage code

parent eeaba446
No related branches found
No related tags found
No related merge requests found
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* spanstore/lockserver.cc:
* Simple multi-reader, single-writer lock server
*
**********************************************************************/
#include "spanstore/lockserver.h"
using namespace std;
namespace spanstore {
LockServer::LockServer()
{
readers = 0;
writers = 0;
}
LockServer::~LockServer() { }
bool
LockServer::Waiter::checkTimeout(const struct timeval &now)
{
if (now.tv_sec > waitTime.tv_sec) {
return true;
} else {
ASSERT(now.tv_usec > waitTime.tv_usec && now.tv_sec == waitTime.tv_sec);
if (now.tv_usec - waitTime.tv_usec > LOCK_WAIT_TIMEOUT)
return true;
}
return false;
}
void
LockServer::Lock::waitForLock(uint64_t requester, bool write)
{
if (waiters.find(requester) != waiters.end()) {
// Already waiting
return;
}
Debug("[%lu] Adding me to the queue ...", requester);
// Otherwise
waiters[requester] = Waiter(write);
waitQ.push(requester);
}
bool
LockServer::Lock::tryAcquireLock(uint64_t requester, bool write)
{
if (waitQ.size() == 0) {
return true;
}
Debug("[%lu] Trying to get lock for %d", requester, (int)write);
struct timeval now;
uint64_t w = waitQ.front();
gettimeofday(&now, NULL);
// prune old requests out of the wait queue
while (waiters[w].checkTimeout(now)) {
waiters.erase(w);
waitQ.pop();
// if everyone else was old ...
if (waitQ.size() == 0) {
return true;
}
w = waitQ.front();
ASSERT(waiters.find(w) != waiters.end());
}
if (waitQ.front() == requester) {
// this lock is being reserved for the requester
waitQ.pop();
ASSERT(waiters.find(requester) != waiters.end());
ASSERT(waiters[requester].write == write);
waiters.erase(requester);
return true;
} else {
// otherwise, add me to the list
waitForLock(requester, write);
return false;
}
}
bool
LockServer::Lock::isWriteNext()
{
if (waitQ.size() == 0) return false;
struct timeval now;
uint64_t w = waitQ.front();
gettimeofday(&now, NULL);
// prune old requests out of the wait queue
while (waiters[w].checkTimeout(now)) {
waiters.erase(w);
waitQ.pop();
// if everyone else was old ...
if (waitQ.size() == 0) {
return false;
}
w = waitQ.front();
ASSERT(waiters.find(w) != waiters.end());
}
ASSERT(waiters.find(waitQ.front()) != waiters.end());
return waiters[waitQ.front()].write;
}
bool
LockServer::lockForRead(const string &lock, uint64_t requester)
{
Lock &l = locks[lock];
Debug("Lock for Read: %s [%lu %lu %lu %lu]", lock.c_str(),
readers, writers, l.holders.size(), l.waiters.size());
switch (l.state) {
case UNLOCKED:
// if you are next in the queue
if (l.tryAcquireLock(requester, false)) {
Debug("[%lu] I have acquired the read lock!", requester);
l.state = LOCKED_FOR_READ;
ASSERT(l.holders.size() == 0);
l.holders.insert(requester);
readers++;
return true;
}
return false;
case LOCKED_FOR_READ:
// if you already hold this lock
if (l.holders.find(requester) != l.holders.end()) {
return true;
}
// There is a write waiting, let's give up the lock
if (l.isWriteNext()) {
Debug("[%lu] Waiting on lock because there is a pending write request", requester);
l.waitForLock(requester, false);
return false;
}
l.holders.insert(requester);
readers++;
return true;
case LOCKED_FOR_WRITE:
case LOCKED_FOR_READ_WRITE:
if (l.holders.count(requester) > 0) {
l.state = LOCKED_FOR_READ_WRITE;
readers++;
return true;
}
ASSERT(l.holders.size() == 1);
Debug("Locked for write, held by %lu", *(l.holders.begin()));
l.waitForLock(requester, false);
return false;
}
NOT_REACHABLE();
return false;
}
bool
LockServer::lockForWrite(const string &lock, uint64_t requester)
{
Lock &l = locks[lock];
Debug("Lock for Write: %s [%lu %lu %lu %lu]", lock.c_str(),
readers, writers, l.holders.size(), l.waiters.size());
switch (l.state) {
case UNLOCKED:
// Got it!
if (l.tryAcquireLock(requester, true)) {
Debug("[%lu] I have acquired the write lock!", requester);
l.state = LOCKED_FOR_WRITE;
ASSERT(l.holders.size() == 0);
l.holders.insert(requester);
writers++;
return true;
}
return false;
case LOCKED_FOR_READ:
if (l.holders.size() == 1 && l.holders.count(requester) > 0) {
// if there is one holder of this read lock and it is the
// requester, then upgrade the lock
l.state = LOCKED_FOR_READ_WRITE;
writers++;
return true;
}
Debug("Locked for read by%s%lu other people", l.holders.count(requester) > 0 ? "you" : "", l.holders.size());
l.waitForLock(requester, true);
return false;
case LOCKED_FOR_WRITE:
case LOCKED_FOR_READ_WRITE:
ASSERT(l.holders.size() == 1);
if (l.holders.count(requester) > 0) {
return true;
}
Debug("Held by %lu for %s", *(l.holders.begin()), (l.state == LOCKED_FOR_WRITE) ? "write" : "read-write" );
l.waitForLock(requester, true);
return false;
}
NOT_REACHABLE();
return false;
}
void
LockServer::releaseForRead(const string &lock, uint64_t holder)
{
if (locks.find(lock) == locks.end()) {
return;
}
Lock &l = locks[lock];
if (l.holders.count(holder) == 0) {
Warning("[%ld] Releasing unheld read lock: %s", holder, lock.c_str());
return;
}
switch (l.state) {
case UNLOCKED:
case LOCKED_FOR_WRITE:
return;
case LOCKED_FOR_READ:
readers--;
if (l.holders.erase(holder) < 1) {
Warning("[%ld] Releasing unheld read lock: %s", holder, lock.c_str());
}
if (l.holders.empty()) {
l.state = UNLOCKED;
}
return;
case LOCKED_FOR_READ_WRITE:
readers--;
l.state = LOCKED_FOR_WRITE;
return;
}
}
void
LockServer::releaseForWrite(const string &lock, uint64_t holder)
{
if (locks.find(lock) == locks.end()) {
return;
}
Lock &l = locks[lock];
if (l.holders.count(holder) == 0) {
Warning("[%ld] Releasing unheld write lock: %s", holder, lock.c_str());
return;
}
switch (l.state) {
case UNLOCKED:
case LOCKED_FOR_READ:
return;
case LOCKED_FOR_WRITE:
writers--;
l.holders.erase(holder);
ASSERT(l.holders.size() == 0);
l.state = UNLOCKED;
return;
case LOCKED_FOR_READ_WRITE:
writers--;
l.state = LOCKED_FOR_READ;
ASSERT(l.holders.size() == 1);
return;
}
}
} // namespace spanstore
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* spanstore/lockserver.h:
* Simple multi-reader, single-writer lock server.
*
**********************************************************************/
#ifndef _LOCK_SERVER_H_
#define _LOCK_SERVER_H_
#include "paxos-lib/lib/assert.h"
#include "paxos-lib/lib/message.h"
#include <sys/time.h>
#include <map>
#include <queue>
#include <string>
#include <vector>
#include <unordered_map>
#include <unordered_set>
namespace spanstore {
#define LOCK_WAIT_TIMEOUT 5000
class LockServer
{
public:
LockServer();
~LockServer();
bool lockForRead(const std::string &lock, uint64_t requester);
bool lockForWrite(const std::string &lock, uint64_t requester);
void releaseForRead(const std::string &lock, uint64_t holder);
void releaseForWrite(const std::string &lock, uint64_t holder);
private:
enum LockState {
UNLOCKED,
LOCKED_FOR_READ,
LOCKED_FOR_WRITE,
LOCKED_FOR_READ_WRITE
};
struct Waiter {
bool write;
struct timeval waitTime;
Waiter() {write = false;}
Waiter(bool w) {
gettimeofday(&waitTime, NULL);
write = w;
}
bool checkTimeout(const struct timeval &now);
};
struct Lock {
LockState state;
std::unordered_set<uint64_t> holders;
std::queue<uint64_t> waitQ;
std::map<uint64_t, Waiter> waiters;
Lock() {
state = UNLOCKED;
};
void waitForLock(uint64_t requester, bool write);
bool tryAcquireLock(uint64_t requester, bool write);
bool isWriteNext();
};
/* Global store which keep key -> (timestamp, value) list. */
std::unordered_map<std::string, Lock> locks;
uint64_t readers;
uint64_t writers;
};
} // namespace spanstore
#endif /* _LOCK_SERVER_H_ */
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* nistore/lockstore.cc:
* Key-value store with support for S2PL
*
**********************************************************************/
#include "spanstore/lockstore.h"
using namespace std;
namespace spanstore {
LockStore::LockStore() : TxnStore(), store() { }
LockStore::~LockStore() { }
int
LockStore::Get(uint64_t id, const string &key, pair<Timestamp, string> &value)
{
Debug("[%lu] GET %s", id, key.c_str());
string val;
if (!store.get(key, val)) {
// couldn't find the key
return REPLY_FAIL;
}
// grab the lock (ok, if we already have it)
if (locks.lockForRead(key, id)) {
value = make_pair(Timestamp(), val);
return REPLY_OK;
} else {
Debug("[%lu] Could not acquire read lock", id);
return REPLY_RETRY;
}
}
int
LockStore::Get(uint64_t id, const string &key, const Timestamp &timestamp, pair<Timestamp, string> &value)
{
return Get(id, key, value);
}
int
LockStore::Prepare(uint64_t id, const Transaction &txn)
{
Debug("[%lu] START PREPARE", id);
if (prepared.size() > 100) {
Warning("Lots of prepared transactions! %lu", prepared.size());
}
if (prepared.find(id) != prepared.end()) {
Debug("[%lu] Already prepared", id);
return REPLY_OK;
}
if (getLocks(id, txn)) {
prepared[id] = txn;
Debug("[%lu] PREPARED TO COMMIT", id);
return REPLY_OK;
} else {
Debug("[%lu] Could not acquire write locks", id);
return REPLY_RETRY;
}
}
void
LockStore::Commit(uint64_t id, uint64_t timestamp)
{
Debug("[%lu] COMMIT", id);
ASSERT(prepared.find(id) != prepared.end());
Transaction txn = prepared[id];
for (auto &write : txn.getWriteSet()) {
bool ret = store.put(write.first, // key
write.second); // value
ASSERT(ret);
}
//drop locks
dropLocks(id, txn);
prepared.erase(id);
}
void
LockStore::Abort(uint64_t id, const Transaction &txn)
{
Debug("[%lu] ABORT", id);
dropLocks(id, txn);
prepared.erase(id);
}
void
LockStore::Load(const string &key, const string &value, const Timestamp &timestamp)
{
store.put(key, value);
}
/* Used on commit and abort for second phase of 2PL. */
void
LockStore::dropLocks(uint64_t id, const Transaction &txn)
{
for (auto &write : txn.getWriteSet()) {
locks.releaseForWrite(write.first, id);
}
for (auto &read : txn.getReadSet()) {
locks.releaseForRead(read.first, id);
}
}
bool
LockStore::getLocks(uint64_t id, const Transaction &txn)
{
bool ret = true;
// if we don't have read locks, get read locks
for (auto &read : txn.getReadSet()) {
if (!locks.lockForRead(read.first, id)) {
ret = false;
}
}
for (auto &write : txn.getWriteSet()) {
if (!locks.lockForWrite(write.first, id)) {
ret = false;
}
}
return ret;
}
} // namespace spanstore
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* common/lockstore.h
* Single-node Key-value store with support for 2PC locking-based
* transactions using S2PL
*
**********************************************************************/
#ifndef _LOCK_STORE_H_
#define _LOCK_STORE_H_
#include "paxos-lib/lib/assert.h"
#include "paxos-lib/lib/message.h"
#include "common/kvstore.h"
#include "common/txnstore.h"
#include "common/transaction.h"
#include "spanstore/lockserver.h"
#include <vector>
#include <unordered_map>
#include <set>
#include <map>
#include <list>
namespace spanstore {
class LockStore : public TxnStore
{
public:
LockStore();
~LockStore();
// Overriding from TxnStore.
int Get(uint64_t id, const std::string &key,
std::pair<Timestamp, std::string> &value);
int Get(uint64_t id, const std::string &key, const Timestamp &timestamp,
std::pair<Timestamp, std::string> &value);
int Prepare(uint64_t id, const Transaction &txn);
void Commit(uint64_t id, uint64_t timestamp);
void Abort(uint64_t id, const Transaction &txn);
void Load(const std::string &key, const std::string &value,
const Timestamp &timestamp);
private:
// Data store.
KVStore store;
// Locks manager.
LockServer locks;
std::map<uint64_t, Transaction> prepared;
void dropLocks(uint64_t id, const Transaction &txn);
bool getLocks(uint64_t id, const Transaction &txn);
};
} // namespace spanstore
#endif /* _LOCK_STORE_H_ */
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* spanstore/occstore.cc:
* Key-value store with support for OCC
*
**********************************************************************/
#include "spanstore/occstore.h"
using namespace std;
namespace spanstore {
OCCStore::OCCStore() : store() { }
OCCStore::~OCCStore() { }
int
OCCStore::Get(uint64_t id, const string &key, pair<Timestamp, string> &value)
{
Debug("[%lu] GET %s", id, key.c_str());
// Get latest from store
if (store.get(key, value)) {
Debug("[%lu] GET %s %lu", id, key.c_str(), value.first.getTimestamp());
return REPLY_OK;
} else {
return REPLY_FAIL;
}
}
int
OCCStore::Get(uint64_t id, const string &key, const Timestamp &timestamp, pair<Timestamp, string> &value)
{
Debug("[%lu] GET %s", id, key.c_str());
// Get version at timestamp from store
if (store.get(key, timestamp, value)) {
return REPLY_OK;
} else {
return REPLY_FAIL;
}
}
int
OCCStore::Prepare(uint64_t id, const Transaction &txn)
{
Debug("[%lu] START PREPARE", id);
if (prepared.find(id) != prepared.end()) {
Debug("[%lu] Already prepared!", id);
return REPLY_OK;
}
// Do OCC checks.
set<string> pWrites = getPreparedWrites();
set<string> pRW = getPreparedReadWrites();
// Check for conflicts with the read set.
for (auto &read : txn.getReadSet()) {
pair<Timestamp, string> cur;
bool ret = store.get(read.first, cur);
// ASSERT(ret);
if (!ret)
continue;
// If this key has been written since we read it, abort.
if (cur.first > read.second) {
Debug("[%lu] ABORT rw conflict key:%s %lu %lu",
id, read.first.c_str(), cur.first.getTimestamp(),
read.second.getTimestamp());
Abort(id);
return REPLY_FAIL;
}
// If there is a pending write for this key, abort.
if (pWrites.find(read.first) != pWrites.end()) {
Debug("[%lu] ABORT rw conflict w/ prepared key:%s",
id, read.first.c_str());
Abort(id);
return REPLY_FAIL;
}
}
// Check for conflicts with the write set.
for (auto &write : txn.getWriteSet()) {
// If there is a pending read or write for this key, abort.
if (pRW.find(write.first) != pRW.end()) {
Debug("[%lu] ABORT ww conflict w/ prepared key:%s", id,
write.first.c_str());
Abort(id);
return REPLY_FAIL;
}
}
// Otherwise, prepare this transaction for commit
prepared[id] = txn;
Debug("[%lu] PREPARED TO COMMIT", id);
return REPLY_OK;
}
void
OCCStore::Commit(uint64_t id, uint64_t timestamp)
{
Debug("[%lu] COMMIT", id);
ASSERT(prepared.find(id) != prepared.end());
Transaction txn = prepared[id];
for (auto &write : txn.getWriteSet()) {
store.put(write.first, // key
write.second, // value
Timestamp(timestamp)); // timestamp
}
prepared.erase(id);
}
void
OCCStore::Abort(uint64_t id, const Transaction &txn)
{
Debug("[%lu] ABORT", id);
prepared.erase(id);
}
void
OCCStore::Load(const string &key, const string &value, const Timestamp &timestamp)
{
store.put(key, value, timestamp);
}
set<string>
OCCStore::getPreparedWrites()
{
// gather up the set of all writes that we are currently prepared for
set<string> writes;
for (auto &t : prepared) {
for (auto &write : t.second.getWriteSet()) {
writes.insert(write.first);
}
}
return writes;
}
set<string>
OCCStore::getPreparedReadWrites()
{
// gather up the set of all writes that we are currently prepared for
set<string> readwrites;
for (auto &t : prepared) {
for (auto &write : t.second.getWriteSet()) {
readwrites.insert(write.first);
}
for (auto &read : t.second.getReadSet()) {
readwrites.insert(read.first);
}
}
return readwrites;
}
} // namespace spanstore
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* spanstore/occstore.h:
* Key-value store with support for transactions using OCC.
*
**********************************************************************/
#ifndef _OCC_STORE_H_
#define _OCC_STORE_H_
#include "paxos-lib/lib/assert.h"
#include "paxos-lib/lib/message.h"
#include "common/versionedKVStore.h"
#include "common/txnstore.h"
#include "common/transaction.h"
#include <vector>
#include <unordered_map>
#include <set>
#include <map>
#include <list>
namespace spanstore {
class OCCStore : public TxnStore
{
public:
OCCStore();
~OCCStore();
// Overriding from TxnStore.
int Get(uint64_t id, const std::string &key, std::pair<Timestamp, std::string> &value);
int Get(uint64_t id, const std::string &key, const Timestamp &timestamp, std::pair<Timestamp, std::string> &value);
int Prepare(uint64_t id, const Transaction &txn);
void Commit(uint64_t id, uint64_t timestamp);
void Abort(uint64_t id, const Transaction &txn = Transaction());
void Load(const std::string &key, const std::string &value, const Timestamp &timestamp);
private:
// Data store.
VersionedKVStore store;
std::map<uint64_t, Transaction> prepared;
std::set<std::string> getPreparedWrites();
std::set<std::string> getPreparedReadWrites();
};
} // namespace spanstore
#endif /* _OCC_STORE_H_ */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment