Skip to content
Snippets Groups Projects
Commit 563925ed authored by Adam Retter's avatar Adam Retter Committed by Sean Busbey
Browse files

[rocksdb] Added support for RocksDB Java API (#1052)

parent d2d9a3fd
No related branches found
No related tags found
No related merge requests found
......@@ -68,6 +68,7 @@ rados:com.yahoo.ycsb.db.RadosClient
redis:com.yahoo.ycsb.db.RedisClient
rest:com.yahoo.ycsb.webservice.rest.RestClient
riak:com.yahoo.ycsb.db.riak.RiakKVClient
rocksdb:com.yahoo.ycsb.db.rocksdb.RocksDBClient
s3:com.yahoo.ycsb.db.S3Client
solr:com.yahoo.ycsb.db.solr.SolrClient
solr6:com.yahoo.ycsb.db.solr6.SolrClient
......
......@@ -95,6 +95,7 @@ DATABASES = {
"redis" : "com.yahoo.ycsb.db.RedisClient",
"rest" : "com.yahoo.ycsb.webservice.rest.RestClient",
"riak" : "com.yahoo.ycsb.db.riak.RiakKVClient",
"rocksdb" : "com.yahoo.ycsb.db.rocksdb.RocksDBClient",
"s3" : "com.yahoo.ycsb.db.S3Client",
"solr" : "com.yahoo.ycsb.db.solr.SolrClient",
"solr6" : "com.yahoo.ycsb.db.solr6.SolrClient",
......
......@@ -229,6 +229,11 @@ LICENSE file.
<artifactId>riak-binding</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>rocksdb-binding</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>s3-binding</artifactId>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2012 - 2017 YCSB contributors. All rights reserved.
Copyright (c) 2012 - 2018 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
......@@ -99,6 +99,7 @@ LICENSE file.
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<redis.version>2.9.0</redis.version>
<riak.version>2.0.5</riak.version>
<rocksdb.version>5.11.3</rocksdb.version>
<s3.version>1.10.20</s3.version>
<solr.version>5.5.3</solr.version>
<solr6.version>6.4.1</solr6.version>
......@@ -152,6 +153,7 @@ LICENSE file.
<module>redis</module>
<module>rest</module>
<module>riak</module>
<module>rocksdb</module>
<module>s3</module>
<module>solr</module>
<module>solr6</module>
......
<!--
Copyright (c) 2012 - 2018 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
## Quick Start
This section describes how to run YCSB on RocksDB running locally (within the same JVM).
NOTE: RocksDB is an embedded database and so articles like [How to run in parallel](https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload-in-Parallel) are not applicable here.
### 1. Set Up YCSB
Clone the YCSB git repository and compile:
git clone https://github.com/brianfrankcooper/YCSB.git
cd YCSB
mvn clean package
### 2. Run YCSB
Now you are ready to run! First, load the data:
./bin/ycsb load rocksdb -s -P workloads/workloada -p rocksdb.dir=/tmp/ycsb-rocksdb-data
Then, run the workload:
./bin/ycsb run rocksdb -s -P workloads/workloada -p rocksdb.dir=/tmp/ycsb-rocksdb-data
## RocksDB Configuration Parameters
* ```rocksdb.dir``` - (required) A path to a folder to hold the RocksDB data files.
* EX. ```/tmp/ycsb-rocksdb-data```
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2017 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.15.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>
<artifactId>rocksdb-binding</artifactId>
<name>RocksDB Java Binding</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdb.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
/*
* Copyright (c) 2018 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.rocksdb;
import com.yahoo.ycsb.*;
import com.yahoo.ycsb.Status;
import net.jcip.annotations.GuardedBy;
import org.rocksdb.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
* RocksDB binding for <a href="http://rocksdb.org/">RocksDB</a>.
*
* See {@code rocksdb/README.md} for details.
*/
public class RocksDBClient extends DB {
static final String PROPERTY_ROCKSDB_DIR = "rocksdb.dir";
private static final String COLUMN_FAMILY_NAMES_FILENAME = "CF_NAMES";
private static final Logger LOGGER = LoggerFactory.getLogger(RocksDBClient.class);
@GuardedBy("RocksDBClient.class") private static Path rocksDbDir = null;
@GuardedBy("RocksDBClient.class") private static RocksObject dbOptions = null;
@GuardedBy("RocksDBClient.class") private static RocksDB rocksDb = null;
@GuardedBy("RocksDBClient.class") private static int references = 0;
private static final ConcurrentMap<String, ColumnFamily> COLUMN_FAMILIES = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, Lock> COLUMN_FAMILY_LOCKS = new ConcurrentHashMap<>();
@Override
public void init() throws DBException {
synchronized(RocksDBClient.class) {
if(rocksDb == null) {
rocksDbDir = Paths.get(getProperties().getProperty(PROPERTY_ROCKSDB_DIR));
LOGGER.info("RocksDB data dir: " + rocksDbDir);
try {
rocksDb = initRocksDB();
} catch (final IOException | RocksDBException e) {
throw new DBException(e);
}
}
references++;
}
}
/**
* Initializes and opens the RocksDB database.
*
* Should only be called with a {@code synchronized(RocksDBClient.class)` block}.
*
* @return The initialized and open RocksDB instance.
*/
private RocksDB initRocksDB() throws IOException, RocksDBException {
if(!Files.exists(rocksDbDir)) {
Files.createDirectories(rocksDbDir);
}
final List<String> cfNames = loadColumnFamilyNames();
final List<ColumnFamilyOptions> cfOptionss = new ArrayList<>();
final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
for(final String cfName : cfNames) {
final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()
.optimizeLevelStyleCompaction();
final ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(
cfName.getBytes(UTF_8),
cfOptions
);
cfOptionss.add(cfOptions);
cfDescriptors.add(cfDescriptor);
}
final int rocksThreads = Runtime.getRuntime().availableProcessors() * 2;
if(cfDescriptors.isEmpty()) {
final Options options = new Options()
.optimizeLevelStyleCompaction()
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true)
.setIncreaseParallelism(rocksThreads)
.setMaxBackgroundCompactions(rocksThreads)
.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
dbOptions = options;
return RocksDB.open(options, rocksDbDir.toAbsolutePath().toString());
} else {
final DBOptions options = new DBOptions()
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true)
.setIncreaseParallelism(rocksThreads)
.setMaxBackgroundCompactions(rocksThreads)
.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
dbOptions = options;
final List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
final RocksDB db = RocksDB.open(options, rocksDbDir.toAbsolutePath().toString(), cfDescriptors, cfHandles);
for(int i = 0; i < cfNames.size(); i++) {
COLUMN_FAMILIES.put(cfNames.get(i), new ColumnFamily(cfHandles.get(i), cfOptionss.get(i)));
}
return db;
}
}
@Override
public void cleanup() throws DBException {
super.cleanup();
synchronized (RocksDBClient.class) {
try {
if (references == 1) {
for (final ColumnFamily cf : COLUMN_FAMILIES.values()) {
cf.getHandle().close();
}
rocksDb.close();
rocksDb = null;
dbOptions.close();
dbOptions = null;
for (final ColumnFamily cf : COLUMN_FAMILIES.values()) {
cf.getOptions().close();
}
saveColumnFamilyNames();
COLUMN_FAMILIES.clear();
rocksDbDir = null;
}
} catch (final IOException e) {
throw new DBException(e);
} finally {
references--;
}
}
}
@Override
public Status read(final String table, final String key, final Set<String> fields,
final Map<String, ByteIterator> result) {
try {
if (!COLUMN_FAMILIES.containsKey(table)) {
createColumnFamily(table);
}
final ColumnFamilyHandle cf = COLUMN_FAMILIES.get(table).getHandle();
final byte[] values = rocksDb.get(cf, key.getBytes(UTF_8));
if(values == null) {
return Status.NOT_FOUND;
}
deserializeValues(values, fields, result);
return Status.OK;
} catch(final RocksDBException e) {
LOGGER.error(e.getMessage(), e);
return Status.ERROR;
}
}
@Override
public Status scan(final String table, final String startkey, final int recordcount, final Set<String> fields,
final Vector<HashMap<String, ByteIterator>> result) {
try {
if (!COLUMN_FAMILIES.containsKey(table)) {
createColumnFamily(table);
}
final ColumnFamilyHandle cf = COLUMN_FAMILIES.get(table).getHandle();
try(final RocksIterator iterator = rocksDb.newIterator(cf)) {
int iterations = 0;
for (iterator.seek(startkey.getBytes(UTF_8)); iterator.isValid() && iterations < recordcount;
iterator.next()) {
final HashMap<String, ByteIterator> values = new HashMap<>();
deserializeValues(iterator.value(), fields, values);
result.add(values);
iterations++;
}
}
return Status.OK;
} catch(final RocksDBException e) {
LOGGER.error(e.getMessage(), e);
return Status.ERROR;
}
}
@Override
public Status update(final String table, final String key, final Map<String, ByteIterator> values) {
//TODO(AR) consider if this would be faster with merge operator
try {
if (!COLUMN_FAMILIES.containsKey(table)) {
createColumnFamily(table);
}
final ColumnFamilyHandle cf = COLUMN_FAMILIES.get(table).getHandle();
final Map<String, ByteIterator> result = new HashMap<>();
final byte[] currentValues = rocksDb.get(cf, key.getBytes(UTF_8));
if(currentValues == null) {
return Status.NOT_FOUND;
}
deserializeValues(currentValues, null, result);
//update
result.putAll(values);
//store
rocksDb.put(cf, key.getBytes(UTF_8), serializeValues(result));
return Status.OK;
} catch(final RocksDBException | IOException e) {
LOGGER.error(e.getMessage(), e);
return Status.ERROR;
}
}
@Override
public Status insert(final String table, final String key, final Map<String, ByteIterator> values) {
try {
if (!COLUMN_FAMILIES.containsKey(table)) {
createColumnFamily(table);
}
final ColumnFamilyHandle cf = COLUMN_FAMILIES.get(table).getHandle();
rocksDb.put(cf, key.getBytes(UTF_8), serializeValues(values));
return Status.OK;
} catch(final RocksDBException | IOException e) {
LOGGER.error(e.getMessage(), e);
return Status.ERROR;
}
}
@Override
public Status delete(final String table, final String key) {
try {
if (!COLUMN_FAMILIES.containsKey(table)) {
createColumnFamily(table);
}
final ColumnFamilyHandle cf = COLUMN_FAMILIES.get(table).getHandle();
rocksDb.delete(cf, key.getBytes(UTF_8));
return Status.OK;
} catch(final RocksDBException e) {
LOGGER.error(e.getMessage(), e);
return Status.ERROR;
}
}
private void saveColumnFamilyNames() throws IOException {
final Path file = rocksDbDir.resolve(COLUMN_FAMILY_NAMES_FILENAME);
try(final PrintWriter writer = new PrintWriter(Files.newBufferedWriter(file, UTF_8))) {
writer.println(new String(RocksDB.DEFAULT_COLUMN_FAMILY, UTF_8));
for(final String cfName : COLUMN_FAMILIES.keySet()) {
writer.println(cfName);
}
}
}
private List<String> loadColumnFamilyNames() throws IOException {
final List<String> cfNames = new ArrayList<>();
final Path file = rocksDbDir.resolve(COLUMN_FAMILY_NAMES_FILENAME);
if(Files.exists(file)) {
try (final LineNumberReader reader =
new LineNumberReader(Files.newBufferedReader(file, UTF_8))) {
String line = null;
while ((line = reader.readLine()) != null) {
cfNames.add(line);
}
}
}
return cfNames;
}
private Map<String, ByteIterator> deserializeValues(final byte[] values, final Set<String> fields,
final Map<String, ByteIterator> result) {
final ByteBuffer buf = ByteBuffer.allocate(4);
int offset = 0;
while(offset < values.length) {
buf.put(values, offset, 4);
buf.flip();
final int keyLen = buf.getInt();
buf.clear();
offset += 4;
final String key = new String(values, offset, keyLen);
offset += keyLen;
buf.put(values, offset, 4);
buf.flip();
final int valueLen = buf.getInt();
buf.clear();
offset += 4;
if(fields == null || fields.contains(key)) {
result.put(key, new ByteArrayByteIterator(values, offset, valueLen));
}
offset += valueLen;
}
return result;
}
private byte[] serializeValues(final Map<String, ByteIterator> values) throws IOException {
try(final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
final ByteBuffer buf = ByteBuffer.allocate(4);
for(final Map.Entry<String, ByteIterator> value : values.entrySet()) {
final byte[] keyBytes = value.getKey().getBytes(UTF_8);
final byte[] valueBytes = value.getValue().toArray();
buf.putInt(keyBytes.length);
baos.write(buf.array());
baos.write(keyBytes);
buf.clear();
buf.putInt(valueBytes.length);
baos.write(buf.array());
baos.write(valueBytes);
buf.clear();
}
return baos.toByteArray();
}
}
private void createColumnFamily(final String name) throws RocksDBException {
COLUMN_FAMILY_LOCKS.putIfAbsent(name, new ReentrantLock());
final Lock l = COLUMN_FAMILY_LOCKS.get(name);
l.lock();
try {
if(!COLUMN_FAMILIES.containsKey(name)) {
final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions().optimizeLevelStyleCompaction();
final ColumnFamilyHandle cfHandle = rocksDb.createColumnFamily(
new ColumnFamilyDescriptor(name.getBytes(UTF_8), cfOptions)
);
COLUMN_FAMILIES.put(name, new ColumnFamily(cfHandle, cfOptions));
}
} finally {
l.unlock();
}
}
private static final class ColumnFamily {
private final ColumnFamilyHandle handle;
private final ColumnFamilyOptions options;
private ColumnFamily(final ColumnFamilyHandle handle, final ColumnFamilyOptions options) {
this.handle = handle;
this.options = options;
}
public ColumnFamilyHandle getHandle() {
return handle;
}
public ColumnFamilyOptions getOptions() {
return options;
}
}
}
/*
* Copyright (c) 2018 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
/**
* The RocksDB Java binding for <a href="http://rocksdb.org/">RocksDB</a>.
*/
package com.yahoo.ycsb.db.rocksdb;
/*
* Copyright (c) 2018 YCSB contributors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/
package com.yahoo.ycsb.db.rocksdb;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.Status;
import com.yahoo.ycsb.StringByteIterator;
import org.junit.*;
import org.junit.rules.TemporaryFolder;
import java.util.*;
import static org.junit.Assert.assertEquals;
public class RocksDBClientTest {
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
private static final String MOCK_TABLE = "ycsb";
private static final String MOCK_KEY0 = "0";
private static final String MOCK_KEY1 = "1";
private static final String MOCK_KEY2 = "2";
private static final String MOCK_KEY3 = "3";
private static final int NUM_RECORDS = 10;
private static final Map<String, ByteIterator> MOCK_DATA;
static {
MOCK_DATA = new HashMap<>(NUM_RECORDS);
for (int i = 0; i < NUM_RECORDS; i++) {
MOCK_DATA.put("field" + i, new StringByteIterator("value" + i));
}
}
private RocksDBClient instance;
@Before
public void setup() throws Exception {
instance = new RocksDBClient();
final Properties properties = new Properties();
properties.setProperty(RocksDBClient.PROPERTY_ROCKSDB_DIR, tmpFolder.getRoot().getAbsolutePath());
instance.setProperties(properties);
instance.init();
}
@After
public void tearDown() throws Exception {
instance.cleanup();
}
@Test
public void insertAndRead() throws Exception {
final Status insertResult = instance.insert(MOCK_TABLE, MOCK_KEY0, MOCK_DATA);
assertEquals(Status.OK, insertResult);
final Set<String> fields = MOCK_DATA.keySet();
final Map<String, ByteIterator> resultParam = new HashMap<>(NUM_RECORDS);
final Status readResult = instance.read(MOCK_TABLE, MOCK_KEY0, fields, resultParam);
assertEquals(Status.OK, readResult);
}
@Test
public void insertAndDelete() throws Exception {
final Status insertResult = instance.insert(MOCK_TABLE, MOCK_KEY1, MOCK_DATA);
assertEquals(Status.OK, insertResult);
final Status result = instance.delete(MOCK_TABLE, MOCK_KEY1);
assertEquals(Status.OK, result);
}
@Test
public void insertUpdateAndRead() throws Exception {
final Map<String, ByteIterator> newValues = new HashMap<>(NUM_RECORDS);
final Status insertResult = instance.insert(MOCK_TABLE, MOCK_KEY2, MOCK_DATA);
assertEquals(Status.OK, insertResult);
for (int i = 0; i < NUM_RECORDS; i++) {
newValues.put("field" + i, new StringByteIterator("newvalue" + i));
}
final Status result = instance.update(MOCK_TABLE, MOCK_KEY2, newValues);
assertEquals(Status.OK, result);
//validate that the values changed
final Map<String, ByteIterator> resultParam = new HashMap<>(NUM_RECORDS);
instance.read(MOCK_TABLE, MOCK_KEY2, MOCK_DATA.keySet(), resultParam);
for (int i = 0; i < NUM_RECORDS; i++) {
assertEquals("newvalue" + i, resultParam.get("field" + i).toString());
}
}
@Test
public void insertAndScan() throws Exception {
final Status insertResult = instance.insert(MOCK_TABLE, MOCK_KEY3, MOCK_DATA);
assertEquals(Status.OK, insertResult);
final Set<String> fields = MOCK_DATA.keySet();
final Vector<HashMap<String, ByteIterator>> resultParam = new Vector<>(NUM_RECORDS);
final Status result = instance.scan(MOCK_TABLE, MOCK_KEY3, NUM_RECORDS, fields, resultParam);
assertEquals(Status.OK, result);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment