Skip to content
Snippets Groups Projects
Commit bf13e815 authored by Robert J. Moore's avatar Robert J. Moore
Browse files

[client] Update the client status thread to detect the

client (worker) thread completing via a CountDownLatch.

Fixes #316
parent 2c83327f
No related branches found
No related tags found
No related merge requests found
/** /**
* Copyright (c) 2010 Yahoo! Inc. All rights reserved. * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); you * Licensed under the Apache License, Version 2.0 (the "License"); you
...@@ -24,10 +24,12 @@ import java.io.IOException; ...@@ -24,10 +24,12 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.text.DecimalFormat; import java.text.DecimalFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.List;
import java.util.Properties; import java.util.Properties;
import java.util.Vector; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
...@@ -45,18 +47,32 @@ import com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter; ...@@ -45,18 +47,32 @@ import com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter;
*/ */
class StatusThread extends Thread class StatusThread extends Thread
{ {
Vector<Thread> _threads; /** Counts down each of the clients completing. */
String _label; private final CountDownLatch _completeLatch;
boolean _standardstatus;
/** The clients that are running. */
private final List<ClientThread> _clients;
private final String _label;
private final boolean _standardstatus;
/** The interval for reporting status. */
private long _sleeptimeNs;
/** /**
* The interval for reporting status. * Creates a new StatusThread.
*
* @param completeLatch The latch that each client thread will {@link CountDownLatch#countDown()} as they complete.
* @param clients The clients to collect metrics from.
* @param label The label for the status.
* @param standardstatus If true the status is printed to stdout in addition to stderr.
* @param statusIntervalSeconds The number of seconds between status updates.
*/ */
long _sleeptimeNs; public StatusThread(CountDownLatch completeLatch, List<ClientThread> clients,
String label, boolean standardstatus, int statusIntervalSeconds)
public StatusThread(Vector<Thread> threads, String label, boolean standardstatus, int statusIntervalSeconds)
{ {
_threads=threads; _completeLatch=completeLatch;
_clients=clients;
_label=label; _label=label;
_standardstatus=standardstatus; _standardstatus=standardstatus;
_sleeptimeNs=TimeUnit.SECONDS.toNanos(statusIntervalSeconds); _sleeptimeNs=TimeUnit.SECONDS.toNanos(statusIntervalSeconds);
...@@ -67,66 +83,105 @@ class StatusThread extends Thread ...@@ -67,66 +83,105 @@ class StatusThread extends Thread
*/ */
public void run() public void run()
{ {
final long st=System.currentTimeMillis(); final long startTimeMs=System.currentTimeMillis();
final long startTimeNanos = System.nanoTime(); final long startTimeNanos = System.nanoTime();
long deadline = startTimeNanos + _sleeptimeNs; long deadline = startTimeNanos + _sleeptimeNs;
long lasten=st; long startIntervalMs=startTimeMs;
long lasttotalops=0; long lastTotalOps=0;
boolean alldone; boolean alldone;
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
do do
{ {
alldone=true; long nowMs=System.currentTimeMillis();
int totalops=0; lastTotalOps = computeStats(startTimeMs, startIntervalMs, nowMs, lastTotalOps);
//terminate this thread when all the worker threads are done alldone = waitForClientsUntil(deadline);
for (Thread t : _threads)
{ startIntervalMs=nowMs;
if (t.getState()!=Thread.State.TERMINATED) deadline+=_sleeptimeNs;
{ }
alldone=false; while (!alldone);
}
// Print the final stats.
computeStats(startTimeMs, startIntervalMs, System.currentTimeMillis(), lastTotalOps);
}
ClientThread ct=(ClientThread)t; /**
totalops+=ct.getOpsDone(); * Computes and prints the stats.
} *
* @param startTimeMs The start time of the test.
* @param startIntervalMs The start time of this interval.
* @param endIntervalMs The end time (now) for the interval.
* @param lastTotalOps The last total operations count.
*
* @return The current operation count.
*/
private long computeStats(final long startTimeMs, long startIntervalMs, long endIntervalMs,
long lastTotalOps) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
long en=System.currentTimeMillis(); long totalops=0;
long interval=en-st; // Calculate the total number of operations completed.
//double throughput=1000.0*((double)totalops)/((double)interval); for (ClientThread t : _clients)
{
totalops+=t.getOpsDone();
}
double curthroughput=1000.0*(((double)(totalops-lasttotalops))/((double)(en-lasten)));
lasttotalops=totalops;
lasten=en;
DecimalFormat d = new DecimalFormat("#.##");
String label = _label + format.format(new Date());
StringBuilder msg = new StringBuilder(label).append(" ").append(interval/1000).append(" sec: "); long interval=endIntervalMs-startTimeMs;
msg.append(totalops).append(" operations; "); double curthroughput=1000.0*(((double)(totalops-lastTotalOps))/((double)(endIntervalMs-startIntervalMs)));
DecimalFormat d = new DecimalFormat("#.##");
String label = _label + format.format(new Date());
if (totalops != 0) { StringBuilder msg = new StringBuilder(label).append(" ").append(interval/1000).append(" sec: ");
msg.append(d.format(curthroughput)).append(" current ops/sec; "); msg.append(totalops).append(" operations; ");
}
msg.append(Measurements.getMeasurements().getSummary()); if (totalops != 0) {
msg.append(d.format(curthroughput)).append(" current ops/sec; ");
}
System.err.println(msg); msg.append(Measurements.getMeasurements().getSummary());
if (_standardstatus) { System.err.println(msg);
System.out.println(msg);
}
ClientThread.sleepUntil(deadline); if (_standardstatus) {
deadline+=_sleeptimeNs; System.out.println(msg);
} }
while (!alldone); return totalops;
} }
/**
* Waits for all of the client to finish or the deadline to expire.
*
* @param deadline The current deadline.
*
* @return True if all of the clients completed.
*/
private boolean waitForClientsUntil(long deadline) {
boolean alldone=false;
long now=System.nanoTime();
while( !alldone && now < deadline ) {
try {
alldone = _completeLatch.await(deadline-now, TimeUnit.NANOSECONDS);
}
catch( InterruptedException ie) {
// If we are interrupted the thread is being asked to shutdown.
// Return true to indicate that and reset the interrupt state
// of the thread.
Thread.currentThread().interrupt();
alldone=true;
}
now=System.nanoTime();
}
return alldone;
}
} }
/** /**
...@@ -137,6 +192,9 @@ class StatusThread extends Thread ...@@ -137,6 +192,9 @@ class StatusThread extends Thread
*/ */
class ClientThread extends Thread class ClientThread extends Thread
{ {
/** Counts down each of the clients completing. */
private final CountDownLatch _completeLatch;
private static boolean _spinSleep; private static boolean _spinSleep;
DB _db; DB _db;
boolean _dotransactions; boolean _dotransactions;
...@@ -149,8 +207,8 @@ class ClientThread extends Thread ...@@ -149,8 +207,8 @@ class ClientThread extends Thread
int _threadcount; int _threadcount;
Object _workloadstate; Object _workloadstate;
Properties _props; Properties _props;
long _targetOpsTickNs; long _targetOpsTickNs;
final Measurements _measurements; final Measurements _measurements;
/** /**
* Constructor. * Constructor.
...@@ -158,15 +216,13 @@ class ClientThread extends Thread ...@@ -158,15 +216,13 @@ class ClientThread extends Thread
* @param db the DB implementation to use * @param db the DB implementation to use
* @param dotransactions true to do transactions, false to insert data * @param dotransactions true to do transactions, false to insert data
* @param workload the workload to use * @param workload the workload to use
* @param threadid the id of this thread
* @param threadcount the total number of threads
* @param props the properties defining the experiment * @param props the properties defining the experiment
* @param opcount the number of operations (transactions or inserts) to do * @param opcount the number of operations (transactions or inserts) to do
* @param targetperthreadperms target number of operations per thread per ms * @param targetperthreadperms target number of operations per thread per ms
* @param completeLatch The latch tracking the completion of all clients.
*/ */
public ClientThread(DB db, boolean dotransactions, Workload workload, int threadid, int threadcount, Properties props, int opcount, double targetperthreadperms) public ClientThread(DB db, boolean dotransactions, Workload workload, Properties props, int opcount, double targetperthreadperms, CountDownLatch completeLatch)
{ {
//TODO: consider removing threadcount and threadid
_db=db; _db=db;
_dotransactions=dotransactions; _dotransactions=dotransactions;
_workload=workload; _workload=workload;
...@@ -176,11 +232,10 @@ class ClientThread extends Thread ...@@ -176,11 +232,10 @@ class ClientThread extends Thread
_targetOpsPerMs=targetperthreadperms; _targetOpsPerMs=targetperthreadperms;
_targetOpsTickNs=(long)(1000000/_targetOpsPerMs); _targetOpsTickNs=(long)(1000000/_targetOpsPerMs);
} }
_threadid=threadid;
_threadcount=threadcount;
_props=props; _props=props;
_measurements = Measurements.getMeasurements(); _measurements = Measurements.getMeasurements();
_spinSleep = Boolean.valueOf(_props.getProperty("spin.sleep", "false")); _spinSleep = Boolean.valueOf(_props.getProperty("spin.sleep", "false"));
_completeLatch=completeLatch;
} }
public int getOpsDone() public int getOpsDone()
...@@ -217,10 +272,10 @@ class ClientThread extends Thread ...@@ -217,10 +272,10 @@ class ClientThread extends Thread
//spread the thread operations out so they don't all hit the DB at the same time //spread the thread operations out so they don't all hit the DB at the same time
// GH issue 4 - throws exception if _target>1 because random.nextInt argument must be >0 // GH issue 4 - throws exception if _target>1 because random.nextInt argument must be >0
// and the sleep() doesn't make sense for granularities < 1 ms anyway // and the sleep() doesn't make sense for granularities < 1 ms anyway
if ((_targetOpsPerMs > 0) && (_targetOpsPerMs <= 1.0)) if ((_targetOpsPerMs > 0) && (_targetOpsPerMs <= 1.0))
{ {
long randomMinorDelay = Utils.random().nextInt((int) _targetOpsTickNs); long randomMinorDelay = Utils.random().nextInt((int) _targetOpsTickNs);
sleepUntil(System.nanoTime() + randomMinorDelay); sleepUntil(System.nanoTime() + randomMinorDelay);
} }
try try
...@@ -278,6 +333,10 @@ class ClientThread extends Thread ...@@ -278,6 +333,10 @@ class ClientThread extends Thread
e.printStackTrace(System.out); e.printStackTrace(System.out);
return; return;
} }
finally
{
_completeLatch.countDown();
}
} }
static void sleepUntil(long deadline) { static void sleepUntil(long deadline) {
...@@ -727,8 +786,9 @@ public class Client ...@@ -727,8 +786,9 @@ public class Client
opcount=Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT)); opcount=Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT));
} }
} }
Vector<Thread> threads=new Vector<Thread>();
CountDownLatch completeLatch=new CountDownLatch(threadcount);
final List<ClientThread> clients=new ArrayList<ClientThread>(threadcount);
for (int threadid=0; threadid<threadcount; threadid++) for (int threadid=0; threadid<threadcount; threadid++)
{ {
DB db=null; DB db=null;
...@@ -751,10 +811,9 @@ public class Client ...@@ -751,10 +811,9 @@ public class Client
++threadopcount; ++threadopcount;
} }
Thread t=new ClientThread(db,dotransactions,workload,threadid,threadcount,props,threadopcount, targetperthreadperms); ClientThread t=new ClientThread(db,dotransactions,workload,props,threadopcount, targetperthreadperms, completeLatch);
threads.add(t); clients.add(t);
//t.start();
} }
StatusThread statusthread=null; StatusThread statusthread=null;
...@@ -767,13 +826,13 @@ public class Client ...@@ -767,13 +826,13 @@ public class Client
standardstatus=true; standardstatus=true;
} }
int statusIntervalSeconds = Integer.parseInt(props.getProperty("status.interval","10")); int statusIntervalSeconds = Integer.parseInt(props.getProperty("status.interval","10"));
statusthread=new StatusThread(threads,label,standardstatus,statusIntervalSeconds); statusthread=new StatusThread(completeLatch,clients,label,standardstatus,statusIntervalSeconds);
statusthread.start(); statusthread.start();
} }
long st=System.currentTimeMillis(); long st=System.currentTimeMillis();
for (Thread t : threads) for (Thread t : clients)
{ {
t.start(); t.start();
} }
...@@ -781,13 +840,13 @@ public class Client ...@@ -781,13 +840,13 @@ public class Client
Thread terminator = null; Thread terminator = null;
if (maxExecutionTime > 0) { if (maxExecutionTime > 0) {
terminator = new TerminatorThread(maxExecutionTime, threads, workload); terminator = new TerminatorThread(maxExecutionTime, clients, workload);
terminator.start(); terminator.start();
} }
int opsDone = 0; int opsDone = 0;
for (Thread t : threads) for (Thread t : clients)
{ {
try try
{ {
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
*/ */
package com.yahoo.ycsb; package com.yahoo.ycsb;
import java.util.List;
import java.util.Vector; import java.util.Vector;
/** /**
...@@ -29,12 +30,12 @@ import java.util.Vector; ...@@ -29,12 +30,12 @@ import java.util.Vector;
*/ */
public class TerminatorThread extends Thread { public class TerminatorThread extends Thread {
private Vector<Thread> threads; private final List<? extends Thread> threads;
private long maxExecutionTime; private long maxExecutionTime;
private Workload workload; private Workload workload;
private long waitTimeOutInMS; private long waitTimeOutInMS;
public TerminatorThread(long maxExecutionTime, Vector<Thread> threads, public TerminatorThread(long maxExecutionTime, List<? extends Thread> threads,
Workload workload) { Workload workload) {
this.maxExecutionTime = maxExecutionTime; this.maxExecutionTime = maxExecutionTime;
this.threads = threads; this.threads = threads;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment