From 050fd4414f8f6061879cef480d5739c98100ce00 Mon Sep 17 00:00:00 2001
From: Enis Soztutar <enis@apache.org>
Date: Fri, 20 May 2016 16:13:22 -0700
Subject: [PATCH] [jdbc] Add batching in insertion in JDBC client bindings

---
 jdbc/README.md                                |   1 +
 .../java/com/yahoo/ycsb/db/JdbcDBClient.java  |  93 +++++++++++-----
 .../com/yahoo/ycsb/db/JdbcDBClientTest.java   | 102 ++++++++++++++----
 3 files changed, 152 insertions(+), 44 deletions(-)

diff --git a/jdbc/README.md b/jdbc/README.md
index 841375a8..f74ff54f 100644
--- a/jdbc/README.md
+++ b/jdbc/README.md
@@ -98,6 +98,7 @@ db.driver=com.mysql.jdbc.Driver				# The JDBC driver class to use.
 db.url=jdbc:mysql://127.0.0.1:3306/ycsb		# The Database connection URL.
 db.user=admin								# User name for the connection.
 db.passwd=admin								# Password for the connection.
+db.batchsize=1000             # The batch size for doing batched inserts. Batching is off by default; set to >0 to enable it.
 jdbc.fetchsize=10							# The JDBC fetch size hinted to the driver.
 jdbc.autocommit=true						# The JDBC connection auto-commit property for the driver.
 ```
diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java
index f6d5c3e2..80dd338e 100644
--- a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java
+++ b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java
@@ -1,18 +1,18 @@
 /**
  * Copyright (c) 2010 - 2016 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License"); you 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
  * may not use this file except in compliance with the License. You
- * may obtain a copy of the License at 
- * 
+ * may obtain a copy of the License at
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software 
- * distributed under the License is distributed on an "AS IS" BASIS, 
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  * implied. See the License for the specific language governing
- * permissions and limitations under the License. See accompanying 
- * LICENSE file. 
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
  */
 package com.yahoo.ycsb.db;
 
@@ -31,11 +31,11 @@ import java.util.concurrent.ConcurrentMap;
  * A class that wraps a JDBC compliant database to allow it to be interfaced
  * with YCSB. This class extends {@link DB} and implements the database
  * interface used by YCSB client.
- * 
+ *
  * <br>
  * Each client will have its own instance of this class. This client is not
  * thread safe.
- * 
+ *
  * <br>
  * This interface expects a schema <key> <field1> <field2> <field3> ... All
  * attributes are of type VARCHAR. All accesses are through the primary key.
@@ -55,6 +55,9 @@ public class JdbcDBClient extends DB {
   /** The password to use for establishing the connection. */
   public static final String CONNECTION_PASSWD = "db.passwd";
 
+  /** The batch size for batched inserts. Set to >0 to use batching. */
+  public static final String DB_BATCH_SIZE = "db.batchsize";
+
   /** The JDBC fetch size hinted to the driver. */
   public static final String JDBC_FETCH_SIZE = "jdbc.fetchsize";
 
@@ -79,9 +82,11 @@ public class JdbcDBClient extends DB {
   private ArrayList<Connection> conns;
   private boolean initialized = false;
   private Properties props;
-  private Integer jdbcFetchSize;
+  private int jdbcFetchSize;
+  private int batchSize;
   private static final String DEFAULT_PROP = "";
   private ConcurrentMap<StatementType, PreparedStatement> cachedStatements;
+  private long numRowsInBatch = 0;
 
   /**
    * Ordered field information for insert and update statements.
@@ -213,6 +218,20 @@ public class JdbcDBClient extends DB {
     }
   }
 
+  /** Returns the int value of {@code key}, or -1 if unset; throws DBException if the value is not an integer. */
+  private static int getIntProperty(Properties props, String key) throws DBException {
+    String valueStr = props.getProperty(key);
+    if (valueStr == null) {
+      return -1;
+    }
+    try {
+      return Integer.parseInt(valueStr.trim()); // parseInt rejects surrounding whitespace
+    } catch (NumberFormatException nfe) {
+      System.err.println("Invalid " + key + " specified: " + valueStr);
+      throw new DBException(nfe);
+    }
+  }
+
   @Override
   public void init() throws DBException {
     if (initialized) {
@@ -225,15 +244,8 @@ public class JdbcDBClient extends DB {
     String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP);
     String driver = props.getProperty(DRIVER_CLASS);
 
-    String jdbcFetchSizeStr = props.getProperty(JDBC_FETCH_SIZE);
-    if (jdbcFetchSizeStr != null) {
-      try {
-        this.jdbcFetchSize = Integer.parseInt(jdbcFetchSizeStr);
-      } catch (NumberFormatException nfe) {
-        System.err.println("Invalid JDBC fetch size specified: " + jdbcFetchSizeStr);
-        throw new DBException(nfe);
-      }
-    }
+    this.jdbcFetchSize = getIntProperty(props, JDBC_FETCH_SIZE);
+    this.batchSize = getIntProperty(props, DB_BATCH_SIZE);
 
     String autoCommitStr = props.getProperty(JDBC_AUTO_COMMIT, Boolean.TRUE.toString());
     Boolean autoCommit = Boolean.parseBoolean(autoCommitStr);
@@ -258,7 +270,7 @@ public class JdbcDBClient extends DB {
         conns.add(conn);
       }
 
-      System.out.println("Using " + shardCount + " shards");
+      System.out.println("Using shards: " + shardCount + ", batchSize:" + batchSize + ", fetchSize: " + jdbcFetchSize);
 
       cachedStatements = new ConcurrentHashMap<StatementType, PreparedStatement>();
     } catch (ClassNotFoundException e) {
@@ -276,6 +288,20 @@ public class JdbcDBClient extends DB {
 
   @Override
   public void cleanup() throws DBException {
+    if (batchSize > 0 && numRowsInBatch % batchSize != 0) {
+      try {
+        // Flush the partial batch. isClosed() goes first: getConnection() throws on a closed statement.
+        for (PreparedStatement st : cachedStatements.values()) {
+          if (!st.isClosed() && !st.getConnection().isClosed()) {
+            st.executeBatch();
+          }
+        }
+      } catch (SQLException e) {
+        System.err.println("Error in cleanup execution. " + e);
+        throw new DBException(e);
+      }
+    }
+
     try {
       cleanupAllConnections();
     } catch (SQLException e) {
@@ -363,7 +389,7 @@ public class JdbcDBClient extends DB {
     select.append(PRIMARY_KEY);
     select.append(" LIMIT ?");
     PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select.toString());
-    if (this.jdbcFetchSize != null) {
+    if (this.jdbcFetchSize > 0) {
       scanStatement.setFetchSize(this.jdbcFetchSize);
     }
     PreparedStatement stmt = cachedStatements.putIfAbsent(scanType, scanStatement);
@@ -474,7 +500,22 @@ public class JdbcDBClient extends DB {
       for (String value: fieldInfo.getFieldValues()) {
         insertStatement.setString(index++, value);
       }
-      int result = insertStatement.executeUpdate();
+      if (batchSize > 0) {
+        // Queue the row; it is sent to the database once batchSize rows have accumulated.
+        insertStatement.addBatch();
+        if (++numRowsInBatch % batchSize != 0) {
+          return Status.BATCHED_OK;
+        }
+        for (int r : insertStatement.executeBatch()) {
+          // SUCCESS_NO_INFO also means success: the driver applied the row but reports no count.
+          if (r != 1 && r != java.sql.Statement.SUCCESS_NO_INFO) {
+            return Status.ERROR;
+          }
+        }
+        return Status.OK;
+      }
+      // Batching is disabled: execute the insert immediately.
+      int result = insertStatement.executeUpdate();
       if (result == 1) {
         return Status.OK;
       }
@@ -520,4 +561,4 @@ public class JdbcDBClient extends DB {
 
     return new OrderedFieldInfo(fieldKeys, fieldValues);
   }
-}
\ No newline at end of file
+}
diff --git a/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java b/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java
index c3cc3024..6317f483 100644
--- a/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java
+++ b/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java
@@ -46,24 +46,29 @@ public class JdbcDBClientTest {
 
     @BeforeClass
     public static void setup() {
-        try {
-            jdbcConnection = DriverManager.getConnection(TEST_DB_URL);
-            jdbcDBClient = new JdbcDBClient();
-
-            Properties p = new Properties();
-            p.setProperty(JdbcDBClient.CONNECTION_URL, TEST_DB_URL);
-            p.setProperty(JdbcDBClient.DRIVER_CLASS, TEST_DB_DRIVER);
-            p.setProperty(JdbcDBClient.CONNECTION_USER, TEST_DB_USER);
+      setupWithBatch(1);
+    }
 
-            jdbcDBClient.setProperties(p);
-            jdbcDBClient.init();
-        } catch (SQLException e) {
-            e.printStackTrace();
-            fail("Could not create local Database");
-        } catch (DBException e) {
-            e.printStackTrace();
-            fail("Could not create JdbcDBClient instance");
-        }
+    public static void setupWithBatch(int batchSize) {
+      try {
+        jdbcConnection = DriverManager.getConnection(TEST_DB_URL);
+        jdbcDBClient = new JdbcDBClient();
+
+        Properties p = new Properties();
+        p.setProperty(JdbcDBClient.CONNECTION_URL, TEST_DB_URL);
+        p.setProperty(JdbcDBClient.DRIVER_CLASS, TEST_DB_DRIVER);
+        p.setProperty(JdbcDBClient.CONNECTION_USER, TEST_DB_USER);
+        p.setProperty(JdbcDBClient.DB_BATCH_SIZE, Integer.toString(batchSize));
+
+        jdbcDBClient.setProperties(p);
+        jdbcDBClient.init();
+      } catch (SQLException e) {
+        e.printStackTrace();
+        fail("Could not create local Database");
+      } catch (DBException e) {
+        e.printStackTrace();
+        fail("Could not create JdbcDBClient instance");
+      }
     }
 
     @AfterClass
@@ -75,7 +80,7 @@ public class JdbcDBClientTest {
         } catch (SQLException e) {
             e.printStackTrace();
         }
-        
+
         try {
             if (jdbcDBClient != null) {
                 jdbcDBClient.cleanup();
@@ -319,4 +324,65 @@ public class JdbcDBClientTest {
             testIndex++;
         }
     }
+
+    @Test
+    public void insertBatchTest() throws DBException {
+      insertBatchTest(20);
+    }
+
+    @Test
+    public void insertPartialBatchTest() throws DBException {
+      insertBatchTest(19);
+    }
+
+    public void insertBatchTest(int numRows) throws DBException {
+      teardown();
+      setupWithBatch(10);
+
+      try {
+        String insertKey = "user0";
+        HashMap<String, ByteIterator> insertMap = insertRow(insertKey);
+
+        ResultSet resultSet = jdbcConnection.prepareStatement(
+          String.format("SELECT * FROM %s", TABLE_NAME)
+            ).executeQuery();
+
+        // Check that there is no result row yet (because the batch is not full).
+        assertFalse(resultSet.next());
+
+        // Insert the remaining rows; this fills at least one complete batch.
+        for (int i = 1; i < numRows; i++) {
+          insertMap = insertRow("user" + i);
+        }
+
+        // Only rows from completely filled batches (multiples of 10) should be visible so far.
+        assertNumRows(10 * (numRows / 10));
+
+        // call cleanup, which should insert the partial batch
+        jdbcDBClient.cleanup();
+
+        // Check that we have all rows
+        assertNumRows(numRows);
+
+      } catch (SQLException e) {
+        e.printStackTrace();
+        fail("Failed insertBatchTest");
+      } finally {
+        teardown(); // for next tests
+        setup();
+      }
+    }
+
+    private void assertNumRows(long numRows) throws SQLException {
+      ResultSet resultSet = jdbcConnection.prepareStatement(
+        String.format("SELECT * FROM %s", TABLE_NAME)
+          ).executeQuery();
+
+      for (int i = 0; i < numRows; i++) {
+        assertTrue("expecting " + numRows + " results, received only " + i, resultSet.next());
+      }
+      assertFalse("expecting " + numRows + " results, received more", resultSet.next());
+
+      resultSet.close();
+    }
 }
-- 
GitLab