From 74b5080086c589496f26e6151d2c32eac2424eae Mon Sep 17 00:00:00 2001
From: Enis Soztutar <enis@apache.org>
Date: Fri, 20 May 2016 16:53:49 -0700
Subject: [PATCH] Add db.flavor to capture syntax differences among JDBC
 implementations. Add Phoenix flavor for Apache Phoenix.

---
 .../java/com/yahoo/ycsb/db/JdbcDBClient.java  | 241 ++++++++++++++----
 1 file changed, 186 insertions(+), 55 deletions(-)

diff --git a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java
index 80dd338e..df9cb218 100644
--- a/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java
+++ b/jdbc/src/main/java/com/yahoo/ycsb/db/JdbcDBClient.java
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2010 - 2016 Yahoo! Inc. All rights reserved.
+ * Copyright (c) 2010 - 2016 Yahoo! Inc., 2016 YCSB contributors. All rights reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you
  * may not use this file except in compliance with the License. You
@@ -87,6 +87,9 @@ public class JdbcDBClient extends DB {
   private static final String DEFAULT_PROP = "";
   private ConcurrentMap<StatementType, PreparedStatement> cachedStatements;
   private long numRowsInBatch = 0;
+  /** DB flavor defines DB-specific syntax and behavior for the
+   * particular database. Current database flavors are: {default, phoenix} */
+  private DBFlavor dbFlavor;
 
   /**
    * Ordered field information for insert and update statements.
@@ -109,6 +112,55 @@ public class JdbcDBClient extends DB {
     }
   }
 
+  /**
+   * DBFlavor captures minor differences in syntax and behavior among JDBC implementations and SQL
+   * dialects.
+   */
+  private abstract static class DBFlavor {
+    enum DBName {
+      DEFAULT,
+      PHOENIX
+    }
+
+    private final DBName dbName;
+
+    public DBFlavor(DBName dbName) {
+      this.dbName = dbName;
+    }
+
+    public static DBFlavor fromJdbcUrl(String url) {
+      if (url.startsWith("jdbc:phoenix")) {
+        return new PhoenixDBFlavor();
+      }
+      return new DefaultDBFlavor();
+    }
+
+    /**
+     * Create and return a SQL statement for inserting data.
+     */
+    abstract String createInsertStatement(StatementType insertType, String key);
+
+    /**
+     * Create and return a SQL statement for reading data.
+     */
+    abstract String createReadStatement(StatementType readType, String key);
+
+    /**
+     * Create and return a SQL statement for deleting data.
+     */
+    abstract String createDeleteStatement(StatementType deleteType, String key);
+
+    /**
+     * Create and return a SQL statement for updating data.
+     */
+    abstract String createUpdateStatement(StatementType updateType, String key);
+
+    /**
+     * Create and return a SQL statement for scanning data.
+     */
+    abstract String createScanStatement(StatementType scanType, String key);
+  }
+
   /**
    * The statement type for the prepared statements.
    */
@@ -232,6 +284,15 @@ public class JdbcDBClient extends DB {
     return -1;
   }
 
+  /** Returns parsed boolean value from the properties if set, otherwise returns defaultVal. */
+  private static boolean getBoolProperty(Properties props, String key, boolean defaultVal) {
+    String valueStr = props.getProperty(key);
+    if (valueStr != null) {
+      return Boolean.parseBoolean(valueStr);
+    }
+    return defaultVal;
+  }
+
   @Override
   public void init() throws DBException {
     if (initialized) {
@@ -247,8 +308,7 @@ public class JdbcDBClient extends DB {
     this.jdbcFetchSize = getIntProperty(props, JDBC_FETCH_SIZE);
     this.batchSize = getIntProperty(props, DB_BATCH_SIZE);
 
-    String autoCommitStr = props.getProperty(JDBC_AUTO_COMMIT, Boolean.TRUE.toString());
-    Boolean autoCommit = Boolean.parseBoolean(autoCommitStr);
+    boolean autoCommit = getBoolProperty(props, JDBC_AUTO_COMMIT, true);
 
     try {
       if (driver != null) {
@@ -256,7 +316,8 @@ public class JdbcDBClient extends DB {
       }
       int shardCount = 0;
       conns = new ArrayList<Connection>(3);
-      for (String url : urls.split(",")) {
+      String[] urlArr = urls.split(",");
+      for (String url : urlArr) {
         System.out.println("Adding shard node URL: " + url);
         Connection conn = DriverManager.getConnection(url, user, passwd);
 
@@ -273,6 +334,8 @@ public class JdbcDBClient extends DB {
       System.out.println("Using shards: " + shardCount + ", batchSize:" + batchSize + ", fetchSize: " + jdbcFetchSize);
 
       cachedStatements = new ConcurrentHashMap<StatementType, PreparedStatement>();
+
+      this.dbFlavor = DBFlavor.fromJdbcUrl(urlArr[0]);
     } catch (ClassNotFoundException e) {
       System.err.println("Error in initializing the JDBS driver: " + e);
       throw new DBException(e);
@@ -283,6 +346,7 @@ public class JdbcDBClient extends DB {
       System.err.println("Invalid value for fieldcount property. " + e);
       throw new DBException(e);
     }
+
     initialized = true;
   }
 
@@ -310,16 +374,10 @@ public class JdbcDBClient extends DB {
     }
   }
 
-  private PreparedStatement createAndCacheInsertStatement(StatementType insertType, String key) throws SQLException {
-    StringBuilder insert = new StringBuilder("INSERT INTO ");
-    insert.append(insertType.tableName);
-    insert.append(" (" + PRIMARY_KEY + "," + insertType.fieldString + ")");
-    insert.append(" VALUES(?");
-    for (int i = 0; i < insertType.numFields; i++) {
-      insert.append(",?");
-    }
-    insert.append(")");
-    PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(insert.toString());
+  private PreparedStatement createAndCacheInsertStatement(StatementType insertType, String key)
+      throws SQLException {
+    String insert = dbFlavor.createInsertStatement(insertType, key);
+    PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(insert);
     PreparedStatement stmt = cachedStatements.putIfAbsent(insertType, insertStatement);
     if (stmt == null) {
       return insertStatement;
@@ -327,14 +385,10 @@ public class JdbcDBClient extends DB {
     return stmt;
   }
 
-  private PreparedStatement createAndCacheReadStatement(StatementType readType, String key) throws SQLException {
-    StringBuilder read = new StringBuilder("SELECT * FROM ");
-    read.append(readType.tableName);
-    read.append(" WHERE ");
-    read.append(PRIMARY_KEY);
-    read.append(" = ");
-    read.append("?");
-    PreparedStatement readStatement = getShardConnectionByKey(key).prepareStatement(read.toString());
+  private PreparedStatement createAndCacheReadStatement(StatementType readType, String key)
+      throws SQLException {
+    String read = dbFlavor.createReadStatement(readType, key);
+    PreparedStatement readStatement = getShardConnectionByKey(key).prepareStatement(read);
     PreparedStatement stmt = cachedStatements.putIfAbsent(readType, readStatement);
     if (stmt == null) {
       return readStatement;
@@ -342,13 +396,10 @@ public class JdbcDBClient extends DB {
     return stmt;
   }
 
-  private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType, String key) throws SQLException {
-    StringBuilder delete = new StringBuilder("DELETE FROM ");
-    delete.append(deleteType.tableName);
-    delete.append(" WHERE ");
-    delete.append(PRIMARY_KEY);
-    delete.append(" = ?");
-    PreparedStatement deleteStatement = getShardConnectionByKey(key).prepareStatement(delete.toString());
+  private PreparedStatement createAndCacheDeleteStatement(StatementType deleteType, String key)
+      throws SQLException {
+    String delete = dbFlavor.createDeleteStatement(deleteType, key);
+    PreparedStatement deleteStatement = getShardConnectionByKey(key).prepareStatement(delete);
     PreparedStatement stmt = cachedStatements.putIfAbsent(deleteType, deleteStatement);
     if (stmt == null) {
       return deleteStatement;
@@ -356,22 +407,10 @@ public class JdbcDBClient extends DB {
     return stmt;
   }
 
-  private PreparedStatement createAndCacheUpdateStatement(StatementType updateType, String key) throws SQLException {
-    String[] fieldKeys = updateType.fieldString.split(",");
-    StringBuilder update = new StringBuilder("UPDATE ");
-    update.append(updateType.tableName);
-    update.append(" SET ");
-    for (int i = 0; i < fieldKeys.length; i++) {
-      update.append(fieldKeys[i]);
-      update.append("=?");
-      if (i < fieldKeys.length - 1) {
-        update.append(", ");
-      }
-    }
-    update.append(" WHERE ");
-    update.append(PRIMARY_KEY);
-    update.append(" = ?");
-    PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(update.toString());
+  private PreparedStatement createAndCacheUpdateStatement(StatementType updateType, String key)
+      throws SQLException {
+    String update = dbFlavor.createUpdateStatement(updateType, key);
+    PreparedStatement insertStatement = getShardConnectionByKey(key).prepareStatement(update);
     PreparedStatement stmt = cachedStatements.putIfAbsent(updateType, insertStatement);
     if (stmt == null) {
       return insertStatement;
@@ -379,16 +418,10 @@ public class JdbcDBClient extends DB {
     return stmt;
   }
 
-  private PreparedStatement createAndCacheScanStatement(StatementType scanType, String key) throws SQLException {
-    StringBuilder select = new StringBuilder("SELECT * FROM ");
-    select.append(scanType.tableName);
-    select.append(" WHERE ");
-    select.append(PRIMARY_KEY);
-    select.append(" >= ?");
-    select.append(" ORDER BY ");
-    select.append(PRIMARY_KEY);
-    select.append(" LIMIT ?");
-    PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select.toString());
+  private PreparedStatement createAndCacheScanStatement(StatementType scanType, String key)
+      throws SQLException {
+    String select = dbFlavor.createScanStatement(scanType, key);
+    PreparedStatement scanStatement = getShardConnectionByKey(key).prepareStatement(select);
     if (this.jdbcFetchSize > 0) {
       scanStatement.setFetchSize(this.jdbcFetchSize);
     }
@@ -399,6 +432,104 @@ public class JdbcDBClient extends DB {
     return stmt;
   }
 
+  private static class DefaultDBFlavor extends DBFlavor {
+    public DefaultDBFlavor() {
+      super(DBName.DEFAULT);
+    }
+    public DefaultDBFlavor(DBName dbName) {
+      super(dbName);
+    }
+
+    @Override
+    String createInsertStatement(StatementType insertType, String key) {
+      StringBuilder insert = new StringBuilder("INSERT INTO ");
+      insert.append(insertType.tableName);
+      insert.append(" (" + PRIMARY_KEY + "," + insertType.fieldString + ")");
+      insert.append(" VALUES(?");
+      for (int i = 0; i < insertType.numFields; i++) {
+        insert.append(",?");
+      }
+      insert.append(")");
+      return insert.toString();
+    }
+
+    @Override
+    String createReadStatement(StatementType readType, String key) {
+      StringBuilder read = new StringBuilder("SELECT * FROM ");
+      read.append(readType.tableName);
+      read.append(" WHERE ");
+      read.append(PRIMARY_KEY);
+      read.append(" = ");
+      read.append("?");
+      return read.toString();
+    }
+
+    @Override
+    String createDeleteStatement(StatementType deleteType, String key) {
+      StringBuilder delete = new StringBuilder("DELETE FROM ");
+      delete.append(deleteType.tableName);
+      delete.append(" WHERE ");
+      delete.append(PRIMARY_KEY);
+      delete.append(" = ?");
+      return delete.toString();
+    }
+
+    @Override
+    String createUpdateStatement(StatementType updateType, String key) {
+      String[] fieldKeys = updateType.fieldString.split(",");
+      StringBuilder update = new StringBuilder("UPDATE ");
+      update.append(updateType.tableName);
+      update.append(" SET ");
+      for (int i = 0; i < fieldKeys.length; i++) {
+        update.append(fieldKeys[i]);
+        update.append("=?");
+        if (i < fieldKeys.length - 1) {
+          update.append(", ");
+        }
+      }
+      update.append(" WHERE ");
+      update.append(PRIMARY_KEY);
+      update.append(" = ?");
+      return update.toString();
+    }
+
+    @Override
+    String createScanStatement(StatementType scanType, String key) {
+      StringBuilder select = new StringBuilder("SELECT * FROM ");
+      select.append(scanType.tableName);
+      select.append(" WHERE ");
+      select.append(PRIMARY_KEY);
+      select.append(" >= ?");
+      select.append(" ORDER BY ");
+      select.append(PRIMARY_KEY);
+      select.append(" LIMIT ?");
+      return select.toString();
+    }
+  }
+
+  /**
+   * Database flavor for Apache Phoenix. Captures syntax differences used by Phoenix.
+   */
+  private static class PhoenixDBFlavor extends DefaultDBFlavor {
+    public PhoenixDBFlavor() {
+      super(DBName.PHOENIX);
+    }
+
+    @Override
+    String createInsertStatement(StatementType insertType, String key) {
+      // Phoenix uses UPSERT syntax
+      StringBuilder insert = new StringBuilder("UPSERT INTO ");
+      insert.append(insertType.tableName);
+      insert.append(" (" + PRIMARY_KEY + "," + insertType.fieldString + ")");
+      insert.append(" VALUES(?");
+      for (int i = 0; i < insertType.numFields; i++) {
+        insert.append(",?");
+      }
+      insert.append(")");
+      return insert.toString();
+    }
+  }
+
   @Override
   public Status read(String tableName, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
     try {
-- 
GitLab