Skip to content
Snippets Groups Projects
Commit ed0a3181 authored by Stanley Feng's avatar Stanley Feng
Browse files

[Core] Re-formatting only change to CoreWorkload.java

- Remove trailing whitespaces,
- Break longlines to below 120 chars
- Right indentation.

This is to prepare for the next commit that will add retry logic to insert to make the load process more fault tolerant.
parent 956784f7
No related branches found
No related tags found
No related merge requests found
/**
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
*
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......@@ -44,115 +44,127 @@ import java.util.Map;
import java.util.ArrayList;
/**
* The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The relative
* proportion of different kinds of operations, and other properties of the workload, are controlled
* by parameters specified at runtime.
*
* The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The
* relative proportion of different kinds of operations, and other properties of the workload,
* are controlled by parameters specified at runtime.
*
* Properties to control the client:
* <UL>
* <LI><b>fieldcount</b>: the number of fields in a record (default: 10)
* <LI><b>fieldlength</b>: the size of each field (default: 100)
* <LI><b>readallfields</b>: should reads read all fields (true) or just one (false) (default: true)
* <LI><b>writeallfields</b>: should updates and read/modify/writes update all fields (true) or just one (false) (default: false)
* <LI><b>writeallfields</b>: should updates and read/modify/writes update all fields (true) or just
* one (false) (default: false)
* <LI><b>readproportion</b>: what proportion of operations should be reads (default: 0.95)
* <LI><b>updateproportion</b>: what proportion of operations should be updates (default: 0.05)
* <LI><b>insertproportion</b>: what proportion of operations should be inserts (default: 0)
* <LI><b>scanproportion</b>: what proportion of operations should be scans (default: 0)
* <LI><b>readmodifywriteproportion</b>: what proportion of operations should be read a record, modify it, write it back (default: 0)
* <LI><b>requestdistribution</b>: what distribution should be used to select the records to operate on - uniform, zipfian, hotspot, or latest (default: uniform)
* <LI><b>readmodifywriteproportion</b>: what proportion of operations should be read a record,
* modify it, write it back (default: 0)
* <LI><b>requestdistribution</b>: what distribution should be used to select the records to operate
* on - uniform, zipfian, hotspot, or latest (default: uniform)
* <LI><b>maxscanlength</b>: for scans, what is the maximum number of records to scan (default: 1000)
* <LI><b>scanlengthdistribution</b>: for scans, what distribution should be used to choose the number of records to scan, for each scan, between 1 and maxscanlength (default: uniform)
* <LI><b>insertorder</b>: should records be inserted in order by key ("ordered"), or in hashed order ("hashed") (default: hashed)
* </ul>
* <LI><b>scanlengthdistribution</b>: for scans, what distribution should be used to choose the
* number of records to scan, for each scan, between 1 and maxscanlength (default: uniform)
* <LI><b>insertorder</b>: should records be inserted in order by key ("ordered"), or in hashed
* order ("hashed") (default: hashed)
* </ul>
*/
public class CoreWorkload extends Workload
{
/**
* The name of the database table to run queries against.
*/
public static final String TABLENAME_PROPERTY="table";
/**
* The default name of the database table to run queries against.
*/
public static final String TABLENAME_PROPERTY_DEFAULT="usertable";
public static String table;
/**
* The name of the property for the number of fields in a record.
*/
public static final String FIELD_COUNT_PROPERTY="fieldcount";
/**
* Default number of fields in a record.
*/
public static final String FIELD_COUNT_PROPERTY_DEFAULT="10";
int fieldcount;
private List<String> fieldnames;
/**
* The name of the property for the field length distribution. Options are "uniform", "zipfian" (favoring short records), "constant", and "histogram".
*
* If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the fieldlength property. If "histogram", then the
* histogram will be read from the filename specified in the "fieldlengthhistogram" property.
*/
public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY="fieldlengthdistribution";
/**
* The default field length distribution.
*/
public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "constant";
/**
* The name of the property for the length of a field in bytes.
*/
public static final String FIELD_LENGTH_PROPERTY="fieldlength";
/**
* The default maximum length of a field in bytes.
*/
public static final String FIELD_LENGTH_PROPERTY_DEFAULT="100";
/**
* The name of a property that specifies the filename containing the field length histogram (only used if fieldlengthdistribution is "histogram").
*/
public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY = "fieldlengthhistogram";
/**
* The default filename containing a field length histogram.
*/
public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT = "hist.txt";
/**
* Generator object that produces field lengths. The value of this depends on the properties that start with "FIELD_LENGTH_".
*/
IntegerGenerator fieldlengthgenerator;
/**
* The name of the property for deciding whether to read one field (false) or all fields (true) of a record.
*/
public static final String READ_ALL_FIELDS_PROPERTY="readallfields";
/**
* The default value for the readallfields property.
*/
public static final String READ_ALL_FIELDS_PROPERTY_DEFAULT="true";
boolean readallfields;
/**
* The name of the property for deciding whether to write one field (false) or all fields (true) of a record.
*/
public static final String WRITE_ALL_FIELDS_PROPERTY="writeallfields";
/**
* The default value for the writeallfields property.
*/
public static final String WRITE_ALL_FIELDS_PROPERTY_DEFAULT="false";
boolean writeallfields;
public class CoreWorkload extends Workload {
/**
* The name of the database table to run queries against.
*/
public static final String TABLENAME_PROPERTY = "table";
/**
* The default name of the database table to run queries against.
*/
public static final String TABLENAME_PROPERTY_DEFAULT = "usertable";
public static String table;
/**
* The name of the property for the number of fields in a record.
*/
public static final String FIELD_COUNT_PROPERTY = "fieldcount";
/**
* Default number of fields in a record.
*/
public static final String FIELD_COUNT_PROPERTY_DEFAULT = "10";
int fieldcount;
private List<String> fieldnames;
/**
* The name of the property for the field length distribution. Options are "uniform", "zipfian"
* (favoring short records), "constant", and "histogram".
*
* If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the
* fieldlength property. If "histogram", then the
* histogram will be read from the filename specified in the "fieldlengthhistogram" property.
*/
public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY = "fieldlengthdistribution";
/**
* The default field length distribution.
*/
public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "constant";
/**
* The name of the property for the length of a field in bytes.
*/
public static final String FIELD_LENGTH_PROPERTY = "fieldlength";
/**
* The default maximum length of a field in bytes.
*/
public static final String FIELD_LENGTH_PROPERTY_DEFAULT = "100";
/**
* The name of a property that specifies the filename containing the field length histogram (only
* used if fieldlengthdistribution is "histogram").
*/
public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY = "fieldlengthhistogram";
/**
* The default filename containing a field length histogram.
*/
public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT = "hist.txt";
/**
* Generator object that produces field lengths. The value of this depends on the properties that
* start with "FIELD_LENGTH_".
*/
IntegerGenerator fieldlengthgenerator;
/**
* The name of the property for deciding whether to read one field (false) or all fields (true) of
* a record.
*/
public static final String READ_ALL_FIELDS_PROPERTY = "readallfields";
/**
* The default value for the readallfields property.
*/
public static final String READ_ALL_FIELDS_PROPERTY_DEFAULT = "true";
boolean readallfields;
/**
* The name of the property for deciding whether to write one field (false) or all fields (true)
* of a record.
*/
public static final String WRITE_ALL_FIELDS_PROPERTY = "writeallfields";
/**
* The default value for the writeallfields property.
*/
public static final String WRITE_ALL_FIELDS_PROPERTY_DEFAULT = "false";
boolean writeallfields;
/**
......@@ -160,7 +172,7 @@ public class CoreWorkload extends Workload
* data against the formation template to ensure data integrity.
*/
public static final String DATA_INTEGRITY_PROPERTY = "dataintegrity";
/**
* The default value for the dataintegrity property.
*/
......@@ -172,337 +184,340 @@ public class CoreWorkload extends Workload
*/
private boolean dataintegrity;
/**
* The name of the property for the proportion of transactions that are reads.
*/
public static final String READ_PROPORTION_PROPERTY="readproportion";
/**
* The default proportion of transactions that are reads.
*/
public static final String READ_PROPORTION_PROPERTY_DEFAULT="0.95";
/**
* The name of the property for the proportion of transactions that are updates.
*/
public static final String UPDATE_PROPORTION_PROPERTY="updateproportion";
/**
* The default proportion of transactions that are updates.
*/
public static final String UPDATE_PROPORTION_PROPERTY_DEFAULT="0.05";
/**
* The name of the property for the proportion of transactions that are inserts.
*/
public static final String INSERT_PROPORTION_PROPERTY="insertproportion";
/**
* The default proportion of transactions that are inserts.
*/
public static final String INSERT_PROPORTION_PROPERTY_DEFAULT="0.0";
/**
* The name of the property for the proportion of transactions that are scans.
*/
public static final String SCAN_PROPORTION_PROPERTY="scanproportion";
/**
* The default proportion of transactions that are scans.
*/
public static final String SCAN_PROPORTION_PROPERTY_DEFAULT="0.0";
/**
* The name of the property for the proportion of transactions that are read-modify-write.
*/
public static final String READMODIFYWRITE_PROPORTION_PROPERTY="readmodifywriteproportion";
/**
* The default proportion of transactions that are scans.
*/
public static final String READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT="0.0";
/**
* The name of the property for the the distribution of requests across the keyspace. Options are "uniform", "zipfian" and "latest"
*/
public static final String REQUEST_DISTRIBUTION_PROPERTY="requestdistribution";
/**
* The default distribution of requests across the keyspace
*/
public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT="uniform";
/**
* The name of the property for the max scan length (number of records)
*/
public static final String MAX_SCAN_LENGTH_PROPERTY="maxscanlength";
/**
* The default max scan length.
*/
public static final String MAX_SCAN_LENGTH_PROPERTY_DEFAULT="1000";
/**
* The name of the property for the scan length distribution. Options are "uniform" and "zipfian" (favoring short scans)
*/
public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY="scanlengthdistribution";
/**
* The default max scan length.
*/
public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT="uniform";
/**
* The name of the property for the order to insert records. Options are "ordered" or "hashed"
*/
public static final String INSERT_ORDER_PROPERTY="insertorder";
/**
* Default insert order.
*/
public static final String INSERT_ORDER_PROPERTY_DEFAULT="hashed";
/**
/**
* The name of the property for the proportion of transactions that are reads.
*/
public static final String READ_PROPORTION_PROPERTY = "readproportion";
/**
* The default proportion of transactions that are reads.
*/
public static final String READ_PROPORTION_PROPERTY_DEFAULT = "0.95";
/**
* The name of the property for the proportion of transactions that are updates.
*/
public static final String UPDATE_PROPORTION_PROPERTY = "updateproportion";
/**
* The default proportion of transactions that are updates.
*/
public static final String UPDATE_PROPORTION_PROPERTY_DEFAULT = "0.05";
/**
* The name of the property for the proportion of transactions that are inserts.
*/
public static final String INSERT_PROPORTION_PROPERTY = "insertproportion";
/**
* The default proportion of transactions that are inserts.
*/
public static final String INSERT_PROPORTION_PROPERTY_DEFAULT = "0.0";
/**
* The name of the property for the proportion of transactions that are scans.
*/
public static final String SCAN_PROPORTION_PROPERTY = "scanproportion";
/**
* The default proportion of transactions that are scans.
*/
public static final String SCAN_PROPORTION_PROPERTY_DEFAULT = "0.0";
/**
* The name of the property for the proportion of transactions that are read-modify-write.
*/
public static final String READMODIFYWRITE_PROPORTION_PROPERTY = "readmodifywriteproportion";
/**
* The default proportion of transactions that are scans.
*/
public static final String READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT = "0.0";
/**
* The name of the property for the the distribution of requests across the keyspace. Options are
* "uniform", "zipfian" and "latest"
*/
public static final String REQUEST_DISTRIBUTION_PROPERTY = "requestdistribution";
/**
* The default distribution of requests across the keyspace
*/
public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT = "uniform";
/**
* The name of the property for the max scan length (number of records)
*/
public static final String MAX_SCAN_LENGTH_PROPERTY = "maxscanlength";
/**
* The default max scan length.
*/
public static final String MAX_SCAN_LENGTH_PROPERTY_DEFAULT = "1000";
/**
* The name of the property for the scan length distribution. Options are "uniform" and "zipfian"
* (favoring short scans)
*/
public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY = "scanlengthdistribution";
/**
* The default max scan length.
*/
public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "uniform";
/**
* The name of the property for the order to insert records. Options are "ordered" or "hashed"
*/
public static final String INSERT_ORDER_PROPERTY = "insertorder";
/**
* Default insert order.
*/
public static final String INSERT_ORDER_PROPERTY_DEFAULT = "hashed";
/**
* Percentage data items that constitute the hot set.
*/
public static final String HOTSPOT_DATA_FRACTION = "hotspotdatafraction";
/**
* Default value of the size of the hot set.
*/
public static final String HOTSPOT_DATA_FRACTION_DEFAULT = "0.2";
/**
* Percentage operations that access the hot set.
*/
public static final String HOTSPOT_OPN_FRACTION = "hotspotopnfraction";
/**
* Default value of the percentage operations accessing the hot set.
*/
public static final String HOTSPOT_OPN_FRACTION_DEFAULT = "0.8";
IntegerGenerator keysequence;
DiscreteGenerator operationchooser;
IntegerGenerator keychooser;
Generator fieldchooser;
AcknowledgedCounterGenerator transactioninsertkeysequence;
IntegerGenerator scanlength;
boolean orderedinserts;
int recordcount;
private Measurements _measurements = Measurements.getMeasurements();
protected static IntegerGenerator getFieldLengthGenerator(Properties p) throws WorkloadException{
IntegerGenerator fieldlengthgenerator;
String fieldlengthdistribution = p.getProperty(FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
int fieldlength=Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY,FIELD_LENGTH_PROPERTY_DEFAULT));
String fieldlengthhistogram = p.getProperty(FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY, FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT);
if(fieldlengthdistribution.compareTo("constant") == 0) {
fieldlengthgenerator = new ConstantIntegerGenerator(fieldlength);
} else if(fieldlengthdistribution.compareTo("uniform") == 0) {
fieldlengthgenerator = new UniformIntegerGenerator(1, fieldlength);
} else if(fieldlengthdistribution.compareTo("zipfian") == 0) {
fieldlengthgenerator = new ZipfianGenerator(1, fieldlength);
} else if(fieldlengthdistribution.compareTo("histogram") == 0) {
try {
fieldlengthgenerator = new HistogramGenerator(fieldlengthhistogram);
} catch(IOException e) {
throw new WorkloadException("Couldn't read field length histogram file: "+fieldlengthhistogram, e);
}
} else {
throw new WorkloadException("Unknown field length distribution \""+fieldlengthdistribution+"\"");
}
return fieldlengthgenerator;
}
/**
* Initialize the scenario.
* Called once, in the main client thread, before any operations are started.
*/
public void init(Properties p) throws WorkloadException
{
table = p.getProperty(TABLENAME_PROPERTY,TABLENAME_PROPERTY_DEFAULT);
fieldcount=Integer.parseInt(p.getProperty(FIELD_COUNT_PROPERTY,FIELD_COUNT_PROPERTY_DEFAULT));
IntegerGenerator keysequence;
DiscreteGenerator operationchooser;
IntegerGenerator keychooser;
Generator fieldchooser;
AcknowledgedCounterGenerator transactioninsertkeysequence;
IntegerGenerator scanlength;
boolean orderedinserts;
int recordcount;
private Measurements _measurements = Measurements.getMeasurements();
protected static IntegerGenerator getFieldLengthGenerator(Properties p) throws WorkloadException {
IntegerGenerator fieldlengthgenerator;
String fieldlengthdistribution = p.getProperty(
FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
int fieldlength =
Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY, FIELD_LENGTH_PROPERTY_DEFAULT));
String fieldlengthhistogram = p.getProperty(
FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY, FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT);
if (fieldlengthdistribution.compareTo("constant") == 0) {
fieldlengthgenerator = new ConstantIntegerGenerator(fieldlength);
} else if (fieldlengthdistribution.compareTo("uniform") == 0) {
fieldlengthgenerator = new UniformIntegerGenerator(1, fieldlength);
} else if (fieldlengthdistribution.compareTo("zipfian") == 0) {
fieldlengthgenerator = new ZipfianGenerator(1, fieldlength);
} else if (fieldlengthdistribution.compareTo("histogram") == 0) {
try {
fieldlengthgenerator = new HistogramGenerator(fieldlengthhistogram);
} catch (IOException e) {
throw new WorkloadException(
"Couldn't read field length histogram file: " + fieldlengthhistogram, e);
}
} else {
throw new WorkloadException(
"Unknown field length distribution \"" + fieldlengthdistribution + "\"");
}
return fieldlengthgenerator;
}
/**
* Initialize the scenario.
* Called once, in the main client thread, before any operations are started.
*/
public void init(Properties p) throws WorkloadException {
table = p.getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
fieldcount =
Integer.parseInt(p.getProperty(FIELD_COUNT_PROPERTY, FIELD_COUNT_PROPERTY_DEFAULT));
fieldnames = new ArrayList<String>();
for (int i = 0; i < fieldcount; i++) {
fieldnames.add("field" + i);
fieldnames.add("field" + i);
}
fieldlengthgenerator = CoreWorkload.getFieldLengthGenerator(p);
double readproportion=Double.parseDouble(p.getProperty(READ_PROPORTION_PROPERTY,READ_PROPORTION_PROPERTY_DEFAULT));
double updateproportion=Double.parseDouble(p.getProperty(UPDATE_PROPORTION_PROPERTY,UPDATE_PROPORTION_PROPERTY_DEFAULT));
double insertproportion=Double.parseDouble(p.getProperty(INSERT_PROPORTION_PROPERTY,INSERT_PROPORTION_PROPERTY_DEFAULT));
double scanproportion=Double.parseDouble(p.getProperty(SCAN_PROPORTION_PROPERTY,SCAN_PROPORTION_PROPERTY_DEFAULT));
double readmodifywriteproportion=Double.parseDouble(p.getProperty(READMODIFYWRITE_PROPORTION_PROPERTY,READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT));
recordcount=Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT));
if(recordcount == 0)
recordcount = Integer.MAX_VALUE;
String requestdistrib=p.getProperty(REQUEST_DISTRIBUTION_PROPERTY,REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
int maxscanlength=Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY,MAX_SCAN_LENGTH_PROPERTY_DEFAULT));
String scanlengthdistrib=p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY,SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
int insertstart=Integer.parseInt(p.getProperty(INSERT_START_PROPERTY,INSERT_START_PROPERTY_DEFAULT));
readallfields=Boolean.parseBoolean(p.getProperty(READ_ALL_FIELDS_PROPERTY,READ_ALL_FIELDS_PROPERTY_DEFAULT));
writeallfields=Boolean.parseBoolean(p.getProperty(WRITE_ALL_FIELDS_PROPERTY,WRITE_ALL_FIELDS_PROPERTY_DEFAULT));
dataintegrity = Boolean.parseBoolean(p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT));
//Confirm that fieldlengthgenerator returns a constant if data
//integrity check requested.
if (dataintegrity && !(p.getProperty(FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant"))
{
fieldlengthgenerator = CoreWorkload.getFieldLengthGenerator(p);
double readproportion = Double.parseDouble(
p.getProperty(READ_PROPORTION_PROPERTY, READ_PROPORTION_PROPERTY_DEFAULT));
double updateproportion = Double.parseDouble(
p.getProperty(UPDATE_PROPORTION_PROPERTY, UPDATE_PROPORTION_PROPERTY_DEFAULT));
double insertproportion = Double.parseDouble(
p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT));
double scanproportion = Double.parseDouble(
p.getProperty(SCAN_PROPORTION_PROPERTY, SCAN_PROPORTION_PROPERTY_DEFAULT));
double readmodifywriteproportion = Double.parseDouble(p.getProperty(
READMODIFYWRITE_PROPORTION_PROPERTY, READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT));
recordcount =
Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT));
if (recordcount == 0)
recordcount = Integer.MAX_VALUE;
String requestdistrib =
p.getProperty(REQUEST_DISTRIBUTION_PROPERTY, REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
int maxscanlength =
Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY, MAX_SCAN_LENGTH_PROPERTY_DEFAULT));
String scanlengthdistrib =
p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY, SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
int insertstart =
Integer.parseInt(p.getProperty(INSERT_START_PROPERTY, INSERT_START_PROPERTY_DEFAULT));
readallfields = Boolean.parseBoolean(
p.getProperty(READ_ALL_FIELDS_PROPERTY, READ_ALL_FIELDS_PROPERTY_DEFAULT));
writeallfields = Boolean.parseBoolean(
p.getProperty(WRITE_ALL_FIELDS_PROPERTY, WRITE_ALL_FIELDS_PROPERTY_DEFAULT));
dataintegrity = Boolean.parseBoolean(
p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT));
// Confirm that fieldlengthgenerator returns a constant if data
// integrity check requested.
if (dataintegrity
&& !(p.getProperty(
FIELD_LENGTH_DISTRIBUTION_PROPERTY,
FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) {
System.err.println("Must have constant field size to check data integrity.");
System.exit(-1);
}
if (p.getProperty(INSERT_ORDER_PROPERTY,INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed")==0)
{
orderedinserts=false;
}
else if (requestdistrib.compareTo("exponential")==0)
{
double percentile = Double.parseDouble(p.getProperty(ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY,
ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT));
double frac = Double.parseDouble(p.getProperty(ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY,
ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT));
keychooser = new ExponentialGenerator(percentile, recordcount*frac);
}
else
{
orderedinserts=true;
}
keysequence=new CounterGenerator(insertstart);
operationchooser=new DiscreteGenerator();
if (readproportion>0)
{
operationchooser.addValue(readproportion,"READ");
}
if (updateproportion>0)
{
operationchooser.addValue(updateproportion,"UPDATE");
}
if (insertproportion>0)
{
operationchooser.addValue(insertproportion,"INSERT");
}
if (scanproportion>0)
{
operationchooser.addValue(scanproportion,"SCAN");
}
if (readmodifywriteproportion>0)
{
operationchooser.addValue(readmodifywriteproportion,"READMODIFYWRITE");
}
transactioninsertkeysequence=new AcknowledgedCounterGenerator(recordcount);
if (requestdistrib.compareTo("uniform")==0)
{
keychooser=new UniformIntegerGenerator(0,recordcount-1);
}
else if (requestdistrib.compareTo("zipfian")==0)
{
//it does this by generating a random "next key" in part by taking the modulus over the number of keys
//if the number of keys changes, this would shift the modulus, and we don't want that to change which keys are popular
//so we'll actually construct the scrambled zipfian generator with a keyspace that is larger than exists at the beginning
//of the test. that is, we'll predict the number of inserts, and tell the scrambled zipfian generator the number of existing keys
//plus the number of predicted keys as the total keyspace. then, if the generator picks a key that hasn't been inserted yet, will
//just ignore it and pick another key. this way, the size of the keyspace doesn't change from the perspective of the scrambled zipfian generator
int opcount=Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY));
int expectednewkeys=(int)(((double)opcount)*insertproportion*2.0); //2 is fudge factor
keychooser=new ScrambledZipfianGenerator(recordcount+expectednewkeys);
}
else if (requestdistrib.compareTo("latest")==0)
{
keychooser=new SkewedLatestGenerator(transactioninsertkeysequence);
}
else if (requestdistrib.equals("hotspot"))
{
double hotsetfraction = Double.parseDouble(p.getProperty(
HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT));
double hotopnfraction = Double.parseDouble(p.getProperty(
HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT));
keychooser = new HotspotIntegerGenerator(0, recordcount - 1,
hotsetfraction, hotopnfraction);
if (p.getProperty(INSERT_ORDER_PROPERTY, INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed")
== 0) {
orderedinserts = false;
} else if (requestdistrib.compareTo("exponential") == 0) {
double percentile = Double.parseDouble(p.getProperty(
ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY,
ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT));
double frac = Double.parseDouble(p.getProperty(
ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY,
ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT));
keychooser = new ExponentialGenerator(percentile, recordcount * frac);
} else {
orderedinserts = true;
}
keysequence = new CounterGenerator(insertstart);
operationchooser = new DiscreteGenerator();
if (readproportion > 0) {
operationchooser.addValue(readproportion, "READ");
}
else
{
throw new WorkloadException("Unknown request distribution \""+requestdistrib+"\"");
}
fieldchooser=new UniformIntegerGenerator(0,fieldcount-1);
if (scanlengthdistrib.compareTo("uniform")==0)
{
scanlength=new UniformIntegerGenerator(1,maxscanlength);
}
else if (scanlengthdistrib.compareTo("zipfian")==0)
{
scanlength=new ZipfianGenerator(1,maxscanlength);
}
else
{
throw new WorkloadException("Distribution \""+scanlengthdistrib+"\" not allowed for scan length");
}
}
public String buildKeyName(long keynum) {
if (!orderedinserts)
{
keynum=Utils.hash(keynum);
}
return "user"+keynum;
}
if (updateproportion > 0) {
operationchooser.addValue(updateproportion, "UPDATE");
}
if (insertproportion > 0) {
operationchooser.addValue(insertproportion, "INSERT");
}
if (scanproportion > 0) {
operationchooser.addValue(scanproportion, "SCAN");
}
if (readmodifywriteproportion > 0) {
operationchooser.addValue(readmodifywriteproportion, "READMODIFYWRITE");
}
transactioninsertkeysequence = new AcknowledgedCounterGenerator(recordcount);
if (requestdistrib.compareTo("uniform") == 0) {
keychooser = new UniformIntegerGenerator(0, recordcount - 1);
} else if (requestdistrib.compareTo("zipfian") == 0) {
// it does this by generating a random "next key" in part by taking the modulus over the
// number of keys.
// If the number of keys changes, this would shift the modulus, and we don't want that to
// change which keys are popular so we'll actually construct the scrambled zipfian generator
// with a keyspace that is larger than exists at the beginning of the test. that is, we'll predict
// the number of inserts, and tell the scrambled zipfian generator the number of existing keys
// plus the number of predicted keys as the total keyspace. then, if the generator picks a key
// that hasn't been inserted yet, will just ignore it and pick another key. this way, the size of
// the keyspace doesn't change from the perspective of the scrambled zipfian generator
int opcount = Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY));
int expectednewkeys = (int) ((opcount) * insertproportion * 2.0); // 2 is fudge factor
keychooser = new ScrambledZipfianGenerator(recordcount + expectednewkeys);
} else if (requestdistrib.compareTo("latest") == 0) {
keychooser = new SkewedLatestGenerator(transactioninsertkeysequence);
} else if (requestdistrib.equals("hotspot")) {
double hotsetfraction =
Double.parseDouble(p.getProperty(HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT));
double hotopnfraction =
Double.parseDouble(p.getProperty(HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT));
keychooser = new HotspotIntegerGenerator(0, recordcount - 1, hotsetfraction, hotopnfraction);
} else {
throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\"");
}
fieldchooser = new UniformIntegerGenerator(0, fieldcount - 1);
if (scanlengthdistrib.compareTo("uniform") == 0) {
scanlength = new UniformIntegerGenerator(1, maxscanlength);
} else if (scanlengthdistrib.compareTo("zipfian") == 0) {
scanlength = new ZipfianGenerator(1, maxscanlength);
} else {
throw new WorkloadException(
"Distribution \"" + scanlengthdistrib + "\" not allowed for scan length");
}
}
public String buildKeyName(long keynum) {
if (!orderedinserts) {
keynum = Utils.hash(keynum);
}
return "user" + keynum;
}
/**
* Builds a value for a randomly chosen field.
*/
private HashMap<String, ByteIterator> buildSingleValue(String key) {
HashMap<String,ByteIterator> value = new HashMap<String,ByteIterator>();
HashMap<String, ByteIterator> value = new HashMap<String, ByteIterator>();
String fieldkey = fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
ByteIterator data;
if (dataintegrity) {
data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
} else {
//fill with random data
// fill with random data
data = new RandomByteIterator(fieldlengthgenerator.nextInt());
}
value.put(fieldkey,data);
value.put(fieldkey, data);
return value;
return value;
}
/**
* Builds values for all fields.
*/
private HashMap<String, ByteIterator> buildValues(String key) {
HashMap<String,ByteIterator> values = new HashMap<String,ByteIterator>();
private HashMap<String, ByteIterator> buildValues(String key) {
HashMap<String, ByteIterator> values = new HashMap<String, ByteIterator>();
for (String fieldkey : fieldnames) {
ByteIterator data;
if (dataintegrity) {
data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
} else {
//fill with random data
// fill with random data
data = new RandomByteIterator(fieldlengthgenerator.nextInt());
}
values.put(fieldkey,data);
values.put(fieldkey, data);
}
return values;
}
......@@ -525,244 +540,213 @@ public class CoreWorkload extends Workload
return sb.toString();
}
/**
* Do one insert operation. Because it will be called concurrently from multiple client threads, this
* function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
* other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
* effects other than DB operations.
*/
public boolean doInsert(DB db, Object threadstate)
{
int keynum=keysequence.nextInt();
String dbkey = buildKeyName(keynum);
HashMap<String, ByteIterator> values = buildValues(dbkey);
if (db.insert(table,dbkey,values).equals(Status.OK))
return true;
else
return false;
}
/**
* Do one transaction operation. Because it will be called concurrently from multiple client threads, this
* function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
* other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
* effects other than DB operations.
*/
public boolean doTransaction(DB db, Object threadstate)
{
String op=operationchooser.nextString();
if (op.compareTo("READ")==0)
{
doTransactionRead(db);
}
else if (op.compareTo("UPDATE")==0)
{
doTransactionUpdate(db);
}
else if (op.compareTo("INSERT")==0)
{
doTransactionInsert(db);
}
else if (op.compareTo("SCAN")==0)
{
doTransactionScan(db);
}
else
{
doTransactionReadModifyWrite(db);
}
return true;
}
/**
* Do one insert operation. Because it will be called concurrently from multiple client threads,
* this function must be thread safe. However, avoid synchronized, or the threads will block waiting
* for each other, and it will be difficult to reach the target throughput. Ideally, this function would
* have no side effects other than DB operations.
*/
public boolean doInsert(DB db, Object threadstate) {
int keynum = keysequence.nextInt();
String dbkey = buildKeyName(keynum);
HashMap<String, ByteIterator> values = buildValues(dbkey);
if (db.insert(table, dbkey, values).equals(Status.OK))
return true;
else
return false;
}
/**
* Do one transaction operation. Because it will be called concurrently from multiple client
* threads, this function must be thread safe. However, avoid synchronized, or the threads will block waiting
* for each other, and it will be difficult to reach the target throughput. Ideally, this function would
* have no side effects other than DB operations.
*/
public boolean doTransaction(DB db, Object threadstate) {
String op = operationchooser.nextString();
if (op.compareTo("READ") == 0) {
doTransactionRead(db);
} else if (op.compareTo("UPDATE") == 0) {
doTransactionUpdate(db);
} else if (op.compareTo("INSERT") == 0) {
doTransactionInsert(db);
} else if (op.compareTo("SCAN") == 0) {
doTransactionScan(db);
} else {
doTransactionReadModifyWrite(db);
}
return true;
}
/**
* Results are reported in the first three buckets of the histogram under
* the label "VERIFY".
* the label "VERIFY".
* Bucket 0 means the expected data was returned.
* Bucket 1 means incorrect data was returned.
* Bucket 2 means null data was returned when some data was expected.
* Bucket 2 means null data was returned when some data was expected.
*/
protected void verifyRow(String key, HashMap<String,ByteIterator> cells) {
protected void verifyRow(String key, HashMap<String, ByteIterator> cells) {
Status verifyStatus = Status.OK;
long startTime = System.nanoTime();
if (!cells.isEmpty()) {
for (Map.Entry<String, ByteIterator> entry : cells.entrySet()) {
if (!entry.getValue().toString().equals(
buildDeterministicValue(key, entry.getKey()))) {
if (!entry.getValue().toString().equals(buildDeterministicValue(key, entry.getKey()))) {
verifyStatus = Status.UNEXPECTED_STATE;
break;
}
}
} else {
//This assumes that null data is never valid
// This assumes that null data is never valid
verifyStatus = Status.ERROR;
}
long endTime = System.nanoTime();
_measurements.measure("VERIFY", (int) (endTime - startTime) / 1000);
_measurements.reportStatus("VERIFY",verifyStatus);
_measurements.reportStatus("VERIFY", verifyStatus);
}
int nextKeynum() {
int keynum;
if(keychooser instanceof ExponentialGenerator) {
do
{
keynum=transactioninsertkeysequence.lastInt() - keychooser.nextInt();
}
while(keynum < 0);
} else {
do
{
keynum=keychooser.nextInt();
}
while (keynum > transactioninsertkeysequence.lastInt());
}
return keynum;
int nextKeynum() {
int keynum;
if (keychooser instanceof ExponentialGenerator) {
do {
keynum = transactioninsertkeysequence.lastInt() - keychooser.nextInt();
} while (keynum < 0);
} else {
do {
keynum = keychooser.nextInt();
} while (keynum > transactioninsertkeysequence.lastInt());
}
return keynum;
}
public void doTransactionRead(DB db) {
// choose a random key
int keynum = nextKeynum();
String keyname = buildKeyName(keynum);
public void doTransactionRead(DB db)
{
//choose a random key
int keynum = nextKeynum();
String keyname = buildKeyName(keynum);
HashSet<String> fields=null;
if (!readallfields)
{
//read a random field
String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
fields=new HashSet<String>();
fields.add(fieldname);
} else if (dataintegrity) {
HashSet<String> fields = null;
if (!readallfields) {
// read a random field
String fieldname = fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
fields = new HashSet<String>();
fields.add(fieldname);
} else if (dataintegrity) {
// pass the full field list if dataintegrity is on for verification
fields = new HashSet<String>(fieldnames);
}
HashMap<String,ByteIterator> cells =
new HashMap<String,ByteIterator>();
db.read(table,keyname,fields,cells);
HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
db.read(table, keyname, fields, cells);
if (dataintegrity) {
verifyRow(keyname, cells);
}
}
public void doTransactionReadModifyWrite(DB db)
{
//choose a random key
int keynum = nextKeynum();
String keyname = buildKeyName(keynum);
HashSet<String> fields=null;
if (!readallfields)
{
//read a random field
String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
fields=new HashSet<String>();
fields.add(fieldname);
}
HashMap<String,ByteIterator> values;
if (writeallfields)
{
//new data for all the fields
values = buildValues(keyname);
}
else
{
//update a random field
values = buildSingleValue(keyname);
}
//do the transaction
HashMap<String,ByteIterator> cells =
new HashMap<String,ByteIterator>();
long ist=_measurements.getIntendedtartTimeNs();
long st = System.nanoTime();
db.read(table,keyname,fields,cells);
db.update(table,keyname,values);
long en=System.nanoTime();
}
public void doTransactionReadModifyWrite(DB db) {
// choose a random key
int keynum = nextKeynum();
String keyname = buildKeyName(keynum);
HashSet<String> fields = null;
if (!readallfields) {
// read a random field
String fieldname = fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
fields = new HashSet<String>();
fields.add(fieldname);
}
HashMap<String, ByteIterator> values;
if (writeallfields) {
// new data for all the fields
values = buildValues(keyname);
} else {
// update a random field
values = buildSingleValue(keyname);
}
// do the transaction
HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
long ist = _measurements.getIntendedtartTimeNs();
long st = System.nanoTime();
db.read(table, keyname, fields, cells);
db.update(table, keyname, values);
long en = System.nanoTime();
if (dataintegrity) {
verifyRow(keyname, cells);
}
_measurements .measure("READ-MODIFY-WRITE", (int)((en-st)/1000));
_measurements .measureIntended("READ-MODIFY-WRITE", (int)((en-ist)/1000));
}
public void doTransactionScan(DB db)
{
//choose a random key
int keynum = nextKeynum();
String startkeyname = buildKeyName(keynum);
//choose a random scan length
int len=scanlength.nextInt();
HashSet<String> fields=null;
if (!readallfields)
{
//read a random field
String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
fields=new HashSet<String>();
fields.add(fieldname);
}
db.scan(table,startkeyname,len,fields,new Vector<HashMap<String,ByteIterator>>());
}
public void doTransactionUpdate(DB db)
{
//choose a random key
int keynum = nextKeynum();
String keyname=buildKeyName(keynum);
HashMap<String,ByteIterator> values;
if (writeallfields)
{
//new data for all the fields
values = buildValues(keyname);
}
else
{
//update a random field
values = buildSingleValue(keyname);
}
db.update(table,keyname,values);
}
public void doTransactionInsert(DB db)
{
//choose the next key
int keynum=transactioninsertkeysequence.nextInt();
try {
String dbkey = buildKeyName(keynum);
HashMap<String, ByteIterator> values = buildValues(dbkey);
db.insert(table,dbkey,values);
} finally {
transactioninsertkeysequence.acknowledge(keynum);
}
}
_measurements.measure("READ-MODIFY-WRITE", (int) ((en - st) / 1000));
_measurements.measureIntended("READ-MODIFY-WRITE", (int) ((en - ist) / 1000));
}
public void doTransactionScan(DB db) {
// choose a random key
int keynum = nextKeynum();
String startkeyname = buildKeyName(keynum);
// choose a random scan length
int len = scanlength.nextInt();
HashSet<String> fields = null;
if (!readallfields) {
// read a random field
String fieldname = fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
fields = new HashSet<String>();
fields.add(fieldname);
}
db.scan(table, startkeyname, len, fields, new Vector<HashMap<String, ByteIterator>>());
}
public void doTransactionUpdate(DB db) {
// choose a random key
int keynum = nextKeynum();
String keyname = buildKeyName(keynum);
HashMap<String, ByteIterator> values;
if (writeallfields) {
// new data for all the fields
values = buildValues(keyname);
} else {
// update a random field
values = buildSingleValue(keyname);
}
db.update(table, keyname, values);
}
public void doTransactionInsert(DB db) {
// choose the next key
int keynum = transactioninsertkeysequence.nextInt();
try {
String dbkey = buildKeyName(keynum);
HashMap<String, ByteIterator> values = buildValues(dbkey);
db.insert(table, dbkey, values);
} finally {
transactioninsertkeysequence.acknowledge(keynum);
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment