diff --git a/bin/ycsb b/bin/ycsb
index ea1f653011d2fd28750a2be77815d2b79b8cf60e..cdb809b9aad3fbe0f530cecc8529e061e88d5c59 100755
--- a/bin/ycsb
+++ b/bin/ycsb
@@ -51,6 +51,7 @@ DATABASES = {
     "nosqldb"      : "com.yahoo.ycsb.db.NoSqlDbClient",
     "orientdb"     : "com.yahoo.ycsb.db.OrientDBClient",
     "redis"        : "com.yahoo.ycsb.db.RedisClient",
+    "s3"           : "com.yahoo.ycsb.db.S3Client",
     "tarantool"    : "com.yahoo.ycsb.db.TarantoolClient",
     "voldemort"    : "com.yahoo.ycsb.db.VoldemortClient"
 }
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 254199700dbb092976c7724321ed93f512fb5b03..e8ebd7f983b1349bfc2562e6f2e7de721b3da7cb 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -92,6 +92,11 @@
       <artifactId>redis-binding</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.yahoo.ycsb</groupId>
+      <artifactId>s3-binding</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>com.yahoo.ycsb</groupId>
       <artifactId>tarantool-binding</artifactId>
diff --git a/pom.xml b/pom.xml
index 89570cf29f5e5e78bbd091aae35dda399af8c99a..60d19a8ceccce9916944f17f8b9223829ddbd614 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,7 @@
     <mongodb.async.version>2.0.1</mongodb.async.version>
     <orientdb.version>1.0.1</orientdb.version>
     <redis.version>2.0.0</redis.version>
+    <s3.version>1.10.6</s3.version>
     <voldemort.version>0.81</voldemort.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <thrift.version>0.8.0</thrift.version>
@@ -95,6 +96,7 @@
     <!--module>nosqldb</module-->
     <module>orientdb</module>
     <module>redis</module>
+    <module>s3</module>
     <module>tarantool</module>
     <!--<module>voldemort</module>-->
   </modules>
diff --git a/s3/README.md b/s3/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..3385a7b48927ef6ea723d1e678d01fb8748fcc71
--- /dev/null
+++ b/s3/README.md
@@ -0,0 +1,41 @@
+Quick Start
+===============
+### 1. Set Up YCSB
+
+Clone the YCSB git repository and compile:
+
+    git clone git://github.com/brianfrankcooper/YCSB.git
+    cd YCSB
+    mvn clean package
+
+### 2. Run YCSB
+
+To execute the benchmark using the S3 storage binding, the files must first be uploaded using the "load" option with this command:
+
+    ./bin/ycsb load s3 -p table=theBucket -p s3.endPoint=s3.amazonaws.com -p s3.accessKeyId=yourAccessKeyId -p s3.secretKey=yourSecretKey -p recordcount=10 -p operationcount=10 -p workload=com.yahoo.ycsb.workloads.CoreWorkload -p readallfields=true -p readproportion=0.5 -p updateproportion=0.5 -p scanproportion=0 -p insertproportion=0 -p readmodifywriteproportion=0  -p fieldlength=10 -p fieldcount=20 -threads 10 -p requestdistribution=zipfian
+
+With this command, 10 files will be uploaded. The file size is determined by the number of fields (fieldcount) and by the field size (fieldlength). In this case each file is 200 bytes (10 bytes for each field multiplied by 20 fields). With this command 10 threads will be used.
+
+Running the command:
+
+       ./bin/ycsb run s3 -p table=theBucket -p s3.endPoint=s3.amazonaws.com -p s3.accessKeyId=yourAccessKeyId -p s3.secretKey=yourSecretKey -p recordcount=10 -p operationcount=10 -p workload=com.yahoo.ycsb.workloads.CoreWorkload -p readallfields=true -p readproportion=0.5 -p updateproportion=0.5 -p scanproportion=0 -p insertproportion=0 -p readmodifywriteproportion=0  -p fieldlength=10 -p fieldcount=20 -threads 10 -p requestdistribution=zipfian
+
+workload A will be executed with a 50/50 mix of updates and reads.
+
+#### S3 Storage Configuration Parameters
+- `table`
+  - The S3 bucket to use; note it is passed as `-p table=...`, without the `s3.` prefix.
+ 
+- `s3.endPoint`
+  - This indicates the endpoint used to connect to the S3 Storage service (the property name is case-sensitive).
+  - Default value is `s3.amazonaws.com`.
+
+- `s3.region`
+  - This indicates the region where your buckets are.
+  - Default value is `us-east-1`.
+ 
+- `s3.accessKeyId`
+  - This is the accessKey of your S3 account.
+ 
+- `s3.secretKey`
+  - This is the secret associated with your S3 account.
diff --git a/s3/pom.xml b/s3/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..4af3802779482e067c01ca26af1a3ebd5a3695aa
--- /dev/null
+++ b/s3/pom.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.yahoo.ycsb</groupId>
+        <artifactId>binding-parent</artifactId>
+        <version>0.4.0-SNAPSHOT</version>
+        <relativePath>../binding-parent</relativePath>
+    </parent>
+  
+  <artifactId>s3-binding</artifactId>
+  <name>S3 Storage Binding</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-s3</artifactId>
+      <version>${s3.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.yahoo.ycsb</groupId>
+      <artifactId>core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+   </dependencies>
+</project>
diff --git a/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java b/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java
new file mode 100644
index 0000000000000000000000000000000000000000..3a4ce9fd768bc3beb48b43381b8881a920e7faa8
--- /dev/null
+++ b/s3/src/main/java/com/yahoo/ycsb/db/S3Client.java
@@ -0,0 +1,305 @@
+/**
+ * S3 storage client binding for YCSB.
+ *
+ * Submitted by Ivan Baldinotti on 14/07/2015
+ *
+ */
+package com.yahoo.ycsb.db;
+
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Vector;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.*;
+import java.io.IOException;
+
+import com.yahoo.ycsb.ByteArrayByteIterator;
+import com.yahoo.ycsb.ByteIterator;
+import com.yahoo.ycsb.DB;
+import com.yahoo.ycsb.DBException;
+
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.*;
+import com.amazonaws.auth.*;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+/**
+ * S3 Storage client for YCSB framework.
+ * 
+ * Properties to set:
+ * 
+ * s3.accessKeyId=access key S3 aws
+ * s3.secretKey=secret key S3 aws
+ * s3.endPoint=s3.amazonaws.com
+ * s3.region=us-east-1
+ *
+ * @author ivanB1975
+ */
+public class S3Client extends DB {
+
+	// NOTE(review): all state is static, so every YCSB client thread
+	// shares one AmazonS3Client; init() may race with -threads > 1.
+	// The unused "key" and "bucket" fields were removed.
+	private static String accessKeyId;
+	private static String secretKey;
+	private static String endPoint;
+	private static String region;
+	private static BasicAWSCredentials s3Credentials;
+	private static AmazonS3Client s3Client;
+	private static ClientConfiguration clientConfig;
+	/**
+	* Cleanup any state for this storage.
+	* Called once per S3 instance; there is one S3 instance per client
+	* thread.
+	*
+	* The shared static AmazonS3Client is deliberately not shut down:
+	* other client threads may still be using it.
+	*/
+	@Override
+	public void cleanup() throws DBException {
+		// intentionally a no-op; the old empty try/catch guarded nothing
+	}
+	/**
+	* Removes a single object (file) from the given S3 bucket.
+	*
+	* @param bucket
+	*            The bucket holding the object.
+	* @param key
+	*            The key of the object to remove.
+	* @return Zero on success, a non-zero error code on error. See the
+	*         {@link DB} class's description for a discussion of error codes.
+	*/
+	@Override
+	public int delete(String bucket, String key) {
+		try {
+			s3Client.deleteObject(new DeleteObjectRequest(bucket, key));
+			return 0;
+		} catch (Exception err) {
+			err.printStackTrace();
+			return 1;
+		}
+	}
+	/**
+	* Initialize any state for the storage.
+	* Called once per S3 instance; there is one S3 instance per client thread.
+	*/
+	@Override
+	public void init() throws DBException {
+       
+		Properties props = getProperties();
+		accessKeyId = props.getProperty("s3.accessKeyId","accessKeyId");
+		secretKey = props.getProperty("s3.secretKey","secretKey");
+		endPoint = props.getProperty("s3.endPoint","s3.amazonaws.com");
+		region = props.getProperty("s3.region","us-east-1");
+		System.out.println("Inizializing the S3 connection");
+		s3Credentials = new BasicAWSCredentials(accessKeyId,secretKey);
+		clientConfig = new ClientConfiguration();
+		clientConfig.setMaxErrorRetry(15);
+		try {
+		s3Client = new AmazonS3Client(s3Credentials,clientConfig);
+		s3Client.setRegion(Region.getRegion(Regions.fromName(region)));
+		s3Client.setEndpoint(endPoint);
+		System.out.println("Connection successfully initialized");
+		} catch (Exception e){
+			System.err.println("Could not connect to S3 storage because: "+ e.toString());
+			e.printStackTrace();
+			return;
+		}
+    	}
+	/**
+	* Store a new object in the bucket: the field/value pairs from
+	* {@code values} are concatenated into one file stored under the key.
+	*
+	* @param bucket
+	*            The name of the bucket
+	* @param key
+	*            The record key of the file to insert.
+	* @param values
+	*            A HashMap of field/value pairs to insert in the file
+	* @return Zero on success, a non-zero error code on error. See the
+	*         {@link DB} class's description for a discussion of error codes.
+	*/
+	@Override
+	public int insert(String bucket, String key, HashMap<String, ByteIterator> values) {
+		// 0 == insert marker: payload sized from the supplied values
+		return writeToStorage(bucket, key, values, 0);
+	}
+	/**
+	* Fetch a single object from the bucket and hand its raw bytes back
+	* to the caller under the record key.
+	*
+	* @param bucket
+	*            The name of the bucket
+	* @param key
+	*            The record key of the file to read.
+	* @param fields
+	*            The list of fields to read; currently ignored — the whole
+	*            object is always returned.
+	* @param result
+	*            A HashMap of field/value pairs for the result
+	* @return Zero on success, a non-zero error code on error or "not found".
+	*/
+	@Override
+	public int read(String bucket, String key, Set<String> fields, HashMap<String, ByteIterator> result) {
+		return readFromStorage(bucket, key, result);
+	}
+	/**
+	* Overwrite an existing object in the bucket with the supplied
+	* field/value pairs.
+	*
+	* @param bucket
+	*            The name of the bucket
+	* @param key
+	*            The file key of the file to write.
+	* @param values
+	*            A HashMap of field/value pairs to update in the record
+	* @return Zero on success, a non-zero error code on error.
+	*/
+	@Override
+	public int update(String bucket, String key, HashMap<String, ByteIterator> values) {
+		// 1 == update marker: writeToStorage keeps the stored object's
+		// length, read from its metadata
+		return writeToStorage(bucket, key, values, 1);
+	}
+	/**
+	* Perform a range scan: read up to {@code recordcount} files from the
+	* bucket in alphabetical key order, starting at {@code startkey}.
+	*
+	* @param bucket
+	*            The name of the bucket
+	* @param startkey
+	*            The file key of the first file to read.
+	* @param recordcount
+	*            The number of files to read
+	* @param fields
+	*            The list of fields to read; currently ignored.
+	* @param result
+	*            A Vector of HashMaps, where each HashMap is a set of
+	*            field/value pairs for one file
+	* @return Zero on success, a non-zero error code on error. See the
+	*         {@link DB} class's description for a discussion of error codes.
+	*/
+	@Override
+	public int scan(String bucket, String startkey, int recordcount, Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
+		return scanFromStorage(bucket, startkey, recordcount, result);
+	}
+
+	/**
+	* Serialize {@code values} into one byte array and upload it as one S3
+	* object: the first field's bytes repeated once per field.
+	* @param updateMarker 0 = insert (size is fieldCount * field length);
+	*        1 = update (size comes from the existing object's metadata).
+	* @return Zero on success, 1 on any error.
+	*/
+	protected int writeToStorage(String bucket, String key, HashMap<String, ByteIterator> values, int updateMarker) {
+		int fieldCount = values.size();
+		Object firstField = values.keySet().toArray()[0];
+		byte[] sourceArray = values.get(firstField).toArray();
+		int sizeArray = sourceArray.length;
+		int totalSize;
+		if (updateMarker == 0) {
+			totalSize = sizeArray * fieldCount;
+		} else {
+			try {
+				// metadata-only request; the old getObject() here leaked an unread, unclosed connection
+				ObjectMetadata objectMetadata = this.s3Client.getObjectMetadata(bucket, key);
+				int sizeOfFile = (int) objectMetadata.getContentLength();
+				fieldCount = sizeOfFile / sizeArray;
+				totalSize = sizeOfFile;
+			} catch (Exception e) {
+				e.printStackTrace();
+				return 1;
+			}
+		}
+		byte[] destinationArray = new byte[totalSize];
+		int offset = 0;
+		for (int i = 0; i < fieldCount; i++) {
+			System.arraycopy(sourceArray, 0, destinationArray, offset, sizeArray);
+			offset += sizeArray;
+		}
+		InputStream input = new ByteArrayInputStream(destinationArray);
+		ObjectMetadata metadata = new ObjectMetadata();
+		metadata.setContentLength(totalSize);
+		try {
+			this.s3Client.putObject(bucket, key, input, metadata);
+			// ByteArrayInputStream needs no close(); the old finally returned 0 here, masking errors
+			return 0;
+		} catch (Exception e) {
+			e.printStackTrace();
+			return 1;
+		}
+	}
+	/**
+	* Read one object; its raw bytes become the single result entry under the record key.
+	* @return Zero on success, 1 on any error (including "not found").
+	*/
+	protected int readFromStorage(String bucket, String key, HashMap<String, ByteIterator> result) {
+		try {
+			S3Object object = this.s3Client.getObject(new GetObjectRequest(bucket, key));
+			int sizeOfFile = (int) object.getObjectMetadata().getContentLength();
+			InputStream objectData = object.getObjectContent();
+			byte[] inputStreamToByte = new byte[sizeOfFile];
+			// FIXME(review): a single read() may fill the buffer partially
+			objectData.read(inputStreamToByte, 0, sizeOfFile);
+			result.put(key, new ByteArrayByteIterator(inputStreamToByte));
+			objectData.close();
+			return 0; // previously returned from finally, masking errors
+		} catch (Exception e) {
+			e.printStackTrace();
+			return 1;
+		}
+	}
+
+	/**
+	* Scan consecutive objects of the bucket in alphabetical key order,
+	* starting at {@code startkey}.
+	* @param bucket The name of the bucket.
+	* @param startkey The key of the first object to read.
+	* @param recordcount The maximum number of objects to read.
+	* @param result One HashMap (raw object bytes) per object read.
+	* @return Zero on success, a non-zero error code on error.
+	*/
+	protected int scanFromStorage(String bucket, String startkey, int recordcount, Vector<HashMap<String, ByteIterator>> result) {
+		// collect every key in the bucket, following pagination
+		ObjectListing listing = s3Client.listObjects(bucket);
+		List<S3ObjectSummary> summaries = listing.getObjectSummaries();
+		while (listing.isTruncated()) {
+			listing = s3Client.listNextBatchOfObjects(listing);
+			summaries.addAll(listing.getObjectSummaries());
+		}
+		List<String> keyList = new ArrayList<String>();
+		for (S3ObjectSummary summary : summaries) {
+			keyList.add(summary.getKey());
+		}
+		// the scan order is the alphabetical key order
+		Collections.sort(keyList);
+		// locate the first key of the scan
+		int startkeyNumber = 0;
+		for (String currentKey : keyList) {
+			if (currentKey.equals(startkey)) {
+				break;
+			}
+			startkeyNumber++;
+		}
+		// read up to recordcount objects from startkey; the previous bound
+		// (i < recordcount) ignored the start offset and under-read
+		int endIndex = Math.min(startkeyNumber + recordcount, keyList.size());
+		for (int i = startkeyNumber; i < endIndex; i++) {
+			HashMap<String, ByteIterator> resultTemp = new HashMap<String, ByteIterator>();
+			readFromStorage(bucket, keyList.get(i), resultTemp);
+			result.add(resultTemp);
+		}
+		return 0;
+	}
+}