From bf13e8153bc28661e914d68a307abe0947f09b2b Mon Sep 17 00:00:00 2001
From: "Robert J. Moore" <Robert.J.Moore@allanbank.com>
Date: Wed, 15 Jul 2015 22:52:20 -0400
Subject: [PATCH] [client] Update the client status thread to detect the client
 (worker) thread completing via a CountDownLatch.

Fixes #316
---
 core/src/main/java/com/yahoo/ycsb/Client.java | 203 +++++++++++-------
 .../java/com/yahoo/ycsb/TerminatorThread.java |   5 +-
 2 files changed, 134 insertions(+), 74 deletions(-)

diff --git a/core/src/main/java/com/yahoo/ycsb/Client.java b/core/src/main/java/com/yahoo/ycsb/Client.java
index bcf33c2e..f2e71e3b 100644
--- a/core/src/main/java/com/yahoo/ycsb/Client.java
+++ b/core/src/main/java/com/yahoo/ycsb/Client.java
@@ -1,4 +1,4 @@
-/**                                                                                                                                                                                
+/**                                                                                                                                                                            
  * Copyright (c) 2010 Yahoo! Inc. All rights reserved.                                                                                                                             
  *                                                                                                                                                                                 
  * Licensed under the Apache License, Version 2.0 (the "License"); you                                                                                                             
@@ -24,10 +24,12 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.text.DecimalFormat;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.Enumeration;
+import java.util.List;
 import java.util.Properties;
-import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.LockSupport;
 
@@ -45,18 +47,32 @@ import com.yahoo.ycsb.measurements.exporter.TextMeasurementsExporter;
  */
 class StatusThread extends Thread
 {
-	Vector<Thread> _threads;
-	String _label;
-	boolean _standardstatus;
+    /** Counts down each of the clients completing. */
+    private final CountDownLatch _completeLatch;
+    
+    /** The clients that are running. */
+    private final List<ClientThread> _clients;
+    
+    private final String _label;
+    private final boolean _standardstatus;
 	
+	/** The interval for reporting status. */
+    private long _sleeptimeNs;
+
 	/**
-	 * The interval for reporting status.
+	 * Creates a new StatusThread.
+	 * 
+	 * @param completeLatch The latch that each client thread will {@link CountDownLatch#countDown()} as they complete.
+	 * @param clients The clients to collect metrics from.
+	 * @param label The label for the status.
+	 * @param standardstatus If true the status is printed to stdout in addition to stderr.
+	 * @param statusIntervalSeconds The number of seconds between status updates.
 	 */
-	long _sleeptimeNs;
-
-	public StatusThread(Vector<Thread> threads, String label, boolean standardstatus, int statusIntervalSeconds)
+	public StatusThread(CountDownLatch completeLatch, List<ClientThread> clients, 
+	        String label, boolean standardstatus, int statusIntervalSeconds)
 	{
-		_threads=threads;
+	    _completeLatch=completeLatch;
+	    _clients=clients;
 		_label=label;
 		_standardstatus=standardstatus;
 		_sleeptimeNs=TimeUnit.SECONDS.toNanos(statusIntervalSeconds);
@@ -67,66 +83,105 @@ class StatusThread extends Thread
 	 */
 	public void run()
 	{
-		final long st=System.currentTimeMillis();
+		final long startTimeMs=System.currentTimeMillis();
 		final long startTimeNanos = System.nanoTime();
 		long deadline = startTimeNanos + _sleeptimeNs;
-		long lasten=st;
-		long lasttotalops=0;
+		long startIntervalMs=startTimeMs;
+		long lastTotalOps=0;
 		
 		boolean alldone;
-		SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
 		
 		do 
 		{
-			alldone=true;
+            long nowMs=System.currentTimeMillis();
 
-			int totalops=0;
+            lastTotalOps = computeStats(startTimeMs, startIntervalMs, nowMs, lastTotalOps);
 
-			//terminate this thread when all the worker threads are done
-			for (Thread t : _threads)
-			{
-				if (t.getState()!=Thread.State.TERMINATED)
-				{
-					alldone=false;
-				}
+			alldone = waitForClientsUntil(deadline);
+			
+            startIntervalMs=nowMs;
+			deadline+=_sleeptimeNs;
+		}
+		while (!alldone);
+		
+		// Print the final stats.
+		computeStats(startTimeMs, startIntervalMs, System.currentTimeMillis(), lastTotalOps);
+	}
 
-				ClientThread ct=(ClientThread)t;
-				totalops+=ct.getOpsDone();
-			}
+    /**
+     * Computes and prints the stats.
+     * 
+     * @param startTimeMs The start time of the test.
+     * @param startIntervalMs The start time of this interval.
+     * @param endIntervalMs The end time (now) for the interval.
+     * @param lastTotalOps The last total operations count.
+     * 
+     * @return The current operation count.
+     */
+    private long computeStats(final long startTimeMs, long startIntervalMs, long endIntervalMs,
+            long lastTotalOps) {
+        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
 
-			long en=System.currentTimeMillis();
+        long totalops=0;
 
-			long interval=en-st;
-			//double throughput=1000.0*((double)totalops)/((double)interval);
+        // Calculate the total number of operations completed.
+        for (ClientThread t : _clients)
+        {
+        	totalops+=t.getOpsDone();
+        }
 
-			double curthroughput=1000.0*(((double)(totalops-lasttotalops))/((double)(en-lasten)));
-			
-			lasttotalops=totalops;
-			lasten=en;
-			
-			DecimalFormat d = new DecimalFormat("#.##");
-			String label = _label + format.format(new Date());
 
-			StringBuilder msg = new StringBuilder(label).append(" ").append(interval/1000).append(" sec: ");
-			msg.append(totalops).append(" operations; ");
+        long interval=endIntervalMs-startTimeMs;
+        double curthroughput=1000.0*(((double)(totalops-lastTotalOps))/((double)(endIntervalMs-startIntervalMs)));
+        
+        
+        DecimalFormat d = new DecimalFormat("#.##");
+        String label = _label + format.format(new Date());
 
-			if (totalops != 0) {
-				msg.append(d.format(curthroughput)).append(" current ops/sec; ");
-			}
+        StringBuilder msg = new StringBuilder(label).append(" ").append(interval/1000).append(" sec: ");
+        msg.append(totalops).append(" operations; ");
 
-			msg.append(Measurements.getMeasurements().getSummary());
+        if (totalops != 0) {
+        	msg.append(d.format(curthroughput)).append(" current ops/sec; ");
+        }
 
-			System.err.println(msg);
+        msg.append(Measurements.getMeasurements().getSummary());
 
-			if (_standardstatus) {
-				System.out.println(msg);
-			}
+        System.err.println(msg);
 
-			ClientThread.sleepUntil(deadline);
-			deadline+=_sleeptimeNs;
-		}
-		while (!alldone);
-	}
+        if (_standardstatus) {
+        	System.out.println(msg);
+        }
+        return totalops;
+    }
+
+    /**
+     * Waits for all of the client to finish or the deadline to expire.
+     * 
+     * @param deadline The current deadline.
+     * 
+     * @return True if all of the clients completed.
+     */
+    private boolean waitForClientsUntil(long deadline) {
+        boolean alldone=false;
+        long now=System.nanoTime();
+        
+        while( !alldone && now < deadline ) {
+            try {
+                alldone = _completeLatch.await(deadline-now, TimeUnit.NANOSECONDS);
+            } 
+            catch( InterruptedException ie) {
+                // If we are interrupted the thread is being asked to shutdown.
+                // Return true to indicate that and reset the interrupt state
+                // of the thread.
+                Thread.currentThread().interrupt();
+                alldone=true;
+            }
+            now=System.nanoTime();
+        }
+        
+        return alldone;
+    }
 }
 
 /**
@@ -137,6 +192,9 @@ class StatusThread extends Thread
  */
 class ClientThread extends Thread
 {
+    /** Counts down each of the clients completing. */
+    private final CountDownLatch _completeLatch;
+    
 	private static boolean _spinSleep;
     DB _db;
 	boolean _dotransactions;
@@ -149,8 +207,8 @@ class ClientThread extends Thread
 	int _threadcount;
 	Object _workloadstate;
 	Properties _props;
-    long _targetOpsTickNs;
-    final Measurements _measurements;
+	long _targetOpsTickNs;
+	final Measurements _measurements;
 
 	/**
 	 * Constructor.
@@ -158,15 +216,13 @@ class ClientThread extends Thread
 	 * @param db the DB implementation to use
 	 * @param dotransactions true to do transactions, false to insert data
 	 * @param workload the workload to use
-	 * @param threadid the id of this thread 
-	 * @param threadcount the total number of threads 
 	 * @param props the properties defining the experiment
 	 * @param opcount the number of operations (transactions or inserts) to do
 	 * @param targetperthreadperms target number of operations per thread per ms
+	 * @param completeLatch The latch tracking the completion of all clients.
 	 */
-	public ClientThread(DB db, boolean dotransactions, Workload workload, int threadid, int threadcount, Properties props, int opcount, double targetperthreadperms)
+	public ClientThread(DB db, boolean dotransactions, Workload workload, Properties props, int opcount, double targetperthreadperms, CountDownLatch completeLatch)
 	{
-		//TODO: consider removing threadcount and threadid
 		_db=db;
 		_dotransactions=dotransactions;
 		_workload=workload;
@@ -176,11 +232,10 @@ class ClientThread extends Thread
 		_targetOpsPerMs=targetperthreadperms;
 		_targetOpsTickNs=(long)(1000000/_targetOpsPerMs);
 		}
-		_threadid=threadid;
-		_threadcount=threadcount;
 		_props=props;
 		_measurements = Measurements.getMeasurements();
 		_spinSleep = Boolean.valueOf(_props.getProperty("spin.sleep", "false"));
+		_completeLatch=completeLatch;
 	}
 
 	public int getOpsDone()
@@ -217,10 +272,10 @@ class ClientThread extends Thread
 		
 		//spread the thread operations out so they don't all hit the DB at the same time
 		// GH issue 4 - throws exception if _target>1 because random.nextInt argument must be >0
-        // and the sleep() doesn't make sense for granularities < 1 ms anyway
-        if ((_targetOpsPerMs > 0) && (_targetOpsPerMs <= 1.0))
-        {
-            long randomMinorDelay = Utils.random().nextInt((int) _targetOpsTickNs);
+		// and the sleep() doesn't make sense for granularities < 1 ms anyway
+		if ((_targetOpsPerMs > 0) && (_targetOpsPerMs <= 1.0))
+		{
+			long randomMinorDelay = Utils.random().nextInt((int) _targetOpsTickNs);
             sleepUntil(System.nanoTime() + randomMinorDelay);
         }
 		try
@@ -278,6 +333,10 @@ class ClientThread extends Thread
 			e.printStackTrace(System.out);
 			return;
 		}
+		finally
+		{
+		    _completeLatch.countDown();
+		}
 	}
 
     static void sleepUntil(long deadline) {
@@ -727,8 +786,9 @@ public class Client
 				opcount=Integer.parseInt(props.getProperty(RECORD_COUNT_PROPERTY, DEFAULT_RECORD_COUNT));
 			}
 		}
-		Vector<Thread> threads=new Vector<Thread>();
-
+		
+		CountDownLatch completeLatch=new CountDownLatch(threadcount);
+		final List<ClientThread> clients=new ArrayList<ClientThread>(threadcount);
 		for (int threadid=0; threadid<threadcount; threadid++)
 		{
 			DB db=null;
@@ -751,10 +811,9 @@ public class Client
               ++threadopcount;
             }
 
-            Thread t=new ClientThread(db,dotransactions,workload,threadid,threadcount,props,threadopcount, targetperthreadperms);
+            ClientThread t=new ClientThread(db,dotransactions,workload,props,threadopcount, targetperthreadperms, completeLatch);
 
-			threads.add(t);
-			//t.start();
+			clients.add(t);
 		}
 
 		StatusThread statusthread=null;
@@ -767,13 +826,13 @@ public class Client
 				standardstatus=true;
 			}
 			int statusIntervalSeconds = Integer.parseInt(props.getProperty("status.interval","10"));
-			statusthread=new StatusThread(threads,label,standardstatus,statusIntervalSeconds);
+			statusthread=new StatusThread(completeLatch,clients,label,standardstatus,statusIntervalSeconds);
 			statusthread.start();
 		}
 
 		long st=System.currentTimeMillis();
 
-		for (Thread t : threads)
+		for (Thread t : clients)
 		{
 			t.start();
 		}
@@ -781,13 +840,13 @@ public class Client
     Thread terminator = null;
     
     if (maxExecutionTime > 0) {
-      terminator = new TerminatorThread(maxExecutionTime, threads, workload);
+      terminator = new TerminatorThread(maxExecutionTime, clients, workload);
       terminator.start();
     }
     
     int opsDone = 0;
 
-		for (Thread t : threads)
+		for (Thread t : clients)
 		{
 			try
 			{
diff --git a/core/src/main/java/com/yahoo/ycsb/TerminatorThread.java b/core/src/main/java/com/yahoo/ycsb/TerminatorThread.java
index 7d985f82..62212f2c 100644
--- a/core/src/main/java/com/yahoo/ycsb/TerminatorThread.java
+++ b/core/src/main/java/com/yahoo/ycsb/TerminatorThread.java
@@ -16,6 +16,7 @@
  */
 package com.yahoo.ycsb;
 
+import java.util.List;
 import java.util.Vector;
 
 /**
@@ -29,12 +30,12 @@ import java.util.Vector;
  */
 public class TerminatorThread extends Thread {
   
-  private Vector<Thread> threads;
+  private final List<? extends Thread> threads;
   private long maxExecutionTime;
   private Workload workload;
   private long waitTimeOutInMS;
   
-  public TerminatorThread(long maxExecutionTime, Vector<Thread> threads, 
+  public TerminatorThread(long maxExecutionTime, List<? extends Thread> threads, 
       Workload workload) {
     this.maxExecutionTime = maxExecutionTime;
     this.threads = threads;
-- 
GitLab