From d4b1d247a31cf9fbf42b2e324b5b1cc27fdd9565 Mon Sep 17 00:00:00 2001
From: Misha Brukman <>
Date: Fri, 27 Nov 2015 20:09:01 +0000
Subject: [PATCH] [memcached] Added memcached binding.

The memcached support was extracted from PR #98 by @jbellis, with cleanups to
bring it in line with current APIs and style guide.

This PR also addresses issue #326.
 NOTICE.txt                                    |   4 +
 bin/ycsb                                      |   1 +
 distribution/pom.xml                          |   5 +
 memcached/                           |  97 ++++++
 memcached/conf/           |  52 ++++
 memcached/pom.xml                             |  99 ++++++
 .../com/yahoo/ycsb/db/    | 294 ++++++++++++++++++
 .../java/com/yahoo/ycsb/db/  |  21 ++
 pom.xml                                       |   1 +
 9 files changed, 574 insertions(+)
 create mode 100644 memcached/
 create mode 100644 memcached/conf/
 create mode 100644 memcached/pom.xml
 create mode 100644 memcached/src/main/java/com/yahoo/ycsb/db/
 create mode 100644 memcached/src/main/java/com/yahoo/ycsb/db/

diff --git a/NOTICE.txt b/NOTICE.txt
index e516aff2..cd1f104f 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -7,3 +7,7 @@ in this case for the YCSB project.
    This product includes software developed by
    Yahoo! Inc. (
    Copyright (c) 2010 Yahoo! Inc.  All rights reserved.
+   This product includes software developed by
+   Google Inc. (
+   Copyright (c) 2015 Google Inc.  All rights reserved.
diff --git a/bin/ycsb b/bin/ycsb
index b839e91a..7f374406 100755
--- a/bin/ycsb
+++ b/bin/ycsb
@@ -66,6 +66,7 @@ DATABASES = {
     "jdbc"         : "",
     "kudu"         : "",
     "mapkeeper"    : "",
+    "memcached"    : "",
     "mongodb"      : "",
     "mongodb-async": "",
     "nosqldb"      : "",
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 998daa56..5cceaef1 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -114,6 +114,11 @@ LICENSE file.
+    <dependency>
+      <groupId></groupId>
+      <artifactId>memcached-binding</artifactId>
+      <version>${project.version}</version>
+    </dependency>
diff --git a/memcached/ b/memcached/
new file mode 100644
index 00000000..2126b2da
--- /dev/null
+++ b/memcached/
@@ -0,0 +1,97 @@
+Copyright (c) 2015 YCSB contributors. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License"); you
+may not use this file except in compliance with the License. You
+may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+implied. See the License for the specific language governing
+permissions and limitations under the License. See accompanying
+LICENSE file.
+# YCSB Memcached binding
+This section describes how to run YCSB on memcached.
+## 1. Install and start memcached service on the host(s)
+Debian / Ubuntu:
+    sudo apt-get install memcached
+RedHat / CentOS:
+    sudo yum install memcached
+## 2. Install Java and Maven
+See step 2 in [`../mongodb/`](../mongodb/
+## 3. Set up YCSB
+Git clone YCSB and compile:
+    git clone
+    cd YCSB
+    mvn -pl -am clean package
+## 4. Load data and run tests
+Load the data:
+    ./bin/ycsb load memcached -s -P workloads/workloada > outputLoad.txt
+Run the workload test:
+    ./bin/ycsb run memcached -s -P workloads/workloada > outputRun.txt
+## 5. memcached Connection Parameters
+A sample configuration is provided in
+### Required params
+- `memcached.hosts`
+  This is a comma-separated list of hosts providing the memcached interface.
+  You can use IPs or hostnames. The port is optional and defaults to the
+  memcached standard port of `11211` if not specified.
+### Optional params
+- `memcached.shutdownTimeoutMillis`
+  Shutdown timeout in milliseconds.
+- `memcached.objectExpirationTime`
+  Object expiration time for memcached; defaults to `Integer.MAX_VALUE`.
+- `memcached.checkOperationStatus`
+  Whether to verify the success of each operation; defaults to true.
+- `memcached.readBufferSize`
+  Read buffer size, in bytes.
+- `memcached.opTimeoutMillis`
+  Operation timeout, in milliseconds.
+- `memcached.failureMode`
+  What to do with failures; this is one of `net.spy.memcached.FailureMode` enum
+  values, which are currently: `Redistribute`, `Retry`, or `Cancel`.
+You can set properties on the command line via `-p`, e.g.:
+    ./bin/ycsb load memcached -s -P workloads/workloada \
+        -p "memcached.hosts=" > outputLoad.txt
diff --git a/memcached/conf/ b/memcached/conf/
new file mode 100644
index 00000000..e65f2fa7
--- /dev/null
+++ b/memcached/conf/
@@ -0,0 +1,52 @@
+# Copyright (c) 2015 YCSB contributors. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License"); you
+# may not use this file except in compliance with the License. You
+# may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# implied. See the License for the specific language governing
+# permissions and limitations under the License. See accompanying
+# LICENSE file.
+# Sample property file for Memcached Client
+## Mandatory parameters
+# A comma-separated list of memcached server endpoints, each being an IP or
+# hostname with an optional port; the port defaults to the memcached-standard
+# port of 11211 if not specified.
+# memcached.hosts =
+## Optional parameters
+# Shutdown timeout in milliseconds.
+# memcached.shutdownTimeoutMillis = 30000
+# Object expiration time for memcached; defaults to `Integer.MAX_VALUE`.
+# memcached.objectExpirationTime = 2147483647
+# Whether to verify the success of each operation; defaults to true.
+# memcached.checkOperationStatus = true
+# Read buffer size, in bytes.
+# memcached.readBufferSize = 3000000
+# Operation timeout, in milliseconds.
+# memcached.opTimeoutMillis = 60000
+# What to do with failures; this is one of `net.spy.memcached.FailureMode` enum
+# values, which are currently: `Redistribute`, `Retry`, or `Cancel`.
+# memcached.failureMode = Redistribute
diff --git a/memcached/pom.xml b/memcached/pom.xml
new file mode 100644
index 00000000..7f6044a3
--- /dev/null
+++ b/memcached/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+Copyright (c) 2014-2015 YCSB contributors. All rights reserved.
+Licensed under the Apache License, Version 2.0 (the "License"); you
+may not use this file except in compliance with the License. You
+may obtain a copy of the License at
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+implied. See the License for the specific language governing
+permissions and limitations under the License. See accompanying
+LICENSE file.
+<project xmlns="" xmlns:xsi="" xsi:schemaLocation="">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId></groupId>
+    <artifactId>binding-parent</artifactId>
+    <version>0.6.0-SNAPSHOT</version>
+    <relativePath>../binding-parent</relativePath>
+  </parent>
+  <artifactId>memcached-binding</artifactId>
+  <name>memcached binding</name>
+  <groupId></groupId>
+  <packaging>jar</packaging>
+  <dependencies>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.17</version>
+    </dependency>
+    <dependency>
+      <groupId></groupId>
+      <artifactId>core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+      <version>1.9.13</version>
+    </dependency>
+    <dependency>
+      <groupId>net.spy</groupId>
+      <artifactId>spymemcached</artifactId>
+      <version>2.11.4</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>2.15</version>
+        <configuration>
+          <consoleOutput>true</consoleOutput>
+          <configLocation>../checkstyle.xml</configLocation>
+          <failOnViolation>true</failOnViolation>
+          <failsOnError>true</failsOnError>
+        </configuration>
+        <executions>
+          <execution>
+            <id>validate</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>checkstyle</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>${maven.assembly.version}</version>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+          <appendAssemblyId>false</appendAssemblyId>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
diff --git a/memcached/src/main/java/com/yahoo/ycsb/db/ b/memcached/src/main/java/com/yahoo/ycsb/db/
new file mode 100644
index 00000000..8a95d8fb
--- /dev/null
+++ b/memcached/src/main/java/com/yahoo/ycsb/db/
@@ -0,0 +1,294 @@
+ * Copyright (c) 2014-2015 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Vector;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.FailureMode;
+// We also use `net.spy.memcached.MemcachedClient`; it is not imported
+// explicitly and referred to with its full path to avoid conflicts with the
+// class of the same name in this file.
+import net.spy.memcached.internal.GetFuture;
+import net.spy.memcached.internal.OperationFuture;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ObjectNode;
+import org.apache.log4j.Logger;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+ * Concrete Memcached client implementation.
+ */
+public class MemcachedClient extends DB {
+  private final Logger logger = Logger.getLogger(getClass());
+  protected static final ObjectMapper MAPPER = new ObjectMapper();
+  private boolean checkOperationStatus;
+  private long shutdownTimeoutMillis;
+  private int objectExpirationTime;
+  public static final String HOSTS_PROPERTY = "memcached.hosts";
+  public static final int DEFAULT_PORT = 11211;
+  private static final String TEMPORARY_FAILURE_MSG = "Temporary failure";
+  private static final String CANCELLED_MSG = "cancelled";
+  public static final String SHUTDOWN_TIMEOUT_MILLIS_PROPERTY =
+      "memcached.shutdownTimeoutMillis";
+  public static final String DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = "30000";
+  public static final String OBJECT_EXPIRATION_TIME_PROPERTY =
+      "memcached.objectExpirationTime";
+  public static final String DEFAULT_OBJECT_EXPIRATION_TIME =
+      String.valueOf(Integer.MAX_VALUE);
+  public static final String CHECK_OPERATION_STATUS_PROPERTY =
+      "memcached.checkOperationStatus";
+  public static final String CHECK_OPERATION_STATUS_DEFAULT = "true";
+  public static final String READ_BUFFER_SIZE_PROPERTY =
+      "memcached.readBufferSize";
+  public static final String DEFAULT_READ_BUFFER_SIZE = "3000000";
+  public static final String OP_TIMEOUT_PROPERTY = "memcached.opTimeoutMillis";
+  public static final String DEFAULT_OP_TIMEOUT = "60000";
+  public static final String FAILURE_MODE_PROPERTY = "memcached.failureMode";
+  public static final FailureMode FAILURE_MODE_PROPERTY_DEFAULT =
+      FailureMode.Redistribute;
+  /**
+   * The MemcachedClient implementation that will be used to communicate
+   * with the memcached server.
+   */
+  private net.spy.memcached.MemcachedClient client;
+  /**
+   * @returns Underlying Memcached protocol client, implemented by
+   *     SpyMemcached.
+   */
+  protected net.spy.memcached.MemcachedClient memcachedClient() {
+    return client;
+  }
+  @Override
+  public void init() throws DBException {
+    try {
+      client = createMemcachedClient();
+      checkOperationStatus = Boolean.parseBoolean(
+          getProperties().getProperty(CHECK_OPERATION_STATUS_PROPERTY,
+                                      CHECK_OPERATION_STATUS_DEFAULT));
+      objectExpirationTime = Integer.parseInt(
+          getProperties().getProperty(OBJECT_EXPIRATION_TIME_PROPERTY,
+                                      DEFAULT_OBJECT_EXPIRATION_TIME));
+      shutdownTimeoutMillis = Integer.parseInt(
+          getProperties().getProperty(SHUTDOWN_TIMEOUT_MILLIS_PROPERTY,
+                                      DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
+    } catch (Exception e) {
+      throw new DBException(e);
+    }
+  }
+  protected net.spy.memcached.MemcachedClient createMemcachedClient()
+      throws Exception {
+    ConnectionFactoryBuilder connectionFactoryBuilder =
+        new ConnectionFactoryBuilder();
+    connectionFactoryBuilder.setReadBufferSize(Integer.parseInt(
+        getProperties().getProperty(READ_BUFFER_SIZE_PROPERTY,
+                                    DEFAULT_READ_BUFFER_SIZE)));
+    connectionFactoryBuilder.setOpTimeout(Integer.parseInt(
+        getProperties().getProperty(OP_TIMEOUT_PROPERTY, DEFAULT_OP_TIMEOUT)));
+    String failureString = getProperties().getProperty(FAILURE_MODE_PROPERTY);
+    connectionFactoryBuilder.setFailureMode(
+        failureString == null ? FAILURE_MODE_PROPERTY_DEFAULT
+                              : FailureMode.valueOf(failureString));
+    // Note: this only works with IPv4 addresses due to its assumption of
+    // ":" being the separator of hostname/IP and port; this is not the case
+    // when dealing with IPv6 addresses.
+    //
+    // TODO(mbrukman): fix this.
+    List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+    String[] hosts = getProperties().getProperty(HOSTS_PROPERTY).split(",");
+    for (String address : hosts) {
+      int colon = address.indexOf(":");
+      int port = DEFAULT_PORT;
+      String host = address;
+      if (colon != -1) {
+        port = Integer.parseInt(address.substring(colon + 1));
+        host = address.substring(0, colon);
+      }
+      addresses.add(new InetSocketAddress(host, port));
+    }
+    return new net.spy.memcached.MemcachedClient(
+, addresses);
+  }
+  @Override
+  public Status read(
+      String table, String key, Set<String> fields,
+      HashMap<String, ByteIterator> result) {
+    key = createQualifiedKey(table, key);
+    try {
+      GetFuture<Object> future = memcachedClient().asyncGet(key);
+      Object document = future.get();
+      if (document != null) {
+        fromJson((String) document, fields, result);
+      }
+      return Status.OK;
+    } catch (Exception e) {
+      logger.error("Error encountered for key: " + key, e);
+      return Status.ERROR;
+    }
+  }
+  @Override
+  public Status scan(
+      String table, String startkey, int recordcount, Set<String> fields,
+      Vector<HashMap<String, ByteIterator>> result){
+    return Status.NOT_IMPLEMENTED;
+  }
+  @Override
+  public Status update(
+      String table, String key, HashMap<String, ByteIterator> values) {
+    key = createQualifiedKey(table, key);
+    try {
+      OperationFuture<Boolean> future =
+          memcachedClient().replace(key, objectExpirationTime, toJson(values));
+      return getReturnCode(future);
+    } catch (Exception e) {
+      logger.error("Error updating value with key: " + key, e);
+      return Status.ERROR;
+    }
+  }
+  @Override
+  public Status insert(
+      String table, String key, HashMap<String, ByteIterator> values) {
+    key = createQualifiedKey(table, key);
+    try {
+      OperationFuture<Boolean> future =
+          memcachedClient().add(key, objectExpirationTime, toJson(values));
+      return getReturnCode(future);
+    } catch (Exception e) {
+      logger.error("Error inserting value", e);
+      return Status.ERROR;
+    }
+  }
+  @Override
+  public Status delete(String table, String key) {
+    key = createQualifiedKey(table, key);
+    try {
+      OperationFuture<Boolean> future = memcachedClient().delete(key);
+      return getReturnCode(future);
+    } catch (Exception e) {
+      logger.error("Error deleting value", e);
+      return Status.ERROR;
+    }
+  }
+  protected Status getReturnCode(OperationFuture<Boolean> future) {
+    if (!checkOperationStatus) {
+      return Status.OK;
+    }
+    if (future.getStatus().isSuccess()) {
+      return Status.OK;
+    } else if (TEMPORARY_FAILURE_MSG.equals(future.getStatus().getMessage())) {
+    } else if (CANCELLED_MSG.equals(future.getStatus().getMessage())) {
+      return new Status("CANCELLED_MSG", CANCELLED_MSG);
+    }
+    return new Status("ERROR", future.getStatus().getMessage());
+  }
+  @Override
+  public void cleanup() throws DBException {
+    if (client != null) {
+      memcachedClient().shutdown(shutdownTimeoutMillis, MILLISECONDS);
+    }
+  }
+  protected static String createQualifiedKey(String table, String key) {
+    return MessageFormat.format("{0}-{1}", table, key);
+  }
+  protected static void fromJson(
+      String value, Set<String> fields,
+      Map<String, ByteIterator> result) throws IOException {
+    JsonNode json = MAPPER.readTree(value);
+    boolean checkFields = fields != null && fields.size() > 0;
+    for (Iterator<Map.Entry<String, JsonNode>> jsonFields = json.getFields();
+         jsonFields.hasNext();
+         /* increment in loop body */) {
+      Map.Entry<String, JsonNode> jsonField =;
+      String name = jsonField.getKey();
+      if (checkFields && fields.contains(name)) {
+        continue;
+      }
+      JsonNode jsonValue = jsonField.getValue();
+      if (jsonValue != null && !jsonValue.isNull()) {
+        result.put(name, new StringByteIterator(jsonValue.asText()));
+      }
+    }
+  }
+  protected static String toJson(Map<String, ByteIterator> values)
+      throws IOException {
+    ObjectNode node = MAPPER.createObjectNode();
+    HashMap<String, String> stringMap = StringByteIterator.getStringMap(values);
+    for (Map.Entry<String, String> pair : stringMap.entrySet()) {
+      node.put(pair.getKey(), pair.getValue());
+    }
+    JsonFactory jsonFactory = new JsonFactory();
+    Writer writer = new StringWriter();
+    JsonGenerator jsonGenerator = jsonFactory.createJsonGenerator(writer);
+    MAPPER.writeTree(jsonGenerator, node);
+    return writer.toString();
+  }
diff --git a/memcached/src/main/java/com/yahoo/ycsb/db/ b/memcached/src/main/java/com/yahoo/ycsb/db/
new file mode 100644
index 00000000..27dbc34c
--- /dev/null
+++ b/memcached/src/main/java/com/yahoo/ycsb/db/
@@ -0,0 +1,21 @@
+ * Copyright (c) 2015 YCSB contributors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+ * YCSB binding for memcached.
+ */
diff --git a/pom.xml b/pom.xml
index b42d651d..84ec41a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,6 +118,7 @@ LICENSE file.
+    <module>memcached</module>