diff --git a/azuretablestorage/README.md b/azuretablestorage/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..96c01281f5a9db4793b5fee7287be5633d706f6c
--- /dev/null
+++ b/azuretablestorage/README.md
@@ -0,0 +1,73 @@
+<!--
+Copyright (c) 2016 YCSB contributors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you
+may not use this file except in compliance with the License. You
+may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+implied. See the License for the specific language governing
+permissions and limitations under the License. See accompanying
+LICENSE file.
+-->
+
+## Quick Start
+
+This section describes how to run YCSB on Azure table storage. 
+
+### 1. Create an Azure Storage account.
+###    https://azure.microsoft.com/en-us/documentation/articles/storage-create-storage-account/#create-a-storage-account
+
+### 2. Install Java and Maven
+
+### 3. Set Up YCSB
+
+Git clone YCSB and compile:
+
+    git clone http://github.com/brianfrankcooper/YCSB.git
+    cd YCSB
+    mvn -pl com.yahoo.ycsb:azuretablestorage-binding -am clean package
+
+### 4. Provide Azure Storage parameters
+    
+Set the account name and access key.
+
+- `azure.account`
+- `azure.key`
+
+Or, you can set configs with the shell command, EG:
+
+    ./bin/ycsb load azuretablestorage -s -P workloads/workloada -p azure.account=YourAccountName -p azure.key=YourAccessKey > outputLoad.txt
+
+### 5. Load data and run tests
+
+Load the data:
+
+    ./bin/ycsb load azuretablestorage -s -P workloads/workloada -p azure.account=YourAccountName -p azure.key=YourAccessKey > outputLoad.txt
+
+Run the workload test:
+
+    ./bin/ycsb run azuretablestorage -s -P workloads/workloada -p azure.account=YourAccountName -p azure.key=YourAccessKey > outputRun.txt
+	
+### 6. Optional Azure Storage parameters
+
+- `azure.batchsize`	
+	Could be between 1 ~ 100. Insert records to table in batch if batchsize > 1.
+- `azure.protocol`
+	https(in default) or http.
+- `azure.table`
+	The name of the table('usertable' in default).
+- `azure.partitionkey`
+	The partitionkey('Test' in default).
+- `azure.endpoint`
+	For Azure stack WOSS.
+	
+EG:
+    ./bin/ycsb load azuretablestorage -s -P workloads/workloada -p azure.account=YourAccountName -p azure.key=YourAccessKey -p azure.batchsize=100 -p azure.protocol=http
+	
+	
+
diff --git a/azuretablestorage/pom.xml b/azuretablestorage/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..52a89b9783d8426a562068ce2a210a364bf57f4c
--- /dev/null
+++ b/azuretablestorage/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 
+Copyright (c) 2016 YCSB contributors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License"); you
+may not use this file except in compliance with the License. You
+may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+implied. See the License for the specific language governing
+permissions and limitations under the License. See accompanying
+LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.yahoo.ycsb</groupId>
+        <artifactId>binding-parent</artifactId>
+        <version>0.12.0-SNAPSHOT</version>
+        <relativePath>../binding-parent</relativePath>
+    </parent>
+
+    <artifactId>azuretablestorage-binding</artifactId>
+    <name>Azure table storage Binding</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.yahoo.ycsb</groupId>
+            <artifactId>core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+    		<groupId>com.microsoft.azure</groupId>
+    		<artifactId>azure-storage</artifactId>
+    		<version>${azurestorage.version}</version>
+		</dependency>
+    </dependencies>
+</project>
diff --git a/azuretablestorage/src/main/java/com/yahoo/ycsb/db/azuretablestorage/AzureClient.java b/azuretablestorage/src/main/java/com/yahoo/ycsb/db/azuretablestorage/AzureClient.java
new file mode 100644
index 0000000000000000000000000000000000000000..87b2a208fb2d6aa9860da3c685e0411e7f3b337e
--- /dev/null
+++ b/azuretablestorage/src/main/java/com/yahoo/ycsb/db/azuretablestorage/AzureClient.java
@@ -0,0 +1,279 @@
+/**
+ * Copyright (c) 2016 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+package com.yahoo.ycsb.db.azuretablestorage;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.table.CloudTable;
+import com.microsoft.azure.storage.table.CloudTableClient;
+import com.microsoft.azure.storage.table.DynamicTableEntity;
+import com.microsoft.azure.storage.table.EntityProperty;
+import com.microsoft.azure.storage.table.EntityResolver;
+import com.microsoft.azure.storage.table.TableBatchOperation;
+import com.microsoft.azure.storage.table.TableOperation;
+import com.microsoft.azure.storage.table.TableQuery;
+import com.microsoft.azure.storage.table.TableServiceEntity;
+import com.yahoo.ycsb.ByteArrayByteIterator;
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.DB;
+import com.yahoo.ycsb.DBException;
+import com.yahoo.ycsb.Status;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Vector;
+
+
+/**
+ * YCSB binding for <a href="https://azure.microsoft.com/en-us/services/storage/">Azure</a>.
+ * See {@code azure/README.md} for details.
+ */
+public class AzureClient extends DB {
+
+  public static final String PROTOCOL = "azure.protocal";
+  public static final String PROTOCOL_DEFAULT = "https";
+  public static final String TABLE_ENDPOINT = "azure.endpoint";
+  public static final String ACCOUNT = "azure.account";
+  public static final String KEY = "azure.key";
+  public static final String TABLE = "azure.table";
+  public static final String TABLE_DEFAULT = "usertable";
+  public static final String PARTITIONKEY = "azure.partitionkey";
+  public static final String PARTITIONKEY_DEFAULT = "Test";
+  public static final String BATCHSIZE = "azure.batchsize";
+  public static final String BATCHSIZE_DEFAULT = "1";
+  private static final int BATCHSIZE_UPPERBOUND = 100;
+  private static final TableBatchOperation BATCH_OPERATION = new TableBatchOperation();
+  private static String partitionKey;
+  private CloudStorageAccount storageAccount = null;
+  private CloudTableClient tableClient = null;
+  private CloudTable cloudTable = null;
+  private static int batchSize;
+  private static int curIdx = 0;
+
+  @Override
+  public void init() throws DBException {
+    Properties props = getProperties();
+    String protocol = props.getProperty(PROTOCOL, PROTOCOL_DEFAULT);
+    if (protocol != "https" && protocol != "http") {
+      throw new DBException("Protocol must be 'http' or 'https'!\n");
+    }
+    String table = props.getProperty(TABLE, TABLE_DEFAULT);
+    partitionKey = props.getProperty(PARTITIONKEY, PARTITIONKEY_DEFAULT);
+    batchSize = Integer.parseInt(props.getProperty(BATCHSIZE, BATCHSIZE_DEFAULT));
+    if (batchSize < 1 || batchSize > BATCHSIZE_UPPERBOUND) {
+      throw new DBException(String.format("Batchsize must be between 1 and %d!\n", 
+          BATCHSIZE_UPPERBOUND));
+    }
+    String account = props.getProperty(ACCOUNT);
+    String key = props.getProperty(KEY);
+    String tableEndPoint = props.getProperty(TABLE_ENDPOINT);
+    String storageConnectionString = getStorageConnectionString(protocol, account, key, tableEndPoint);
+    try {
+      storageAccount = CloudStorageAccount.parse(storageConnectionString);
+    } catch (Exception e)  {
+      throw new DBException("Could not connect to the account.\n", e);
+    }
+    tableClient = storageAccount.createCloudTableClient();
+    try {
+      cloudTable = tableClient.getTableReference(table);
+      cloudTable.createIfNotExists();
+    } catch (Exception e)  {
+      throw new DBException("Could not connect to the table.\n", e);
+    }
+  }
+
+  @Override
+  public void cleanup() {
+  }
+
+  @Override
+  public Status read(String table, String key, Set<String> fields,
+      final HashMap<String, ByteIterator> result) {
+    if (fields != null) {
+      return readSubset(key, fields, result);
+    } else {
+      return readEntity(key, result);
+    }
+  }
+
+  @Override
+  public Status scan(String table, String startkey, int recordcount, 
+      Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
+    try {
+      String whereStr = String.format("(PartitionKey eq '%s') and (RowKey ge '%s')", 
+          partitionKey, startkey);
+      TableQuery<DynamicTableEntity> scanQuery = 
+          new TableQuery<DynamicTableEntity>(DynamicTableEntity.class)
+          .where(whereStr).take(recordcount);
+      int cnt = 0;
+      for (DynamicTableEntity entity : cloudTable.execute(scanQuery)) {
+        HashMap<String, EntityProperty> properties = entity.getProperties();
+        HashMap<String, ByteIterator> cur = new HashMap<String, ByteIterator>();
+        for (Entry<String, EntityProperty> entry : properties.entrySet()) {
+          String fieldName = entry.getKey();
+          ByteIterator fieldVal = new ByteArrayByteIterator(entry.getValue().getValueAsByteArray());
+          if (fields == null || fields.contains(fieldName)) {
+            cur.put(fieldName, fieldVal);
+          }
+        }
+        result.add(cur);
+        if (++cnt == recordcount) {
+          break;
+        }
+      }
+      return Status.OK;
+    } catch (Exception e) {
+      return Status.ERROR;
+    }
+  }
+
+  @Override
+  public Status update(String table, String key, HashMap<String, ByteIterator> values) {
+    return insertOrUpdate(key, values);
+  }
+
+  @Override
+  public Status insert(String table, String key, HashMap<String, ByteIterator> values) {
+    if (batchSize == 1) {
+      return insertOrUpdate(key, values);
+    } else {
+      return insertBatch(key, values);
+    }
+  }
+
+  @Override
+  public Status delete(String table, String key) {
+    try {
+      // firstly, retrieve the entity to be deleted
+      TableOperation retrieveOp = 
+          TableOperation.retrieve(partitionKey, key, TableServiceEntity.class);
+      TableServiceEntity entity = cloudTable.execute(retrieveOp).getResultAsType();
+      // secondly, delete the entity
+      TableOperation deleteOp = TableOperation.delete(entity);
+      cloudTable.execute(deleteOp);
+      return Status.OK;
+    } catch (Exception e) {
+      return Status.ERROR;
+    }
+  }
+
+  private String getStorageConnectionString(String protocol, String account, String key, String tableEndPoint) {
+    String res = 
+        String.format("DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s", 
+        protocol, account, key);
+    if (tableEndPoint != null) {
+      res = String.format("%s;TableEndpoint=%s", res, tableEndPoint);
+    }
+    return res;
+  }
+
+  /*
+   * Read subset of properties instead of full fields with projection.
+   */
+  public Status readSubset(String key, Set<String> fields, HashMap<String, ByteIterator> result) {
+    String whereStr = String.format("RowKey eq '%s'", key);
+
+    TableQuery<TableServiceEntity> projectionQuery = TableQuery.from(
+        TableServiceEntity.class).where(whereStr).select(fields.toArray(new String[0]));
+
+    EntityResolver<HashMap<String, ByteIterator>> resolver = 
+        new EntityResolver<HashMap<String, ByteIterator>>() {
+          public HashMap<String, ByteIterator> resolve(String partitionkey, String rowKey, 
+              Date timeStamp, HashMap<String, EntityProperty> properties, String etag) {
+            HashMap<String, ByteIterator> tmp = new HashMap<String, ByteIterator>();
+            for (Entry<String, EntityProperty> entry : properties.entrySet()) {
+              String key = entry.getKey();
+              ByteIterator val = new ByteArrayByteIterator(entry.getValue().getValueAsByteArray());
+              tmp.put(key, val);
+            }
+            return tmp;
+      }
+    };
+    try {
+      for (HashMap<String, ByteIterator> tmp : cloudTable.execute(projectionQuery, resolver)) {
+        for (Entry<String, ByteIterator> entry : tmp.entrySet()){
+          String fieldName = entry.getKey();
+          ByteIterator fieldVal = entry.getValue();
+          result.put(fieldName, fieldVal);
+        }
+      }
+      return Status.OK;
+    } catch (Exception e) {
+      return Status.ERROR;
+    }
+  }
+
+  private Status readEntity(String key, HashMap<String, ByteIterator> result) {
+    try {
+      // firstly, retrieve the entity to be deleted
+      TableOperation retrieveOp = 
+          TableOperation.retrieve(partitionKey, key, DynamicTableEntity.class);
+      DynamicTableEntity entity = cloudTable.execute(retrieveOp).getResultAsType();
+      HashMap<String, EntityProperty> properties = entity.getProperties();
+      for (Entry<String, EntityProperty> entry: properties.entrySet()) {
+        String fieldName = entry.getKey();
+        ByteIterator fieldVal = new ByteArrayByteIterator(entry.getValue().getValueAsByteArray());
+        result.put(fieldName, fieldVal);
+      }
+      return Status.OK;
+    } catch (Exception e) {
+      return Status.ERROR;
+    }
+  }
+
+  private Status insertBatch(String key, HashMap<String, ByteIterator> values) {
+    HashMap<String, EntityProperty> properties = new HashMap<String, EntityProperty>();
+    for (Entry<String, ByteIterator> entry : values.entrySet()) {
+      String fieldName = entry.getKey();
+      byte[] fieldVal = entry.getValue().toArray();
+      properties.put(fieldName, new EntityProperty(fieldVal));
+    }
+    DynamicTableEntity entity = new DynamicTableEntity(partitionKey, key, properties);
+    BATCH_OPERATION.insertOrReplace(entity);    
+    if (++curIdx == batchSize) {
+      try {
+        cloudTable.execute(BATCH_OPERATION);
+        BATCH_OPERATION.clear();
+        curIdx = 0;
+      } catch (Exception e) {
+        return Status.ERROR;
+      }
+    }
+    return Status.OK;
+  }
+  
+  private Status insertOrUpdate(String key, HashMap<String, ByteIterator> values) {
+    HashMap<String, EntityProperty> properties = new HashMap<String, EntityProperty>();
+    for (Entry<String, ByteIterator> entry : values.entrySet()) {
+      String fieldName = entry.getKey();
+      byte[] fieldVal = entry.getValue().toArray();
+      properties.put(fieldName, new EntityProperty(fieldVal));
+    }
+    DynamicTableEntity entity = new DynamicTableEntity(partitionKey, key, properties);
+    TableOperation insertOrReplace = TableOperation.insertOrReplace(entity);
+    try {
+      cloudTable.execute(insertOrReplace);
+      return Status.OK;
+    } catch (Exception e) {
+      return Status.ERROR;
+    }
+  }
+  
+}
diff --git a/azuretablestorage/src/main/java/com/yahoo/ycsb/db/azuretablestorage/package-info.java b/azuretablestorage/src/main/java/com/yahoo/ycsb/db/azuretablestorage/package-info.java
new file mode 100644
index 0000000000000000000000000000000000000000..ea7b7490dc7ba59dd5d1ec85df67de67ce50ffa4
--- /dev/null
+++ b/azuretablestorage/src/main/java/com/yahoo/ycsb/db/azuretablestorage/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2016 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+/**
+ * The YCSB binding for <a href="https://azure.microsoft.com/en-us/services/storage/">Azure table Storage</a>.
+ */
+package com.yahoo.ycsb.db.azuretablestorage;
+
diff --git a/bin/bindings.properties b/bin/bindings.properties
index da5dac7a53769fd0a792291e0fdfbc4c6cfb030e..9afdc436aa35c28c7b1a53ba21a059fafd44b86d 100644
--- a/bin/bindings.properties
+++ b/bin/bindings.properties
@@ -29,6 +29,7 @@ accumulo:com.yahoo.ycsb.db.accumulo.AccumuloClient
 aerospike:com.yahoo.ycsb.db.AerospikeClient
 asynchbase:com.yahoo.ycsb.db.AsyncHBaseClient
 arangodb:com.yahoo.ycsb.db.ArangoDBClient
+azuretablestorage:com.yahoo.ycsb.db.azuretablestorage.AzureClient
 basic:com.yahoo.ycsb.BasicDB
 cassandra-cql:com.yahoo.ycsb.db.CassandraCQLClient
 cassandra2-cql:com.yahoo.ycsb.db.CassandraCQLClient
diff --git a/pom.xml b/pom.xml
index 5357142baa9a05ac99eb1939bafece357a005fff..dc8c245c94092e9fe7cd14809baa85b80f73466d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,6 +96,7 @@ LICENSE file.
     <aerospike.version>3.1.2</aerospike.version>
     <solr.version>5.4.0</solr.version>
     <arangodb.version>2.7.3</arangodb.version>
+    <azurestorage.version>4.0.0</azurestorage.version>
   </properties>
 
   <modules>
@@ -107,6 +108,7 @@ LICENSE file.
     <module>aerospike</module>
     <module>arangodb</module>
     <module>asynchbase</module>
+    <module>azuretablestorage</module>
     <module>cassandra</module>
     <module>couchbase</module>
     <module>couchbase2</module>