Skip to content
Snippets Groups Projects
Commit 4791cfd3 authored by Irene Y Zhang's avatar Irene Y Zhang
Browse files

adding locks to occstore

parent 10bcdf54
No related branches found
No related tags found
No related merge requests found
......@@ -11,6 +11,7 @@
#include "store/strongstore/client.h"
#include "store/weakstore/client.h"
#include "store/tapirstore/client.h"
#include <algorithm>
using namespace std;
......
......@@ -42,18 +42,19 @@ VersionedKVStore::inStore(const string &key)
return store.find(key) != store.end() && store[key].size() > 0;
}
void
bool
VersionedKVStore::getValue(const string &key, const Timestamp &t, set<VersionedKVStore::VersionedValue>::iterator &it)
{
if (!inStore(key)) return false;
VersionedValue v(t);
it = store[key].upper_bound(v);
// if there is no valid version at this timestamp
if (it == store[key].begin()) {
it = store[key].end();
} else {
it--;
}
if (it == store[key].begin()) return false;
it--;
return true;
}
......@@ -76,13 +77,10 @@ VersionedKVStore::get(const string &key, pair<Timestamp, string> &value)
bool
VersionedKVStore::get(const string &key, const Timestamp &t, pair<Timestamp, string> &value)
{
if (inStore(key)) {
set<VersionedValue>::iterator it;
getValue(key, t, it);
if (it != store[key].end()) {
value = make_pair((*it).write, (*it).value);
return true;
}
set<VersionedValue>::iterator it;
if (getValue(key, t, it)) {
value = make_pair((*it).write, (*it).value);
return true;
}
return false;
}
......@@ -91,18 +89,14 @@ bool
VersionedKVStore::getRange(const string &key, const Timestamp &t,
pair<Timestamp, Timestamp> &range)
{
if (inStore(key)) {
set<VersionedValue>::iterator it;
getValue(key, t, it);
set<VersionedValue>::iterator it;
if (getValue(key, t, it)) {
range.first = (*it).write;
it++;
if (it != store[key].end()) {
range.first = (*it).write;
it++;
if (it != store[key].end()) {
range.second = (*it).write;
}
return true;
range.second = (*it).write;
}
return true;
}
return false;
}
......@@ -122,17 +116,13 @@ void
VersionedKVStore::commitGet(const string &key, const Timestamp &readTime, const Timestamp &commit)
{
// Hmm ... could read a key we don't have if we are behind ... do we commit this or wait for the log update?
if (inStore(key)) {
set<VersionedValue>::iterator it;
getValue(key, readTime, it);
if (it != store[key].end()) {
// figure out if anyone has read this version before
if (lastReads.find(key) != lastReads.end() &&
lastReads[key].find((*it).write) != lastReads[key].end()) {
if (lastReads[key][(*it).write] < commit) {
lastReads[key][(*it).write] = commit;
}
set<VersionedValue>::iterator it;
if (getValue(key, readTime, it)) {
// figure out if anyone has read this version before
if (lastReads.find(key) != lastReads.end() &&
lastReads[key].find((*it).write) != lastReads[key].end()) {
if (lastReads[key][(*it).write] < commit) {
lastReads[key][(*it).write] = commit;
}
}
} // otherwise, ignore the read
......@@ -158,17 +148,15 @@ VersionedKVStore::getLastRead(const string &key, Timestamp &lastRead)
bool
VersionedKVStore::getLastRead(const string &key, const Timestamp &t, Timestamp &lastRead)
{
if (inStore(key)) {
set<VersionedValue>::iterator it;
getValue(key, t, it);
ASSERT(it != store[key].end());
set<VersionedValue>::iterator it;
bool ret = getValue(key, t, it);
ASSERT(ret);
// figure out if anyone has read this version before
if (lastReads.find(key) != lastReads.end() &&
lastReads[key].find((*it).write) != lastReads[key].end()) {
lastRead = lastReads[key][(*it).write];
return true;
}
// figure out if anyone has read this version before
if (lastReads.find(key) != lastReads.end() &&
lastReads[key].find((*it).write) != lastReads[key].end()) {
lastRead = lastReads[key][(*it).write];
return true;
}
return false;
}
......@@ -76,7 +76,7 @@ private:
std::unordered_map< std::string, std::set<VersionedValue> > store;
std::unordered_map< std::string, std::map< Timestamp, Timestamp > > lastReads;
bool inStore(const std::string &key);
void getValue(const std::string &key, const Timestamp &t, std::set<VersionedValue>::iterator &it);
bool getValue(const std::string &key, const Timestamp &t, std::set<VersionedValue>::iterator &it);
};
#endif /* _VERSIONED_KV_STORE_H_ */
......@@ -43,9 +43,9 @@ int
Store::Get(uint64_t id, const string &key, pair<Timestamp,string> &value)
{
Debug("[%lu] GET %s", id, key.c_str());
bool ret = store.get(key, value);
if (ret) {
std::lock_guard<std::mutex> lck (mtx);
if (store.get(key, value)) {
Debug("Value: %s at <%lu, %lu>", value.second.c_str(), value.first.getTimestamp(), value.first.getID());
return REPLY_OK;
} else {
......@@ -57,9 +57,9 @@ int
Store::Get(uint64_t id, const string &key, const Timestamp &timestamp, pair<Timestamp,string> &value)
{
Debug("[%lu] GET %s at <%lu, %lu>", id, key.c_str(), timestamp.getTimestamp(), timestamp.getID());
std::lock_guard<std::mutex> lck (mtx);
bool ret = store.get(key, timestamp, value);
if (ret) {
if (store.get(key, timestamp, value)) {
return REPLY_OK;
} else {
return REPLY_FAIL;
......@@ -71,6 +71,8 @@ Store::Prepare(uint64_t id, const Transaction &txn, const Timestamp &timestamp,
{
Debug("[%lu] START PREPARE", id);
std::lock_guard<std::mutex> lck (mtx);
if (prepared.find(id) != prepared.end()) {
if (prepared[id].first == timestamp) {
Warning("[%lu] Already Prepared!", id);
......@@ -212,7 +214,9 @@ Store::Commit(uint64_t id, uint64_t timestamp)
{
Debug("[%lu] COMMIT", id);
std::lock_guard<std::mutex> lck (mtx);
// Nope. might not find it
//ASSERT(prepared.find(id) != prepared.end());
......@@ -245,7 +249,8 @@ void
Store::Abort(uint64_t id, const Transaction &txn)
{
Debug("[%lu] ABORT", id);
std::lock_guard<std::mutex> lck (mtx);
if (prepared.find(id) != prepared.end()) {
prepared.erase(id);
}
......@@ -254,9 +259,11 @@ Store::Abort(uint64_t id, const Transaction &txn)
void
Store::Load(const string &key, const string &value, const Timestamp &timestamp)
{
std::lock_guard<std::mutex> lck (mtx);
store.put(key, value, timestamp);
}
// hold lock when using this function
void
Store::GetPreparedWrites(unordered_map<string, set<Timestamp>> &writes)
{
......@@ -268,6 +275,7 @@ Store::GetPreparedWrites(unordered_map<string, set<Timestamp>> &writes)
}
}
// hold lock when using this function
void
Store::GetPreparedReads(unordered_map<string, set<Timestamp>> &reads)
{
......
......@@ -42,6 +42,7 @@
#include <set>
#include <unordered_map>
#include <vector>
#include <mutex>
namespace tapirstore {
......@@ -68,8 +69,11 @@ private:
// Data store
VersionedKVStore store;
// TODO: comment this.
// list of prepared transactions
std::unordered_map<uint64_t, std::pair<Timestamp, Transaction>> prepared;
// protect both store and prepared
std::mutex mtx;
void GetPreparedWrites(std::unordered_map< std::string, std::set<Timestamp> > &writes);
void GetPreparedReads(std::unordered_map< std::string, std::set<Timestamp> > &reads);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment