Skip to content
Snippets Groups Projects
Commit 5b53485a authored by Sean Busbey's avatar Sean Busbey
Browse files

[core] Merge pull request #674 from danielpoltx/splitrun

[core] Parallel run and string sort order
parents 4c142765 cd5e9d6f
No related branches found
No related tags found
No related merge requests found
/**
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
/**
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Properties;
/**
* One experiment scenario. One object of this type will
......@@ -27,86 +28,86 @@ import java.util.concurrent.atomic.AtomicBoolean;
* load it dynamically. Any argument-based initialization should be
* done by init().
*
* If you extend this class, you should support the "insertstart" property. This
* allows the load phase to proceed from multiple clients on different machines, in case
* If you extend this class, you should support the "insertstart" property. This
* allows the Client to proceed from multiple clients on different machines, in case
* the client is the bottleneck. For example, if we want to load 1 million records from
* 2 machines, the first machine should have insertstart=0 and the second insertstart=500000. Additionally,
* the "insertcount" property, which is interpreted by Client, can be used to tell each instance of the
* client how many inserts to do. In the example above, both clients should have insertcount=500000.
*/
public abstract class Workload
{
public static final String INSERT_START_PROPERTY="insertstart";
public static final String INSERT_START_PROPERTY_DEFAULT="0";
private volatile AtomicBoolean stopRequested = new AtomicBoolean(false);
/**
* Initialize the scenario. Create any generators and other shared objects here.
* Called once, in the main client thread, before any operations are started.
*/
public void init(Properties p) throws WorkloadException
{
}
public abstract class Workload {
public static final String INSERT_START_PROPERTY = "insertstart";
public static final String INSERT_COUNT_PROPERTY = "insertcount";
public static final String INSERT_START_PROPERTY_DEFAULT = "0";
private volatile AtomicBoolean stopRequested = new AtomicBoolean(false);
/**
* Initialize the scenario. Create any generators and other shared objects here.
* Called once, in the main client thread, before any operations are started.
*/
public void init(Properties p) throws WorkloadException {
}
/**
* Initialize any state for a particular client thread. Since the scenario object
* will be shared among all threads, this is the place to create any state that is specific
* to one thread. To be clear, this means the returned object should be created anew on each
* call to initThread(); do not return the same object multiple times.
* The returned object will be passed to invocations of doInsert() and doTransaction()
* for this thread. There should be no side effects from this call; all state should be encapsulated
* in the returned object. If you have no state to retain for this thread, return null. (But if you have
* no state to retain for this thread, probably you don't need to override initThread().)
*
* @return false if the workload knows it is done for this thread. Client will terminate the thread. Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read traces from a file, return true when there are more to do, false when you are done.
*/
public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException
{
return null;
}
/**
* Cleanup the scenario. Called once, in the main client thread, after all operations have completed.
*/
public void cleanup() throws WorkloadException
{
}
/**
* Do one insert operation. Because it will be called concurrently from multiple client threads, this
* function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
* other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
* effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be
* synchronized, since each thread has its own threadstate instance.
*/
public abstract boolean doInsert(DB db, Object threadstate);
/**
* Initialize any state for a particular client thread. Since the scenario object
* will be shared among all threads, this is the place to create any state that is specific
* to one thread. To be clear, this means the returned object should be created anew on each
* call to initThread(); do not return the same object multiple times.
* The returned object will be passed to invocations of doInsert() and doTransaction()
* for this thread. There should be no side effects from this call; all state should be encapsulated
* in the returned object. If you have no state to retain for this thread, return null. (But if you have
* no state to retain for this thread, probably you don't need to override initThread().)
*
* @return false if the workload knows it is done for this thread. Client will terminate the thread.
* Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read
* traces from a file, return true when there are more to do, false when you are done.
*/
public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException {
return null;
}
/**
* Do one transaction operation. Because it will be called concurrently from multiple client threads, this
* function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
* other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
* effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be
* synchronized, since each thread has its own threadstate instance.
*
* @return false if the workload knows it is done for this thread. Client will terminate the thread. Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read traces from a file, return true when there are more to do, false when you are done.
*/
public abstract boolean doTransaction(DB db, Object threadstate);
/**
* Allows scheduling a request to stop the workload.
*/
public void requestStop() {
stopRequested.set(true);
}
/**
* Check the status of the stop request flag.
* @return true if stop was requested, false otherwise.
*/
public boolean isStopRequested() {
if (stopRequested.get() == true) return true;
else return false;
}
/**
* Cleanup the scenario. Called once, in the main client thread, after all operations have completed.
*/
public void cleanup() throws WorkloadException {
}
/**
* Do one insert operation. Because it will be called concurrently from multiple client threads, this
* function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
* other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
* effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be
* synchronized, since each thread has its own threadstate instance.
*/
public abstract boolean doInsert(DB db, Object threadstate);
/**
* Do one transaction operation. Because it will be called concurrently from multiple client threads, this
* function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
* other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
* effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be
* synchronized, since each thread has its own threadstate instance.
*
* @return false if the workload knows it is done for this thread. Client will terminate the thread.
* Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read
* traces from a file, return true when there are more to do, false when you are done.
*/
public abstract boolean doTransaction(DB db, Object threadstate);
/**
* Allows scheduling a request to stop the workload.
*/
public void requestStop() {
stopRequested.set(true);
}
/**
* Check the status of the stop request flag.
* @return true if stop was requested, false otherwise.
*/
public boolean isStopRequested() {
return stopRequested.get();
}
}
/**
* Copyright (c) 2016 YCSB Contributors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.generator;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Generates a sequence of integers 0, 1, ...
*/
public class SequentialGenerator extends NumberGenerator {
final AtomicInteger counter;
int _interval, _countstart;
/**
* Create a counter that starts at countstart.
*/
public SequentialGenerator(int countstart, int countend) {
counter = new AtomicInteger();
setLastValue(counter.get());
_countstart = countstart;
_interval = countend - countstart + 1;
}
/**
* If the generator returns numeric (integer) values, return the next value as an int.
* Default is to return -1, which is appropriate for generators that do not return numeric values.
*/
public int nextInt() {
int ret = _countstart + counter.getAndIncrement() % _interval;
setLastValue(ret);
return ret;
}
@Override
public Number nextValue() {
int ret = _countstart + counter.getAndIncrement() % _interval;
setLastValue(ret);
return ret;
}
@Override
public Number lastValue() {
return counter.get() + 1;
}
@Override
public double mean() {
throw new UnsupportedOperationException("Can't compute mean of non-stationary distribution!");
}
}
......@@ -21,26 +21,28 @@ import java.util.Properties;
import com.yahoo.ycsb.*;
import com.yahoo.ycsb.generator.AcknowledgedCounterGenerator;
import com.yahoo.ycsb.generator.ConstantIntegerGenerator;
import com.yahoo.ycsb.generator.CounterGenerator;
import com.yahoo.ycsb.generator.DiscreteGenerator;
import com.yahoo.ycsb.generator.ExponentialGenerator;
import com.yahoo.ycsb.generator.ConstantIntegerGenerator;
import com.yahoo.ycsb.generator.HotspotIntegerGenerator;
import com.yahoo.ycsb.generator.HistogramGenerator;
import com.yahoo.ycsb.generator.HotspotIntegerGenerator;
import com.yahoo.ycsb.generator.NumberGenerator;
import com.yahoo.ycsb.generator.ScrambledZipfianGenerator;
import com.yahoo.ycsb.generator.SequentialGenerator;
import com.yahoo.ycsb.generator.SkewedLatestGenerator;
import com.yahoo.ycsb.generator.UniformIntegerGenerator;
import com.yahoo.ycsb.generator.ZipfianGenerator;
import com.yahoo.ycsb.measurements.Measurements;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Vector;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.Vector;
/**
* The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The
......@@ -61,10 +63,19 @@ import java.util.ArrayList;
* <LI><b>readmodifywriteproportion</b>: what proportion of operations should be read a record,
* modify it, write it back (default: 0)
* <LI><b>requestdistribution</b>: what distribution should be used to select the records to operate
* on - uniform, zipfian, hotspot, or latest (default: uniform)
* on - uniform, zipfian, hotspot, sequential, exponential or latest (default: uniform)
* <LI><b>maxscanlength</b>: for scans, what is the maximum number of records to scan (default: 1000)
* <LI><b>scanlengthdistribution</b>: for scans, what distribution should be used to choose the
* number of records to scan, for each scan, between 1 and maxscanlength (default: uniform)
* <LI><b>insertstart</b>: for parallel loads and runs, defines the starting record for this
* YCSB instance (default: 0)
* <LI><b>insertcount</b>: for parallel loads and runs, defines the number of records for this
* YCSB instance (default: recordcount)
* <LI><b>zeropadding</b>: for generating a record sequence compatible with string sort order by
* 0 padding the record number. Controls the number of 0s to use for padding. (default: 1)
* For example for row 5, with zeropadding=1 you get 'user5' key and with zeropading=8 you get
* 'user00000005' key. In order to see its impact, zeropadding needs to be bigger than number of
* digits in the record number.
* <LI><b>insertorder</b>: should records be inserted in order by key ("ordered"), or in hashed
* order ("hashed") (default: hashed)
* </ul>
......@@ -99,11 +110,11 @@ public class CoreWorkload extends Workload {
/**
* The name of the property for the field length distribution. Options are "uniform", "zipfian"
* (favoring short records), "constant", and "histogram".
* (favouring short records), "constant", and "histogram".
*
* If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the
* fieldlength property. If "histogram", then the
* histogram will be read from the filename specified in the "fieldlengthhistogram" property.
* fieldlength property. If "histogram", then the histogram will be read from the filename
* specified in the "fieldlengthhistogram" property.
*/
public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY = "fieldlengthdistribution";
......@@ -240,12 +251,24 @@ public class CoreWorkload extends Workload {
public static final String REQUEST_DISTRIBUTION_PROPERTY = "requestdistribution";
/**
* The default distribution of requests across the keyspace
* The default distribution of requests across the keyspace.
*/
public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT = "uniform";
/**
* The name of the property for adding zero padding to record numbers in order to match
* string sort order. Controls the number of 0s to left pad with.
*/
public static final String ZERO_PADDING_PROPERTY = "zeropadding";
/**
* The name of the property for the max scan length (number of records)
* The default zero padding value. Matches integer sort order
*/
public static final String ZERO_PADDING_PROPERTY_DEFAULT = "1";
/**
* The name of the property for the max scan length (number of records).
*/
public static final String MAX_SCAN_LENGTH_PROPERTY = "maxscanlength";
......@@ -322,6 +345,7 @@ public class CoreWorkload extends Workload {
boolean orderedinserts;
int recordcount;
int zeropadding;
int insertionRetryLimit;
int insertionRetryInterval;
......@@ -374,8 +398,9 @@ public class CoreWorkload extends Workload {
recordcount =
Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT));
if (recordcount == 0)
if (recordcount == 0) {
recordcount = Integer.MAX_VALUE;
}
String requestdistrib =
p.getProperty(REQUEST_DISTRIBUTION_PROPERTY, REQUEST_DISTRIBUTION_PROPERTY_DEFAULT);
int maxscanlength =
......@@ -385,6 +410,16 @@ public class CoreWorkload extends Workload {
int insertstart =
Integer.parseInt(p.getProperty(INSERT_START_PROPERTY, INSERT_START_PROPERTY_DEFAULT));
int insertcount =
Integer.parseInt(p.getProperty(INSERT_COUNT_PROPERTY, String.valueOf(recordcount - insertstart)));
// Confirm valid values for insertstart and insertcount in relation to recordcount
if (recordcount < (insertstart + insertcount)) {
System.err.println("Invalid combination of insertstart, insertcount and recordcount.");
System.err.println("recordcount must be bigger than insertstart + insertcount.");
System.exit(-1);
}
zeropadding =
Integer.parseInt(p.getProperty(ZERO_PADDING_PROPERTY, ZERO_PADDING_PROPERTY_DEFAULT));
readallfields = Boolean.parseBoolean(
p.getProperty(READ_ALL_FIELDS_PROPERTY, READ_ALL_FIELDS_PROPERTY_DEFAULT));
......@@ -395,16 +430,14 @@ public class CoreWorkload extends Workload {
p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT));
// Confirm that fieldlengthgenerator returns a constant if data
// integrity check requested.
if (dataintegrity
&& !(p.getProperty(
FIELD_LENGTH_DISTRIBUTION_PROPERTY,
FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) {
if (dataintegrity && !(p.getProperty(
FIELD_LENGTH_DISTRIBUTION_PROPERTY,
FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) {
System.err.println("Must have constant field size to check data integrity.");
System.exit(-1);
}
if (p.getProperty(INSERT_ORDER_PROPERTY, INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed")
== 0) {
if (p.getProperty(INSERT_ORDER_PROPERTY, INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed") == 0) {
orderedinserts = false;
} else if (requestdistrib.compareTo("exponential") == 0) {
double percentile = Double.parseDouble(p.getProperty(
......@@ -423,8 +456,10 @@ public class CoreWorkload extends Workload {
transactioninsertkeysequence = new AcknowledgedCounterGenerator(recordcount);
if (requestdistrib.compareTo("uniform") == 0) {
keychooser = new UniformIntegerGenerator(0, recordcount - 1);
} else if (requestdistrib.compareTo("zipfian") == 0) {
keychooser = new UniformIntegerGenerator(insertstart, insertstart + insertcount - 1);
} else if (requestdistrib.compareTo("sequential") == 0) {
keychooser = new SequentialGenerator(insertstart, insertstart + insertcount - 1);
}else if (requestdistrib.compareTo("zipfian") == 0) {
// it does this by generating a random "next key" in part by taking the modulus over the
// number of keys.
// If the number of keys changes, this would shift the modulus, and we don't want that to
......@@ -439,7 +474,7 @@ public class CoreWorkload extends Workload {
int opcount = Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY));
int expectednewkeys = (int) ((opcount) * insertproportion * 2.0); // 2 is fudge factor
keychooser = new ScrambledZipfianGenerator(recordcount + expectednewkeys);
keychooser = new ScrambledZipfianGenerator(insertstart, insertstart + insertcount + expectednewkeys);
} else if (requestdistrib.compareTo("latest") == 0) {
keychooser = new SkewedLatestGenerator(transactioninsertkeysequence);
} else if (requestdistrib.equals("hotspot")) {
......@@ -447,7 +482,8 @@ public class CoreWorkload extends Workload {
Double.parseDouble(p.getProperty(HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT));
double hotopnfraction =
Double.parseDouble(p.getProperty(HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT));
keychooser = new HotspotIntegerGenerator(0, recordcount - 1, hotsetfraction, hotopnfraction);
keychooser = new HotspotIntegerGenerator(insertstart, insertstart + insertcount - 1,
hotsetfraction, hotopnfraction);
} else {
throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\"");
}
......@@ -465,7 +501,6 @@ public class CoreWorkload extends Workload {
insertionRetryLimit = Integer.parseInt(p.getProperty(
INSERTION_RETRY_LIMIT, INSERTION_RETRY_LIMIT_DEFAULT));
insertionRetryInterval = Integer.parseInt(p.getProperty(
INSERTION_RETRY_INTERVAL, INSERTION_RETRY_INTERVAL_DEFAULT));
}
......@@ -474,7 +509,13 @@ public class CoreWorkload extends Workload {
if (!orderedinserts) {
keynum = Utils.hash(keynum);
}
return "user" + keynum;
String value = Long.toString(keynum);
int fill = zeropadding - value.length();
String prekey = "user";
for(int i=0; i<fill; i++) {
prekey += '0';
}
return prekey + value;
}
/**
......@@ -585,21 +626,20 @@ public class CoreWorkload extends Workload {
@Override
public boolean doTransaction(DB db, Object threadstate) {
switch (operationchooser.nextString()) {
case "READ":
doTransactionRead(db);
break;
case "UPDATE":
doTransactionUpdate(db);
break;
case "INSERT":
doTransactionInsert(db);
break;
case "SCAN":
doTransactionScan(db);
break;
default:
doTransactionReadModifyWrite(db);
case "READ":
doTransactionRead(db);
break;
case "UPDATE":
doTransactionUpdate(db);
break;
case "INSERT":
doTransactionInsert(db);
break;
case "SCAN":
doTransactionScan(db);
break;
default:
doTransactionReadModifyWrite(db);
}
return true;
......@@ -820,4 +860,4 @@ public class CoreWorkload extends Workload {
}
return operationchooser;
}
}
\ No newline at end of file
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment