VIVO-719 Create a method on RDFService for a streaming SELECT query.

By passing an OutputStream to the method, we don't buffer the entire response into memory.
This commit is contained in:
Jim Blake 2014-06-05 15:57:01 -04:00
parent 0cd42e211e
commit 2ea6a5d8cb
7 changed files with 127 additions and 35 deletions

View file

@ -3,6 +3,7 @@
package edu.cornell.mannlib.vitro.webapp.rdfservice;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
/**
@ -86,6 +87,21 @@ public interface RDFService {
*/
public InputStream sparqlDescribeQuery(String query, RDFService.ModelSerializationFormat resultFormat) throws RDFServiceException;
/**
* Performs a SPARQL select query against the knowledge base. The query may have
* an embedded graph identifier. If the query does not contain a graph identifier
* the query is executed against the union of all named and unnamed graphs in the
* store.
*
* The serialized result is written to the supplied output stream rather than
* being returned to the caller.
*
* Preferred for streaming because it avoids in-memory buffering.
*
* NOTE(review): this declaration does not say whether the output stream is
* flushed or closed on return - presumably the caller keeps ownership of the
* stream; confirm against the implementations.
*
* @param query - the SPARQL query to be executed against the RDF store
* @param resultFormat - format for the result of the Select query
* @param outputStream - receives the result of the query
* @throws RDFServiceException if the query cannot be completed
*
*/
public void sparqlSelectQuery(String query, RDFService.ResultFormat resultFormat, OutputStream outputStream) throws RDFServiceException;
/**
* Performs a SPARQL select query against the knowledge base. The query may have
* an embedded graph identifier. If the query does not contain a graph identifier

View file

@ -4,15 +4,16 @@ package edu.cornell.mannlib.vitro.webapp.rdfservice.filter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -156,6 +157,20 @@ public class LanguageFilteringRDFService implements RDFService {
return langStrings.toString();
}
/**
 * Writes the result of a SELECT query to the supplied stream by running the
 * InputStream-returning variant and copying its output across.
 *
 * TODO rewrite the filtering to use this form - avoid one level of
 * buffering.
 *
 * @param query the SPARQL SELECT query
 * @param resultFormat serialization format for the result
 * @param outputStream receives the copied result; it is not closed here
 * @throws RDFServiceException if the query fails, or wrapping any
 *         IOException raised while copying or closing the source stream
 */
@Override
public void sparqlSelectQuery(String query, ResultFormat resultFormat,
        OutputStream outputStream) throws RDFServiceException {
    try (InputStream filtered = sparqlSelectQuery(query, resultFormat)) {
        IOUtils.copy(filtered, outputStream);
    } catch (IOException ioe) {
        throw new RDFServiceException(ioe);
    }
}
@Override
public InputStream sparqlSelectQuery(String query,
ResultFormat resultFormat) throws RDFServiceException {

View file

@ -4,11 +4,14 @@ package edu.cornell.mannlib.vitro.webapp.rdfservice.filter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -104,6 +107,20 @@ public class SameAsFilteringRDFServiceFactory implements RDFServiceFactory {
return new ByteArrayInputStream(out.toByteArray());
}
/**
 * Sends the result of a SELECT query to the given stream. The work is done
 * by the InputStream-returning variant; this method only copies its output.
 *
 * TODO rewrite the filtering to use this form instead - avoid one level of
 * buffering.
 *
 * @param query the SPARQL SELECT query
 * @param resultFormat serialization format for the result
 * @param outputStream receives the copied result; it is not closed here
 * @throws RDFServiceException if the query fails, or wrapping any
 *         IOException raised while copying or closing the source stream
 */
@Override
public void sparqlSelectQuery(String query, ResultFormat resultFormat,
        OutputStream outputStream) throws RDFServiceException {
    try (InputStream source = sparqlSelectQuery(query, resultFormat)) {
        IOUtils.copy(source, outputStream);
    } catch (IOException copyFailure) {
        throw new RDFServiceException(copyFailure);
    }
}
@Override
public InputStream sparqlSelectQuery(String query, ResultFormat resultFormat)
throws RDFServiceException {

View file

@ -3,6 +3,7 @@
package edu.cornell.mannlib.vitro.webapp.rdfservice.impl;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
@ -85,6 +86,12 @@ public class RDFServiceFactorySingle implements RDFServiceFactory {
return s.sparqlDescribeQuery(query, resultFormat);
}
/**
 * Forwards the streaming SELECT query unchanged to {@code s}; the result is
 * written to the supplied output stream by that delegate.
 */
@Override
public void sparqlSelectQuery(String query, ResultFormat resultFormat,
OutputStream outputStream) throws RDFServiceException {
// Pure pass-through: no work is done at this level.
s.sparqlSelectQuery(query, resultFormat, outputStream);
}
@Override
public InputStream sparqlSelectQuery(String query,
ResultFormat resultFormat) throws RDFServiceException {

View file

@ -6,13 +6,13 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.lf5.util.StreamUtils;
@ -33,6 +33,7 @@ import com.hp.hpl.jena.rdf.model.RDFNode;
import com.hp.hpl.jena.rdf.model.Resource;
import com.hp.hpl.jena.rdf.model.Statement;
import com.hp.hpl.jena.rdf.model.StmtIterator;
import com.hp.hpl.jena.sdb.SDB;
import com.hp.hpl.jena.shared.Lock;
import edu.cornell.mannlib.vitro.webapp.dao.jena.DatasetWrapper;
@ -432,41 +433,54 @@ public abstract class RDFServiceJena extends RDFServiceImpl implements RDFServic
return getRDFResultStream(query, DESCRIBE, resultFormat);
}
/**
 * Executes a SELECT query and serializes the result set straight onto the
 * supplied stream in the requested format.
 *
 * @param query the SPARQL SELECT query text
 * @param resultFormat one of CSV, TEXT, JSON or XML
 * @param outputStream destination for the serialized result set
 * @throws RDFServiceException if the result format is not recognized
 *         (the helpers called here may raise it for other reasons as well)
 */
@Override
public void sparqlSelectQuery(String query, ResultFormat resultFormat,
        OutputStream outputStream) throws RDFServiceException {
    DatasetWrapper wrapper = getDatasetWrapper();
    try {
        Dataset dataset = wrapper.getDataset();
        Query parsedQuery = createQuery(query);
        QueryExecution execution = createQueryExecution(query, parsedQuery, dataset);

        // Only SDB benefits from these settings; they shouldn't hurt for TDB.
        execution.getContext().set(SDB.jdbcFetchSize, Integer.MIN_VALUE);
        execution.getContext().set(SDB.jdbcStream, true);
        execution.getContext().set(SDB.streamGraphAPI, true);

        try {
            ResultSet rows = execution.execSelect();
            // Hand the result set to the formatter matching the requested format.
            switch (resultFormat) {
                case CSV:
                    ResultSetFormatter.outputAsCSV(outputStream, rows);
                    break;
                case TEXT:
                    ResultSetFormatter.out(outputStream, rows);
                    break;
                case JSON:
                    ResultSetFormatter.outputAsJSON(outputStream, rows);
                    break;
                case XML:
                    ResultSetFormatter.outputAsXML(outputStream, rows);
                    break;
                default:
                    throw new RDFServiceException("unrecognized result format");
            }
        } finally {
            // Release the query execution whether or not formatting succeeded.
            execution.close();
        }
    } finally {
        // Always give the dataset wrapper back.
        wrapper.close();
    }
}
/**
* Runs a SELECT query and returns the complete serialized result as an
* in-memory stream: the result is written to a ByteArrayOutputStream and
* handed back wrapped in a ByteArrayInputStream.
*
* TODO Is there a way to accomplish this without buffering the entire result?
*
* NOTE(review): this span appears to contain two bodies back to back - a
* direct query/format implementation followed by a three-line delegation to
* the OutputStream overload. Confirm which one is the intended body.
*/
@Override
public InputStream sparqlSelectQuery(String query, ResultFormat resultFormat)
throws RDFServiceException {
DatasetWrapper dw = getDatasetWrapper();
try {
Dataset d = dw.getDataset();
Query q = createQuery(query);
QueryExecution qe = createQueryExecution(query, q, d);
try {
ResultSet resultSet = qe.execSelect();
// Entire result is buffered here before being returned.
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
// Pick the formatter that matches the requested result format.
switch (resultFormat) {
case CSV:
ResultSetFormatter.outputAsCSV(outputStream,resultSet);
break;
case TEXT:
ResultSetFormatter.out(outputStream,resultSet);
break;
case JSON:
ResultSetFormatter.outputAsJSON(outputStream, resultSet);
break;
case XML:
ResultSetFormatter.outputAsXML(outputStream, resultSet);
break;
default:
throw new RDFServiceException("unrecognized result format");
}
InputStream result = new ByteArrayInputStream(outputStream.toByteArray());
return result;
} finally {
// Close the query execution on every path out of the inner block.
qe.close();
}
} finally {
// Close the dataset wrapper on every path out of the method.
dw.close();
}
// NOTE(review): every path through the try blocks above returns or throws,
// so the statements below are unreachable as written. They delegate to the
// OutputStream overload and buffer its output - presumably the replacement
// for the body above; verify.
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
sparqlSelectQuery(query, resultFormat, outputStream);
return new ByteArrayInputStream(outputStream.toByteArray());
}
@Override

View file

@ -3,6 +3,7 @@
package edu.cornell.mannlib.vitro.webapp.rdfservice.impl.logging;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
@ -51,6 +52,13 @@ public class LoggingRDFService implements RDFService {
}
}
/**
 * Passes the streaming SELECT query straight through to
 * {@code innerService}, which writes the result to the supplied stream.
 *
 * NOTE(review): nothing is logged around this call - confirm that is
 * intended for the streaming variant.
 */
@Override
public void sparqlSelectQuery(String query, ResultFormat resultFormat,
OutputStream outputStream) throws RDFServiceException {
innerService.sparqlSelectQuery(query, resultFormat,
outputStream);
}
@Override
public InputStream sparqlSelectQuery(String query, ResultFormat resultFormat)
throws RDFServiceException {

View file

@ -6,6 +6,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringWriter;
import java.net.URISyntaxException;
import java.util.ArrayList;
@ -262,6 +263,20 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
return result;
}
/**
 * Delivers the result of a SELECT query to the given stream by invoking the
 * InputStream-returning variant and copying everything it produces.
 *
 * TODO rewrite the query to use this form instead - avoid one level of
 * buffering.
 *
 * @param query the SPARQL SELECT query
 * @param resultFormat serialization format for the result
 * @param outputStream receives the copied result; it is not closed here
 * @throws RDFServiceException if the query fails, or wrapping any
 *         IOException raised while copying or closing the source stream
 */
@Override
public void sparqlSelectQuery(String query, ResultFormat resultFormat,
        OutputStream outputStream) throws RDFServiceException {
    try (InputStream response = sparqlSelectQuery(query, resultFormat)) {
        IOUtils.copy(response, outputStream);
    } catch (IOException ioe) {
        throw new RDFServiceException(ioe);
    }
}
/**
* Performs a SPARQL select query against the knowledge base. The query may have
* an embedded graph identifier.