diff --git a/core/src/main/java/com/yahoo/ycsb/Status.java b/core/src/main/java/com/yahoo/ycsb/Status.java index bc32b86c65d273d76bb702b6ae27b22b4d2cd609..5e5b3a887d3f3bb6c871c950977ab2585c3fc28f 100644 --- a/core/src/main/java/com/yahoo/ycsb/Status.java +++ b/core/src/main/java/com/yahoo/ycsb/Status.java @@ -87,6 +87,6 @@ public class Status { public static final Status BAD_REQUEST = new Status("BAD_REQUEST", "The request was not valid."); public static final Status FORBIDDEN = new Status("FORBIDDEN", "The operation is forbidden."); public static final Status SERVICE_UNAVAILABLE = new Status("SERVICE_UNAVAILABLE", "Dependant service for the current binding is not available."); - + public static final Status BATCHED_OK = new Status("BATCHED_OK", "The operation has been batched by the binding to be executed later."); } diff --git a/jdbc/README.md b/jdbc/README.md index 841375a8a809a7cb6ee813dc9c9c8882e1845722..f74ff54f019a1d576205463ef97c4b4a7cf12688 100644 --- a/jdbc/README.md +++ b/jdbc/README.md @@ -98,6 +98,7 @@ db.driver=com.mysql.jdbc.Driver # The JDBC driver class to use. db.url=jdbc:mysql://127.0.0.1:3306/ycsb # The Database connection URL. db.user=admin # User name for the connection. db.passwd=admin # Password for the connection. +db.batchsize=1000 # The batch size for doing batched inserts. Defaults to 0. Set to >0 to use batching. jdbc.fetchsize=10 # The JDBC fetch size hinted to the driver. jdbc.autocommit=true # The JDBC connection auto-commit property for the driver. ``` diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java index f6d5c3e2426df8f8e85b03883023ec197c3632f1..80dd338ef221361bb918c44e21910ec934bd13bd 100644 --- a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java +++ b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java @@ -1,18 +1,18 @@ /** * Copyright (c) 2010 - 2016 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you + * + * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * + * may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. + * permissions and limitations under the License. See accompanying + * LICENSE file. */ package com.yahoo.ycsb.db; @@ -31,11 +31,11 @@ import java.util.concurrent.ConcurrentMap; * A class that wraps a JDBC compliant database to allow it to be interfaced * with YCSB. This class extends {@link DB} and implements the database * interface used by YCSB client. - * + * * <br> * Each client will have its own instance of this class. This client is not * thread safe. - * + * * <br> * This interface expects a schema <key> <field1> <field2> <field3> ... All * attributes are of type VARCHAR. All accesses are through the primary key. @@ -55,6 +55,9 @@ public class JdbcDBClient extends DB { /** The password to use for establishing the connection. */ public static final String CONNECTION_PASSWD = "db.passwd"; + /** The batch size for batched inserts. Set to >0 to use batching */ + public static final String DB_BATCH_SIZE = "db.batchsize"; + /** The JDBC fetch size hinted to the driver. */ public static final String JDBC_FETCH_SIZE = "jdbc.fetchsize"; @@ -79,9 +82,11 @@ public class JdbcDBClient extends DB { private ArrayList<Connection> conns; private boolean initialized = false; private Properties props; - private Integer jdbcFetchSize; + private int jdbcFetchSize; + private int batchSize; private static final String DEFAULT_PROP = ""; private ConcurrentMap<StatementType, PreparedStatement> cachedStatements; + private long numRowsInBatch = 0; /** * Ordered field information for insert and update statements. @@ -213,6 +218,20 @@ public class JdbcDBClient extends DB { } } + /** Returns parsed int value from the properties if set, otherwise returns -1. */ + private static int getIntProperty(Properties props, String key) throws DBException { + String valueStr = props.getProperty(key); + if (valueStr != null) { + try { + return Integer.parseInt(valueStr); + } catch (NumberFormatException nfe) { + System.err.println("Invalid " + key + " specified: " + valueStr); + throw new DBException(nfe); + } + } + return -1; + } + @Override public void init() throws DBException { if (initialized) { @@ -225,15 +244,8 @@ public class JdbcDBClient extends DB { String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP); String driver = props.getProperty(DRIVER_CLASS); - String jdbcFetchSizeStr = props.getProperty(JDBC_FETCH_SIZE); - if (jdbcFetchSizeStr != null) { - try { - this.jdbcFetchSize = Integer.parseInt(jdbcFetchSizeStr); - } catch (NumberFormatException nfe) { - System.err.println("Invalid JDBC fetch size specified: " + jdbcFetchSizeStr); - throw new DBException(nfe); - } - } + this.jdbcFetchSize = getIntProperty(props, JDBC_FETCH_SIZE); + this.batchSize = getIntProperty(props, DB_BATCH_SIZE); String autoCommitStr = props.getProperty(JDBC_AUTO_COMMIT, Boolean.TRUE.toString()); Boolean autoCommit = Boolean.parseBoolean(autoCommitStr); @@ -258,7 +270,7 @@ public class JdbcDBClient extends DB { conns.add(conn); } - System.out.println("Using " + shardCount + " shards"); + System.out.println("Using shards: " + shardCount + ", batchSize:" + batchSize + ", fetchSize: " + jdbcFetchSize); cachedStatements = new ConcurrentHashMap<StatementType, PreparedStatement>(); } catch (ClassNotFoundException e) { @@ -276,6 +288,20 @@ public class JdbcDBClient extends DB { @Override public void cleanup() throws DBException { + if (batchSize > 0) { + try { + // commit un-finished batches + for (PreparedStatement st : cachedStatements.values()) { + if (!st.getConnection().isClosed() && !st.isClosed() && (numRowsInBatch % batchSize != 0)) { + st.executeBatch(); + } + } + } catch (SQLException e) { + System.err.println("Error in cleanup execution. " + e); + throw new DBException(e); + } + } + try { cleanupAllConnections(); } catch (SQLException e) { @@ -363,7 +389,7 @@ public class JdbcDBClient extends DB { select.append(PRIMARY_KEY); select.append(" LIMIT ?"); PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select.toString()); - if (this.jdbcFetchSize != null) { + if (this.jdbcFetchSize > 0) { scanStatement.setFetchSize(this.jdbcFetchSize); } PreparedStatement stmt = cachedStatements.putIfAbsent(scanType, scanStatement); @@ -474,7 +500,22 @@ public class JdbcDBClient extends DB { for (String value: fieldInfo.getFieldValues()) { insertStatement.setString(index++, value); } - int result = insertStatement.executeUpdate(); + int result; + if (batchSize > 0) { + insertStatement.addBatch(); + if (++numRowsInBatch % batchSize == 0) { + int[] results = insertStatement.executeBatch(); + for (int r : results) { + if (r != 1) { + return Status.ERROR; + } + } + return Status.OK; + } + return Status.BATCHED_OK; + } else { + result = insertStatement.executeUpdate(); + } if (result == 1) { return Status.OK; } @@ -520,4 +561,4 @@ public class JdbcDBClient extends DB { return new OrderedFieldInfo(fieldKeys, fieldValues); } -} \ No newline at end of file +} diff --git a/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java b/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java index c3cc3024b547de02b2a96a8abb95f99831076dfc..6317f483c3f63aecf9cbf0ab216c23d473db2469 100644 --- a/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java +++ b/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java @@ -46,24 +46,29 @@ public class JdbcDBClientTest { @BeforeClass public static void setup() { - try { - jdbcConnection = DriverManager.getConnection(TEST_DB_URL); - jdbcDBClient = new JdbcDBClient(); - - Properties p = new Properties(); - p.setProperty(JdbcDBClient.CONNECTION_URL, TEST_DB_URL); - p.setProperty(JdbcDBClient.DRIVER_CLASS, TEST_DB_DRIVER); - p.setProperty(JdbcDBClient.CONNECTION_USER, TEST_DB_USER); + setupWithBatch(1); + } - jdbcDBClient.setProperties(p); - jdbcDBClient.init(); - } catch (SQLException e) { - e.printStackTrace(); - fail("Could not create local Database"); - } catch (DBException e) { - e.printStackTrace(); - fail("Could not create JdbcDBClient instance"); - } + public static void setupWithBatch(int batchSize) { + try { + jdbcConnection = DriverManager.getConnection(TEST_DB_URL); + jdbcDBClient = new JdbcDBClient(); + + Properties p = new Properties(); + p.setProperty(JdbcDBClient.CONNECTION_URL, TEST_DB_URL); + p.setProperty(JdbcDBClient.DRIVER_CLASS, TEST_DB_DRIVER); + p.setProperty(JdbcDBClient.CONNECTION_USER, TEST_DB_USER); + p.setProperty(JdbcDBClient.DB_BATCH_SIZE, Integer.toString(batchSize)); + + jdbcDBClient.setProperties(p); + jdbcDBClient.init(); + } catch (SQLException e) { + e.printStackTrace(); + fail("Could not create local Database"); + } catch (DBException e) { + e.printStackTrace(); + fail("Could not create JdbcDBClient instance"); + } } @AfterClass @@ -75,7 +80,7 @@ public class JdbcDBClientTest { } catch (SQLException e) { e.printStackTrace(); } - + try { if (jdbcDBClient != null) { jdbcDBClient.cleanup(); @@ -319,4 +324,65 @@ public class JdbcDBClientTest { testIndex++; } } + + @Test + public void insertBatchTest() throws DBException { + insertBatchTest(20); + } + + @Test + public void insertPartialBatchTest() throws DBException { + insertBatchTest(19); + } + + public void insertBatchTest(int numRows) throws DBException { + teardown(); + setupWithBatch(10); + + try { + String insertKey = "user0"; + HashMap<String, ByteIterator> insertMap = insertRow(insertKey); + + ResultSet resultSet = jdbcConnection.prepareStatement( + String.format("SELECT * FROM %s", TABLE_NAME) + ).executeQuery(); + + // Check we do not have a result Row (because batch is not full yet + assertFalse(resultSet.next()); + + // insert more rows, completing 1 batch (still results are partial). + for (int i = 1; i < numRows; i++) { + insertMap = insertRow("user" + i); + } + + // + assertNumRows(10 * (numRows / 10)); + + // call cleanup, which should insert the partial batch + jdbcDBClient.cleanup(); + + // Check that we have all rows + assertNumRows(numRows); + + } catch (SQLException e) { + e.printStackTrace(); + fail("Failed insertBatchTest"); + } finally { + teardown(); // for next tests + setup(); + } + } + + private void assertNumRows(long numRows) throws SQLException { + ResultSet resultSet = jdbcConnection.prepareStatement( + String.format("SELECT * FROM %s", TABLE_NAME) + ).executeQuery(); + + for (int i = 0; i < numRows; i++) { + assertTrue("expecting " + numRows + " results, received only " + i, resultSet.next()); + } + assertFalse("expecting " + numRows + " results, received more", resultSet.next()); + + resultSet.close(); + } } diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java index 8b7920810e36684e79f55fa33e3f109db3013f1e..a977fbe456182a563e424ca1b7cc2d700d016da1 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/AsyncMongoDbClient.java @@ -286,7 +286,7 @@ public class AsyncMongoDbClient extends DB { batchedWriteCount += 1; if (batchedWriteCount < batchSize) { - return OptionsSupport.BATCHED_OK; + return Status.BATCHED_OK; } long count = collection.write(batchedWrite); diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java index 1db8f0e1cfcd0e7d28fa52b6c73b34ddf77803a8..2b7cb114fe7a7c09ccdee7d27cede1221c612bbc 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/MongoDbClient.java @@ -286,7 +286,7 @@ public class MongoDbClient extends DB { } bulkInserts.clear(); } else { - return OptionsSupport.BATCHED_OK; + return Status.BATCHED_OK; } } return Status.OK; diff --git a/mongodb/src/main/java/com/yahoo/ycsb/db/OptionsSupport.java b/mongodb/src/main/java/com/yahoo/ycsb/db/OptionsSupport.java index c8aacd464f76946fa670f377ff0c0f853232b3ea..62092a0d10a51e4384156235f4a59504585ea853 100644 --- a/mongodb/src/main/java/com/yahoo/ycsb/db/OptionsSupport.java +++ b/mongodb/src/main/java/com/yahoo/ycsb/db/OptionsSupport.java @@ -18,8 +18,6 @@ package com.yahoo.ycsb.db; import java.util.Properties; -import com.yahoo.ycsb.Status; - /** * OptionsSupport provides methods for handling legacy options. * @@ -27,13 +25,6 @@ import com.yahoo.ycsb.Status; */ public final class OptionsSupport { - /** - * Status used for operations that have not been send to the server and have - * only been batched by the client. - */ - public static final Status BATCHED_OK = new Status("BATCHED_OK", - "The operation has been batched by the binding."); - /** Value for an unavailable property. */ private static final String UNAVAILABLE = "n/a";