Commit e54dbdb2 authored by Sean Busbey

Merge pull request #527 from stfeng/stfeng-ycsb-core-fixes

[Core] Add retry for insertion logic to make the load process more robust
parents 956784f7 94c2b23b
/**
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -44,115 +44,127 @@ import java.util.Map;
import java.util.ArrayList;
/**
* The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The
* relative proportion of different kinds of operations, and other properties of the workload,
* are controlled by parameters specified at runtime.
*
* Properties to control the client:
* <UL>
* <LI><b>fieldcount</b>: the number of fields in a record (default: 10)
* <LI><b>fieldlength</b>: the size of each field (default: 100)
* <LI><b>readallfields</b>: should reads read all fields (true) or just one (false) (default: true)
* <LI><b>writeallfields</b>: should updates and read/modify/writes update all fields (true) or just
* one (false) (default: false)
* <LI><b>readproportion</b>: what proportion of operations should be reads (default: 0.95)
* <LI><b>updateproportion</b>: what proportion of operations should be updates (default: 0.05)
* <LI><b>insertproportion</b>: what proportion of operations should be inserts (default: 0)
* <LI><b>scanproportion</b>: what proportion of operations should be scans (default: 0)
* <LI><b>readmodifywriteproportion</b>: what proportion of operations should be read a record,
* modify it, write it back (default: 0)
* <LI><b>requestdistribution</b>: what distribution should be used to select the records to operate
* on - uniform, zipfian, hotspot, or latest (default: uniform)
* <LI><b>maxscanlength</b>: for scans, what is the maximum number of records to scan (default: 1000)
* <LI><b>scanlengthdistribution</b>: for scans, what distribution should be used to choose the
* number of records to scan, for each scan, between 1 and maxscanlength (default: uniform)
* <LI><b>insertorder</b>: should records be inserted in order by key ("ordered"), or in hashed
* order ("hashed") (default: hashed)
* </ul>
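*
* <p>A minimal sketch of a workload configuration using these properties (values are
* illustrative, not the defaults):
* <pre>
* readproportion=0.9
* updateproportion=0.1
* requestdistribution=zipfian
* fieldcount=10
* fieldlength=100
* </pre>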
*/
public class CoreWorkload extends Workload {
/**
* The name of the database table to run queries against.
*/
public static final String TABLENAME_PROPERTY = "table";
/**
* The default name of the database table to run queries against.
*/
public static final String TABLENAME_PROPERTY_DEFAULT = "usertable";
public static String table;
/**
* The name of the property for the number of fields in a record.
*/
public static final String FIELD_COUNT_PROPERTY = "fieldcount";
/**
* Default number of fields in a record.
*/
public static final String FIELD_COUNT_PROPERTY_DEFAULT = "10";
int fieldcount;
private List<String> fieldnames;
/**
* The name of the property for the field length distribution. Options are "uniform", "zipfian"
* (favoring short records), "constant", and "histogram".
*
* If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the
* fieldlength property. If "histogram", then the
* histogram will be read from the filename specified in the "fieldlengthhistogram" property.
*/
public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY = "fieldlengthdistribution";
/**
* The default field length distribution.
*/
public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "constant";
/**
* The name of the property for the length of a field in bytes.
*/
public static final String FIELD_LENGTH_PROPERTY = "fieldlength";
/**
* The default maximum length of a field in bytes.
*/
public static final String FIELD_LENGTH_PROPERTY_DEFAULT = "100";
/**
* The name of a property that specifies the filename containing the field length histogram (only
* used if fieldlengthdistribution is "histogram").
*/
public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY = "fieldlengthhistogram";
/**
* The default filename containing a field length histogram.
*/
public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT = "hist.txt";
/**
* Generator object that produces field lengths. The value of this depends on the properties that
* start with "FIELD_LENGTH_".
*/
IntegerGenerator fieldlengthgenerator;
/**
* The name of the property for deciding whether to read one field (false) or all fields (true) of
* a record.
*/
public static final String READ_ALL_FIELDS_PROPERTY = "readallfields";
/**
* The default value for the readallfields property.
*/
public static final String READ_ALL_FIELDS_PROPERTY_DEFAULT = "true";
boolean readallfields;
/**
* The name of the property for deciding whether to write one field (false) or all fields (true)
* of a record.
*/
public static final String WRITE_ALL_FIELDS_PROPERTY = "writeallfields";
/**
* The default value for the writeallfields property.
*/
public static final String WRITE_ALL_FIELDS_PROPERTY_DEFAULT = "false";
boolean writeallfields;
/**
@@ -160,7 +172,7 @@ public class CoreWorkload
* data against the formation template to ensure data integrity.
*/
public static final String DATA_INTEGRITY_PROPERTY = "dataintegrity";
/**
* The default value for the dataintegrity property.
*/
@@ -172,337 +184,361 @@ public class CoreWorkload
*/
private boolean dataintegrity;
/**
* The name of the property for the proportion of transactions that are reads.
*/
public static final String READ_PROPORTION_PROPERTY = "readproportion";
/**
* The default proportion of transactions that are reads.
*/
public static final String READ_PROPORTION_PROPERTY_DEFAULT = "0.95";
/**
* The name of the property for the proportion of transactions that are updates.
*/
public static final String UPDATE_PROPORTION_PROPERTY = "updateproportion";
/**
* The default proportion of transactions that are updates.
*/
public static final String UPDATE_PROPORTION_PROPERTY_DEFAULT = "0.05";
/**
* The name of the property for the proportion of transactions that are inserts.
*/
public static final String INSERT_PROPORTION_PROPERTY = "insertproportion";
/**
* The default proportion of transactions that are inserts.
*/
public static final String INSERT_PROPORTION_PROPERTY_DEFAULT = "0.0";
/**
* The name of the property for the proportion of transactions that are scans.
*/
public static final String SCAN_PROPORTION_PROPERTY = "scanproportion";
/**
* The default proportion of transactions that are scans.
*/
public static final String SCAN_PROPORTION_PROPERTY_DEFAULT = "0.0";
/**
* The name of the property for the proportion of transactions that are read-modify-write.
*/
public static final String READMODIFYWRITE_PROPORTION_PROPERTY = "readmodifywriteproportion";
/**
* The default proportion of transactions that are read-modify-write.
*/
public static final String READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT = "0.0";
/**
* The name of the property for the distribution of requests across the keyspace. Options are
* "uniform", "zipfian", "hotspot", and "latest".
*/
public static final String REQUEST_DISTRIBUTION_PROPERTY = "requestdistribution";
/**
* The default distribution of requests across the keyspace.
*/
public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT = "uniform";
/**
* The name of the property for the max scan length (number of records).
*/
public static final String MAX_SCAN_LENGTH_PROPERTY = "maxscanlength";
/**
* The default max scan length.
*/
public static final String MAX_SCAN_LENGTH_PROPERTY_DEFAULT = "1000";
/**
* The name of the property for the scan length distribution. Options are "uniform" and "zipfian"
* (favoring short scans).
*/
public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY = "scanlengthdistribution";
/**
* The default scan length distribution.
*/
public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "uniform";
/**
* The name of the property for the order to insert records. Options are "ordered" or "hashed".
*/
public static final String INSERT_ORDER_PROPERTY = "insertorder";
/**
* Default insert order.
*/
public static final String INSERT_ORDER_PROPERTY_DEFAULT = "hashed";
/**
* Percentage of data items that constitute the hot set.
*/
public static final String HOTSPOT_DATA_FRACTION = "hotspotdatafraction";
/**
* Default value of the size of the hot set.
*/
public static final String HOTSPOT_DATA_FRACTION_DEFAULT = "0.2";
/**
* Percentage of operations that access the hot set.
*/
public static final String HOTSPOT_OPN_FRACTION = "hotspotopnfraction";
/**
* Default value of the percentage of operations accessing the hot set.
*/
public static final String HOTSPOT_OPN_FRACTION_DEFAULT = "0.8";
/**
* How many times to retry when insertion of a single item to a DB fails.
*/
public static final String INSERTION_RETRY_LIMIT = "core_workload_insertion_retry_limit";
public static final String INSERTION_RETRY_LIMIT_DEFAULT = "0";
/**
* On average, how long to wait between the retries, in seconds.
*/
public static final String INSERTION_RETRY_INTERVAL = "core_workload_insertion_retry_interval";
public static final String INSERTION_RETRY_INTERVAL_DEFAULT = "3";
IntegerGenerator keysequence;
DiscreteGenerator operationchooser;
IntegerGenerator keychooser;
Generator fieldchooser;
AcknowledgedCounterGenerator transactioninsertkeysequence;
IntegerGenerator scanlength;
boolean orderedinserts;
int recordcount;
int insertionRetryLimit;
int insertionRetryInterval;
private Measurements _measurements = Measurements.getMeasurements();
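/**
* Builds the generator that produces field lengths, as selected by the
* "fieldlengthdistribution" property; see the property documentation above for the
* supported distributions.
*/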
protected static IntegerGenerator getFieldLengthGenerator(Properties p) throws WorkloadException {
IntegerGenerator fieldlengthgenerator;
String fieldlengthdistribution = p.getProperty(
FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
int fieldlength =
Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY, FIELD_LENGTH_PROPERTY_DEFAULT));
String fieldlengthhistogram = p.getProperty(
FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY, FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT);
if (fieldlengthdistribution.compareTo("constant") == 0) {
fieldlengthgenerator = new ConstantIntegerGenerator(fieldlength);
} else if (fieldlengthdistribution.compareTo("uniform") == 0) {
fieldlengthgenerator = new UniformIntegerGenerator(1, fieldlength);
} else if (fieldlengthdistribution.compareTo("zipfian") == 0) {
fieldlengthgenerator = new ZipfianGenerator(1, fieldlength);
} else if (fieldlengthdistribution.compareTo("histogram") == 0) {
try {
fieldlengthgenerator = new HistogramGenerator(fieldlengthhistogram);
} catch (IOException e) {
throw new WorkloadException(
"Couldn't read field length histogram file: " + fieldlengthhistogram, e);
}
} else {
throw new WorkloadException(
"Unknown field length distribution \"" + fieldlengthdistribution + "\"");
}
return fieldlengthgenerator;
}
/**
* Initialize the scenario.
* Called once, in the main client thread, before any operations are started.
*/
public void init(Properties p) throws WorkloadException {
table = p.getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT);
fieldcount =
Integer.parseInt(p.getProperty(FIELD_COUNT_PROPERTY, FIELD_COUNT_PROPERTY_DEFAULT));
fieldnames = new ArrayList<String>();
for (int i = 0; i < fieldcount; i++) {
fieldnames.add("field" + i);
}
fieldlengthgenerator = CoreWorkload.getFieldLengthGenerator(p);
double readproportion = Double.parseDouble(
p.getProperty(READ_PROPORTION_PROPERTY, READ_PROPORTION_PROPERTY_DEFAULT));
double updateproportion = Double.parseDouble(
p.getProperty(UPDATE_PROPORTION_PROPERTY, UPDATE_PROPORTION_PROPERTY_DEFAULT));
double insertproportion = Double.parseDouble(
p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT));
double scanproportion = Double.parseDouble(
p.getProperty(SCAN_PROPORTION_PROPERTY, SCAN_PROPORTION_PROPERTY_DEFAULT));
double readmodifywriteproportion = Double.parseDouble(p.getProperty(
READMODIFYWRITE_PROPORTION_PROPERTY, READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT));
recordcount =
Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT));
if (recordcount == 0) {
recordcount = Integer.MAX_VALUE;
}
String requestdistrib =
p.getProperty(REQUEST_DISTRIBUTION_PROPERTY, REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
int maxscanlength =
Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY, MAX_SCAN_LENGTH_PROPERTY_DEFAULT));
String scanlengthdistrib =
p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY, SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT);
int insertstart =
Integer.parseInt(p.getProperty(INSERT_START_PROPERTY, INSERT_START_PROPERTY_DEFAULT));
readallfields = Boolean.parseBoolean(
p.getProperty(READ_ALL_FIELDS_PROPERTY, READ_ALL_FIELDS_PROPERTY_DEFAULT));
writeallfields = Boolean.parseBoolean(
p.getProperty(WRITE_ALL_FIELDS_PROPERTY, WRITE_ALL_FIELDS_PROPERTY_DEFAULT));
dataintegrity = Boolean.parseBoolean(
p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT));
// Confirm that fieldlengthgenerator returns a constant if data
// integrity check requested.
if (dataintegrity
&& !(p.getProperty(
FIELD_LENGTH_DISTRIBUTION_PROPERTY,
FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) {
System.err.println("Must have constant field size to check data integrity.");
System.exit(-1);
}
if (p.getProperty(INSERT_ORDER_PROPERTY, INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed")
== 0) {
orderedinserts = false;
} else if (requestdistrib.compareTo("exponential") == 0) {
double percentile = Double.parseDouble(p.getProperty(
ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY,
ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT));
double frac = Double.parseDouble(p.getProperty(
ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY,
ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT));
keychooser = new ExponentialGenerator(percentile, recordcount * frac);
} else {
orderedinserts = true;
}
keysequence = new CounterGenerator(insertstart);
operationchooser = new DiscreteGenerator();
if (readproportion > 0) {
operationchooser.addValue(readproportion, "READ");
}
if (updateproportion > 0) {
operationchooser.addValue(updateproportion, "UPDATE");
}
if (insertproportion > 0) {
operationchooser.addValue(insertproportion, "INSERT");
}
if (scanproportion > 0) {
operationchooser.addValue(scanproportion, "SCAN");
}
if (readmodifywriteproportion > 0) {
operationchooser.addValue(readmodifywriteproportion, "READMODIFYWRITE");
}
transactioninsertkeysequence = new AcknowledgedCounterGenerator(recordcount);
if (requestdistrib.compareTo("uniform") == 0) {
keychooser = new UniformIntegerGenerator(0, recordcount - 1);
} else if (requestdistrib.compareTo("zipfian") == 0) {
// The generator produces a random "next key" in part by taking the modulus over the
// number of keys. If the number of keys changes, this would shift the modulus, and we
// don't want that to change which keys are popular. So we'll actually construct the
// scrambled zipfian generator with a keyspace that is larger than exists at the
// beginning of the test. That is, we'll predict the number of inserts, and tell the
// scrambled zipfian generator the number of existing keys plus the number of predicted
// keys as the total keyspace. Then, if the generator picks a key that hasn't been
// inserted yet, it will just ignore it and pick another key. This way, the size of the
// keyspace doesn't change from the perspective of the scrambled zipfian generator.
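// For example (illustrative numbers): with operationcount = 10000 and
// insertproportion = 0.05, expectednewkeys = 10000 * 0.05 * 2 = 1000, so the
// generator draws from a keyspace of recordcount + 1000 keys.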
int opcount = Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY));
int expectednewkeys = (int) (opcount * insertproportion * 2.0); // 2 is a fudge factor
keychooser = new ScrambledZipfianGenerator(recordcount + expectednewkeys);
} else if (requestdistrib.compareTo("latest") == 0) {
keychooser = new SkewedLatestGenerator(transactioninsertkeysequence);
} else if (requestdistrib.equals("hotspot")) {
double hotsetfraction =
Double.parseDouble(p.getProperty(HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT));
double hotopnfraction =
Double.parseDouble(p.getProperty(HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT));
keychooser = new HotspotIntegerGenerator(0, recordcount - 1, hotsetfraction, hotopnfraction);
} else {
throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\"");
}
fieldchooser = new UniformIntegerGenerator(0, fieldcount - 1);
if (scanlengthdistrib.compareTo("uniform") == 0) {
scanlength = new UniformIntegerGenerator(1, maxscanlength);
} else if (scanlengthdistrib.compareTo("zipfian") == 0) {
scanlength = new ZipfianGenerator(1, maxscanlength);
} else {
throw new WorkloadException(
"Distribution \"" + scanlengthdistrib + "\" not allowed for scan length");
}
insertionRetryLimit = Integer.parseInt(p.getProperty(
INSERTION_RETRY_LIMIT, INSERTION_RETRY_LIMIT_DEFAULT));
insertionRetryInterval = Integer.parseInt(p.getProperty(
INSERTION_RETRY_INTERVAL, INSERTION_RETRY_INTERVAL_DEFAULT));
}
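/**
* Constructs the record key for a key number. When the insert order is hashed, the key
* number is hashed first so that consecutive key numbers map to scattered keys.
*/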
public String buildKeyName(long keynum) {
if (!orderedinserts) {
keynum = Utils.hash(keynum);
}
return "user" + keynum;
}
/**
* Builds a value for a randomly chosen field.
*/
private HashMap<String, ByteIterator> buildSingleValue(String key) {
HashMap<String, ByteIterator> value = new HashMap<String, ByteIterator>();
String fieldkey = fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
ByteIterator data;
if (dataintegrity) {
data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
} else {
// fill with random data
data = new RandomByteIterator(fieldlengthgenerator.nextInt());
}
value.put(fieldkey, data);
return value;
}
/**
* Builds values for all fields.
*/
private HashMap<String, ByteIterator> buildValues(String key) {
HashMap<String, ByteIterator> values = new HashMap<String, ByteIterator>();
for (String fieldkey : fieldnames) {
ByteIterator data;
if (dataintegrity) {
data = new StringByteIterator(buildDeterministicValue(key, fieldkey));
} else {
// fill with random data
data = new RandomByteIterator(fieldlengthgenerator.nextInt());
}
values.put(fieldkey, data);
}
return values;
}
@@ -525,244 +561,238 @@ public class CoreWorkload
return sb.toString();
}
/**
* Do one insert operation. Because it will be called concurrently from multiple client threads,
* this function must be thread safe. However, avoid synchronized, or the threads will block waiting
* for each other, and it will be difficult to reach the target throughput. Ideally, this function would
* have no side effects other than DB operations.
*/
public boolean doInsert(DB db, Object threadstate) {
int keynum = keysequence.nextInt();
String dbkey = buildKeyName(keynum);
HashMap<String, ByteIterator> values = buildValues(dbkey);
Status status;
int numOfRetries = 0;
do {
status = db.insert(table, dbkey, values);
if (status == Status.OK) {
break;
}
// Retry if configured. Without retrying, the load process will fail
// even if one single insertion fails. User can optionally configure
// an insertion retry limit (default is 0) to enable retry.
if (++numOfRetries <= insertionRetryLimit) {
System.err.println("Retrying insertion, retry count: " + numOfRetries);
try {
// Sleep for a random number between [0.8, 1.2)*insertionRetryInterval.
int sleepTime = (int) (1000 * insertionRetryInterval * (0.8 + 0.4 * Math.random()));
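// For example, with the default insertionRetryInterval of 3 seconds, sleepTime
// falls in the range [2400, 3600) milliseconds.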
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
break;
}
} else {
System.err.println("Error inserting, not retrying any more. number of attempts: " + numOfRetries +
"Insertion Retry Limit: " + insertionRetryLimit);
break;
}
} while (true);
return (status == Status.OK);
}
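// Usage sketch (assuming the standard YCSB launcher; values are illustrative):
//   ./bin/ycsb load basic -P workloads/workloada -p core_workload_insertion_retry_limit=3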
/**
* Do one transaction operation. Because it will be called concurrently from multiple client
* threads, this function must be thread safe. However, avoid synchronized, or the threads will block waiting
* for each other, and it will be difficult to reach the target throughput. Ideally, this function would
* have no side effects other than DB operations.
*/
public boolean doTransaction(DB db, Object threadstate) {
String op = operationchooser.nextString();
if (op.compareTo("READ") == 0) {
doTransactionRead(db);
} else if (op.compareTo("UPDATE") == 0) {
doTransactionUpdate(db);
} else if (op.compareTo("INSERT") == 0) {
doTransactionInsert(db);
} else if (op.compareTo("SCAN") == 0) {
doTransactionScan(db);
} else {
doTransactionReadModifyWrite(db);
}
return true;
}
/**
* Results are reported in the first three buckets of the histogram under
* the label "VERIFY".
* the label "VERIFY".
* Bucket 0 means the expected data was returned.
* Bucket 1 means incorrect data was returned.
* Bucket 2 means null data was returned when some data was expected.
* Bucket 2 means null data was returned when some data was expected.
*/
protected void verifyRow(String key, HashMap<String,ByteIterator> cells) {
protected void verifyRow(String key, HashMap<String, ByteIterator> cells) {
Status verifyStatus = Status.OK;
long startTime = System.nanoTime();
if (!cells.isEmpty()) {
for (Map.Entry<String, ByteIterator> entry : cells.entrySet()) {
if (!entry.getValue().toString().equals(buildDeterministicValue(key, entry.getKey()))) {
verifyStatus = Status.UNEXPECTED_STATE;
break;
}
}
} else {
// This assumes that null data is never valid
verifyStatus = Status.ERROR;
}
long endTime = System.nanoTime();
_measurements.measure("VERIFY", (int) (endTime - startTime) / 1000);
_measurements.reportStatus("VERIFY",verifyStatus);
_measurements.reportStatus("VERIFY", verifyStatus);
}
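/**
* Picks the key number for the next transaction, skipping keys whose insertion has not
* yet been acknowledged so that operations never target records that may not exist.
*/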
int nextKeynum() {
int keynum;
if (keychooser instanceof ExponentialGenerator) {
do {
keynum = transactioninsertkeysequence.lastInt() - keychooser.nextInt();
} while (keynum < 0);
} else {
do {
keynum = keychooser.nextInt();
} while (keynum > transactioninsertkeysequence.lastInt());
}
return keynum;
}
public void doTransactionRead(DB db) {
// choose a random key
int keynum = nextKeynum();
String keyname = buildKeyName(keynum);
HashSet<String> fields = null;
if (!readallfields) {
// read a random field
String fieldname = fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
fields = new HashSet<String>();
fields.add(fieldname);
} else if (dataintegrity) {
// pass the full field list if dataintegrity is on for verification
fields = new HashSet<String>(fieldnames);
}
HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
db.read(table, keyname, fields, cells);
if (dataintegrity) {
verifyRow(keyname, cells);
}
}
public void doTransactionReadModifyWrite(DB db) {
// choose a random key
int keynum = nextKeynum();
String keyname = buildKeyName(keynum);
HashSet<String> fields = null;
if (!readallfields) {
// read a random field
String fieldname = fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
fields = new HashSet<String>();
fields.add(fieldname);
}
HashMap<String, ByteIterator> values;
if (writeallfields) {
// new data for all the fields
values = buildValues(keyname);
} else {
// update a random field
values = buildSingleValue(keyname);
}
// do the transaction
HashMap<String, ByteIterator> cells = new HashMap<String, ByteIterator>();
long ist = _measurements.getIntendedtartTimeNs();
long st = System.nanoTime();
db.read(table, keyname, fields, cells);
db.update(table, keyname, values);
long en = System.nanoTime();
if (dataintegrity) {
verifyRow(keyname, cells);
}
_measurements.measure("READ-MODIFY-WRITE", (int) ((en - st) / 1000));
_measurements.measureIntended("READ-MODIFY-WRITE", (int) ((en - ist) / 1000));
}
public void doTransactionScan(DB db) {
// choose a random key
int keynum = nextKeynum();
String startkeyname = buildKeyName(keynum);
// choose a random scan length
int len = scanlength.nextInt();
HashSet<String> fields = null;
if (!readallfields) {
// read a random field
String fieldname = fieldnames.get(Integer.parseInt(fieldchooser.nextString()));
fields = new HashSet<String>();
fields.add(fieldname);
}
db.scan(table, startkeyname, len, fields, new Vector<HashMap<String, ByteIterator>>());
}
public void doTransactionUpdate(DB db) {
// choose a random key
int keynum = nextKeynum();
String keyname = buildKeyName(keynum);
HashMap<String, ByteIterator> values;
if (writeallfields) {
// new data for all the fields
values = buildValues(keyname);
} else {
// update a random field
values = buildSingleValue(keyname);
}
db.update(table, keyname, values);
}
public void doTransactionInsert(DB db) {
// choose the next key
int keynum = transactioninsertkeysequence.nextInt();
try {
String dbkey = buildKeyName(keynum);
HashMap<String, ByteIterator> values = buildValues(dbkey);
db.insert(table, dbkey, values);
} finally {
transactioninsertkeysequence.acknowledge(keynum);
}
}
}
@@ -156,3 +156,16 @@ timeseries.granularity=1000
# property.
# reportlatencyforeacherror=false
# latencytrackederrors="<comma separated strings of error codes>"
# Insertion error retry for the core workload.
#
# By default, the YCSB core workload does not retry any operations, so if a
# single insertion fails during the load process, the entire load is
# terminated.
# If a user desires to have more robust behavior during this phase, they can
# enable retry for insertion by setting the following property to a positive
# number.
# core_workload_insertion_retry_limit = 0
#
# The following number controls the interval between retries (in seconds):
# core_workload_insertion_retry_interval = 3
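#
# For example, to retry each failed insertion up to 5 times, waiting roughly
# 10 seconds between attempts (illustrative values, not the defaults):
# core_workload_insertion_retry_limit = 5
# core_workload_insertion_retry_interval = 10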