diff --git a/jdbc/README.md b/jdbc/README.md index 841375a8a809a7cb6ee813dc9c9c8882e1845722..f74ff54f019a1d576205463ef97c4b4a7cf12688 100644 --- a/jdbc/README.md +++ b/jdbc/README.md @@ -98,6 +98,7 @@ db.driver=com.mysql.jdbc.Driver # The JDBC driver class to use. db.url=jdbc:mysql://127.0.0.1:3306/ycsb # The Database connection URL. db.user=admin # User name for the connection. db.passwd=admin # Password for the connection. +db.batchsize=1000 # The batch size for doing batched inserts. Defaults to 0. Set to >0 to use batching. jdbc.fetchsize=10 # The JDBC fetch size hinted to the driver. jdbc.autocommit=true # The JDBC connection auto-commit property for the driver. ``` diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java index f6d5c3e2426df8f8e85b03883023ec197c3632f1..80dd338ef221361bb918c44e21910ec934bd13bd 100644 --- a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java +++ b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java @@ -1,18 +1,18 @@ /** * Copyright (c) 2010 - 2016 Yahoo! Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you + * + * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * + * may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. + * permissions and limitations under the License. See accompanying + * LICENSE file. */ package com.yahoo.ycsb.db; @@ -31,11 +31,11 @@ import java.util.concurrent.ConcurrentMap; * A class that wraps a JDBC compliant database to allow it to be interfaced * with YCSB. This class extends {@link DB} and implements the database * interface used by YCSB client. - * + * * <br> * Each client will have its own instance of this class. This client is not * thread safe. - * + * * <br> * This interface expects a schema <key> <field1> <field2> <field3> ... All * attributes are of type VARCHAR. All accesses are through the primary key. @@ -55,6 +55,9 @@ public class JdbcDBClient extends DB { /** The password to use for establishing the connection. */ public static final String CONNECTION_PASSWD = "db.passwd"; + /** The batch size for batched inserts. Set to >0 to use batching */ + public static final String DB_BATCH_SIZE = "db.batchsize"; + /** The JDBC fetch size hinted to the driver. */ public static final String JDBC_FETCH_SIZE = "jdbc.fetchsize"; @@ -79,9 +82,11 @@ public class JdbcDBClient extends DB { private ArrayList<Connection> conns; private boolean initialized = false; private Properties props; - private Integer jdbcFetchSize; + private int jdbcFetchSize; + private int batchSize; private static final String DEFAULT_PROP = ""; private ConcurrentMap<StatementType, PreparedStatement> cachedStatements; + private long numRowsInBatch = 0; /** * Ordered field information for insert and update statements. @@ -213,6 +218,20 @@ public class JdbcDBClient extends DB { } } + /** Returns parsed int value from the properties if set, otherwise returns -1. */ + private static int getIntProperty(Properties props, String key) throws DBException { + String valueStr = props.getProperty(key); + if (valueStr != null) { + try { + return Integer.parseInt(valueStr); + } catch (NumberFormatException nfe) { + System.err.println("Invalid " + key + " specified: " + valueStr); + throw new DBException(nfe); + } + } + return -1; + } + @Override public void init() throws DBException { if (initialized) { @@ -225,15 +244,8 @@ public class JdbcDBClient extends DB { String passwd = props.getProperty(CONNECTION_PASSWD, DEFAULT_PROP); String driver = props.getProperty(DRIVER_CLASS); - String jdbcFetchSizeStr = props.getProperty(JDBC_FETCH_SIZE); - if (jdbcFetchSizeStr != null) { - try { - this.jdbcFetchSize = Integer.parseInt(jdbcFetchSizeStr); - } catch (NumberFormatException nfe) { - System.err.println("Invalid JDBC fetch size specified: " + jdbcFetchSizeStr); - throw new DBException(nfe); - } - } + this.jdbcFetchSize = getIntProperty(props, JDBC_FETCH_SIZE); + this.batchSize = getIntProperty(props, DB_BATCH_SIZE); String autoCommitStr = props.getProperty(JDBC_AUTO_COMMIT, Boolean.TRUE.toString()); Boolean autoCommit = Boolean.parseBoolean(autoCommitStr); @@ -258,7 +270,7 @@ public class JdbcDBClient extends DB { conns.add(conn); } - System.out.println("Using " + shardCount + " shards"); + System.out.println("Using shards: " + shardCount + ", batchSize:" + batchSize + ", fetchSize: " + jdbcFetchSize); cachedStatements = new ConcurrentHashMap<StatementType, PreparedStatement>(); } catch (ClassNotFoundException e) { @@ -276,6 +288,20 @@ public class JdbcDBClient extends DB { @Override public void cleanup() throws DBException { + if (batchSize > 0) { + try { + // commit un-finished batches + for (PreparedStatement st : cachedStatements.values()) { + if (!st.getConnection().isClosed() && !st.isClosed() && (numRowsInBatch % batchSize != 0)) { + st.executeBatch(); + } + } + } catch (SQLException e) { + System.err.println("Error in cleanup execution. " + e); + throw new DBException(e); + } + } + try { cleanupAllConnections(); } catch (SQLException e) { @@ -363,7 +389,7 @@ public class JdbcDBClient extends DB { select.append(PRIMARY_KEY); select.append(" LIMIT ?"); PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select.toString()); - if (this.jdbcFetchSize != null) { + if (this.jdbcFetchSize > 0) { scanStatement.setFetchSize(this.jdbcFetchSize); } PreparedStatement stmt = cachedStatements.putIfAbsent(scanType, scanStatement); @@ -474,7 +500,22 @@ public class JdbcDBClient extends DB { for (String value: fieldInfo.getFieldValues()) { insertStatement.setString(index++, value); } - int result = insertStatement.executeUpdate(); + int result; + if (batchSize > 0) { + insertStatement.addBatch(); + if (++numRowsInBatch % batchSize == 0) { + int[] results = insertStatement.executeBatch(); + for (int r : results) { + if (r != 1) { + return Status.ERROR; + } + } + return Status.OK; + } + return Status.BATCHED_OK; + } else { + result = insertStatement.executeUpdate(); + } if (result == 1) { return Status.OK; } @@ -520,4 +561,4 @@ public class JdbcDBClient extends DB { return new OrderedFieldInfo(fieldKeys, fieldValues); } -} \ No newline at end of file +} diff --git a/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java b/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java index c3cc3024b547de02b2a96a8abb95f99831076dfc..6317f483c3f63aecf9cbf0ab216c23d473db2469 100644 --- a/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java +++ b/jdbc/src/test/java/com/yahoo/ycsb/db/JdbcDBClientTest.java @@ -46,24 +46,29 @@ public class JdbcDBClientTest { @BeforeClass public static void setup() { - try { - jdbcConnection = DriverManager.getConnection(TEST_DB_URL); - jdbcDBClient = new JdbcDBClient(); - - Properties p = new Properties(); - p.setProperty(JdbcDBClient.CONNECTION_URL, TEST_DB_URL); - p.setProperty(JdbcDBClient.DRIVER_CLASS, TEST_DB_DRIVER); - p.setProperty(JdbcDBClient.CONNECTION_USER, TEST_DB_USER); + setupWithBatch(1); + } - jdbcDBClient.setProperties(p); - jdbcDBClient.init(); - } catch (SQLException e) { - e.printStackTrace(); - fail("Could not create local Database"); - } catch (DBException e) { - e.printStackTrace(); - fail("Could not create JdbcDBClient instance"); - } + public static void setupWithBatch(int batchSize) { + try { + jdbcConnection = DriverManager.getConnection(TEST_DB_URL); + jdbcDBClient = new JdbcDBClient(); + + Properties p = new Properties(); + p.setProperty(JdbcDBClient.CONNECTION_URL, TEST_DB_URL); + p.setProperty(JdbcDBClient.DRIVER_CLASS, TEST_DB_DRIVER); + p.setProperty(JdbcDBClient.CONNECTION_USER, TEST_DB_USER); + p.setProperty(JdbcDBClient.DB_BATCH_SIZE, Integer.toString(batchSize)); + + jdbcDBClient.setProperties(p); + jdbcDBClient.init(); + } catch (SQLException e) { + e.printStackTrace(); + fail("Could not create local Database"); + } catch (DBException e) { + e.printStackTrace(); + fail("Could not create JdbcDBClient instance"); + } } @AfterClass @@ -75,7 +80,7 @@ public class JdbcDBClientTest { } catch (SQLException e) { e.printStackTrace(); } - + try { if (jdbcDBClient != null) { jdbcDBClient.cleanup(); @@ -319,4 +324,65 @@ public class JdbcDBClientTest { testIndex++; } } + + @Test + public void insertBatchTest() throws DBException { + insertBatchTest(20); + } + + @Test + public void insertPartialBatchTest() throws DBException { + insertBatchTest(19); + } + + public void insertBatchTest(int numRows) throws DBException { + teardown(); + setupWithBatch(10); + + try { + String insertKey = "user0"; + HashMap<String, ByteIterator> insertMap = insertRow(insertKey); + + ResultSet resultSet = jdbcConnection.prepareStatement( + String.format("SELECT * FROM %s", TABLE_NAME) + ).executeQuery(); + + // Check we do not have a result Row (because batch is not full yet + assertFalse(resultSet.next()); + + // insert more rows, completing 1 batch (still results are partial). + for (int i = 1; i < numRows; i++) { + insertMap = insertRow("user" + i); + } + + // + assertNumRows(10 * (numRows / 10)); + + // call cleanup, which should insert the partial batch + jdbcDBClient.cleanup(); + + // Check that we have all rows + assertNumRows(numRows); + + } catch (SQLException e) { + e.printStackTrace(); + fail("Failed insertBatchTest"); + } finally { + teardown(); // for next tests + setup(); + } + } + + private void assertNumRows(long numRows) throws SQLException { + ResultSet resultSet = jdbcConnection.prepareStatement( + String.format("SELECT * FROM %s", TABLE_NAME) + ).executeQuery(); + + for (int i = 0; i < numRows; i++) { + assertTrue("expecting " + numRows + " results, received only " + i, resultSet.next()); + } + assertFalse("expecting " + numRows + " results, received more", resultSet.next()); + + resultSet.close(); + } }