Skip to content
Snippets Groups Projects
Commit 73e2b188 authored by Michael Nitschinger's avatar Michael Nitschinger
Browse files

[couchbase2] Refactor scan operation.

This changeset refactors the scan operation slightly, using a List
with a predefined size (since we know it), as well as using the
document ID directly for the response rather than doing full blown
JSON decoding which is not needed.
parent 5b50c794
No related branches found
No related tags found
No related merge requests found
......@@ -102,6 +102,7 @@ public class Couchbase2Client extends DB {
System.setProperty("com.couchbase.query.encodedPlanEnabled", "false");
}
private static final String SEPARATOR = ":";
private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Couchbase2Client.class);
private static final Object INIT_COORDINATOR = new Object();
......@@ -125,6 +126,7 @@ public class Couchbase2Client extends DB {
private int boost;
private int networkMetricsInterval;
private int runtimeMetricsInterval;
private String scanAllQuery;
@Override
public void init() throws DBException {
......@@ -147,6 +149,7 @@ public class Couchbase2Client extends DB {
boost = Integer.parseInt(props.getProperty("couchbase.boost", "3"));
networkMetricsInterval = Integer.parseInt(props.getProperty("couchbase.networkMetricsInterval", "0"));
runtimeMetricsInterval = Integer.parseInt(props.getProperty("couchbase.runtimeMetricsInterval", "0"));
scanAllQuery = "SELECT meta().id as id FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2";
try {
synchronized (INIT_COORDINATOR) {
......@@ -603,18 +606,19 @@ public class Couchbase2Client extends DB {
*/
private Status scanAllFields(final String table, final String startkey, final int recordcount,
final Vector<HashMap<String, ByteIterator>> result) {
final String scanQuery = "SELECT meta().id as id FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2";
Collection<HashMap<String, ByteIterator>> documents = bucket.async()
final List<HashMap<String, ByteIterator>> data = new ArrayList<HashMap<String, ByteIterator>>(recordcount);
bucket.async()
.query(N1qlQuery.parameterized(
scanQuery,
JsonArray.from(formatId(table, startkey), recordcount),
N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
))
scanAllQuery,
JsonArray.from(formatId(table, startkey), recordcount),
N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
))
.doOnNext(new Action1<AsyncN1qlQueryResult>() {
@Override
public void call(AsyncN1qlQueryResult result) {
if (!result.parseSuccess()) {
throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanQuery
throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanAllQuery
+ ", Errors: " + result.errors());
}
}
......@@ -628,7 +632,11 @@ public class Couchbase2Client extends DB {
.flatMap(new Func1<AsyncN1qlQueryRow, Observable<RawJsonDocument>>() {
@Override
public Observable<RawJsonDocument> call(AsyncN1qlQueryRow row) {
return bucket.async().get(row.value().getString("id"), RawJsonDocument.class);
String id = new String(row.byteValue());
return bucket.async().get(
id.substring(id.indexOf(table + SEPARATOR), id.lastIndexOf('"')),
RawJsonDocument.class
);
}
})
.map(new Func1<RawJsonDocument, HashMap<String, ByteIterator>>() {
......@@ -639,11 +647,15 @@ public class Couchbase2Client extends DB {
return tuple;
}
})
.toList()
.toBlocking()
.single();
.forEach(new Action1<HashMap<String, ByteIterator>>() {
@Override
public void call(HashMap<String, ByteIterator> tuple) {
data.add(tuple);
}
});
result.addAll(documents);
result.addAll(data);
return Status.OK;
}
......@@ -659,15 +671,16 @@ public class Couchbase2Client extends DB {
*/
private Status scanSpecificFields(final String table, final String startkey, final int recordcount,
final Set<String> fields, final Vector<HashMap<String, ByteIterator>> result) {
String scanQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName + "` WHERE meta().id >= '$1' LIMIT $2";
String scanSpecQuery = "SELECT " + joinFields(fields) + " FROM `" + bucketName
+ "` WHERE meta().id >= '$1' LIMIT $2";
N1qlQueryResult queryResult = bucket.query(N1qlQuery.parameterized(
scanQuery,
scanSpecQuery,
JsonArray.from(formatId(table, startkey), recordcount),
N1qlParams.build().adhoc(adhoc).maxParallelism(maxParallelism)
));
if (!queryResult.parseSuccess() || !queryResult.finalSuccess()) {
throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanQuery
throw new RuntimeException("Error while parsing N1QL Result. Query: " + scanSpecQuery
+ ", Errors: " + queryResult.errors());
}
......@@ -780,7 +793,7 @@ public class Couchbase2Client extends DB {
* @return a document ID that can be used with Couchbase.
*/
private static String formatId(final String prefix, final String key) {
return prefix + ":" + key;
return prefix + SEPARATOR + key;
}
/**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment