Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// -*- mode: c++; c-file-style: "k&r"; c-basic-offset: 4 -*-
// vim: set ts=4 sw=4:
/***********************************************************************
*
* common/bufferclient.cc:
* Single shard buffering client implementation.
*
**********************************************************************/
#include "common/bufferclient.h"
#include "common/txnstore.h"
using namespace std;
/* Creates a buffering client on top of the given transaction client.
 * The txnclient pointer is stored as-is and used by every later call
 * (Begin, Get, Prepare, Commit, Abort). */
BufferClient::BufferClient(TxnClient* txnclient) : txn()
{
    this->txnclient = txnclient;
    // Give tid a defined value so that Prepare/Commit/Abort issued before
    // the first Begin() do not read an indeterminate transaction id.
    this->tid = 0;
}
/* Destructor. The body is empty: no explicit cleanup is performed here. */
BufferClient::~BufferClient()
{
}
/* Begins a transaction.
 * Remembers tid as the id of the ongoing transaction, replaces the
 * buffered transaction with a default-constructed one, and then
 * forwards the Begin call to the underlying transaction client. */
void
BufferClient::Begin(uint64_t tid)
{
    // Reset the local per-transaction state before notifying txnclient.
    this->tid = tid;
    this->txn = Transaction();

    this->txnclient->Begin(this->tid);
}
/* Get value for a key.
 * Nothing is returned; the outcome is delivered through the promise.
 *
 * Lookup order:
 *  1. Write set: the buffered value is replied locally with REPLY_OK
 *     (read your own writes).
 *  2. Read set: the key is re-read through txnclient at the timestamp
 *     recorded in the read set.
 *  3. Otherwise the key is fetched through txnclient and, if the reply
 *     is REPLY_OK, the reply's timestamp is recorded in the read set.
 *
 * promise may be NULL in cases 1 and 3 (the reply is then dropped, or
 * collected in a local promise). In case 2 the pointer is forwarded to
 * txnclient unchanged. */
void
BufferClient::Get(const string &key, Promise *promise)
{
    // Read your own writes, check the write set first (single lookup).
    const auto &writeSet = txn.getWriteSet();
    const auto writeIt = writeSet.find(key);
    if (writeIt != writeSet.end()) {
        // The last branch of this function accepts a NULL promise, so
        // accept it here too instead of dereferencing it.
        if (promise != NULL) {
            promise->Reply(REPLY_OK, writeIt->second);
        }
        return;
    }

    // Consistent reads, check the read set (single lookup).
    const auto &readSet = txn.getReadSet();
    const auto readIt = readSet.find(key);
    if (readIt != readSet.end()) {
        // read from the server at same timestamp.
        txnclient->Get(tid, key, readIt->second, promise);
        return;
    }

    // Otherwise, get latest value from server. Fall back to a local
    // promise when the caller did not supply one, so the reply can still
    // be inspected below.
    Promise p(GET_TIMEOUT);
    Promise *pp = (promise != NULL) ? promise : &p;
    txnclient->Get(tid, key, pp);
    if (pp->GetReply() == REPLY_OK) {
        Debug("Adding [%s] with ts %lu", key.c_str(), pp->GetTimestamp().getTimestamp());
        txn.addReadSet(key, pp->GetTimestamp());
    }
}
/* Set value for a key. (Always succeeds.)
 * The write is only recorded in the local write set; this function does
 * not call txnclient. Nothing is returned: success is acknowledged by
 * replying REPLY_OK on the promise, if one was supplied. */
void
BufferClient::Put(const string &key, const string &value, Promise *promise)
{
    // Update the write set.
    txn.addWriteSet(key, value);

    // Get() accepts a NULL promise; accept it here too instead of
    // dereferencing a null pointer.
    if (promise != NULL) {
        promise->Reply(REPLY_OK);
    }
}
/* Prepare the transaction.
 * Forwards the ongoing transaction id, the buffered transaction (read and
 * write sets) and the proposed timestamp to the underlying transaction
 * client. The promise is handed to txnclient unchanged. */
void
BufferClient::Prepare(const Timestamp &timestamp, Promise *promise)
{
    txnclient->Prepare(tid, txn, timestamp, promise);
}
void
BufferClient::Commit(uint64_t timestamp, Promise *promise)
{
txnclient->Commit(tid, txn, timestamp, promise);
}
/* Aborts the ongoing transaction.
 * Forwards the abort for the current transaction id to the underlying
 * transaction client. Note that a default-constructed Transaction is
 * passed along, not the buffered one. The promise is handed to
 * txnclient unchanged. */
void
BufferClient::Abort(Promise *promise)
{
    this->txnclient->Abort(this->tid, Transaction(), promise);
}