[VIVO-1031] Add methods to process construct / select queries without buffering / translating through streams.

grahamtriggs 2015-10-17 22:03:12 +01:00
parent db0bb6a5e6
commit 39b0d2e6bd
10 changed files with 454 additions and 10 deletions

View file

@@ -2,6 +2,8 @@
package edu.cornell.mannlib.vitro.webapp.rdfservice;
import com.hp.hpl.jena.rdf.model.Model;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
@@ -72,7 +74,18 @@ public interface RDFService {
*
*/
public InputStream sparqlConstructQuery(String query, RDFService.ModelSerializationFormat resultFormat) throws RDFServiceException;
/**
* Performs a SPARQL construct query against the knowledge base. The query may have
* an embedded graph identifier. If the query does not contain a graph identifier
* the query is executed against the union of all named and unnamed graphs in the
* store.
*
* @param query - the SPARQL query to be executed against the RDF store
* @param model - the Model to add the statements to
*/
public void sparqlConstructQuery(String query, Model model) throws RDFServiceException;
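A minimal sketch of how a caller might use this new method; the helper class, service instance, and query below are illustrative, not part of this commit:

import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;

import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;

// Hypothetical caller - names and query are illustrative only.
public class ConstructIntoModelExample {
    public static Model loadLabels(RDFService rdfService) throws RDFServiceException {
        Model model = ModelFactory.createDefaultModel();
        // Statements produced by the CONSTRUCT are added straight into "model";
        // there is no serialization to an InputStream and no re-parsing.
        rdfService.sparqlConstructQuery(
                "CONSTRUCT { ?s <http://www.w3.org/2000/01/rdf-schema#label> ?label } " +
                "WHERE { ?s <http://www.w3.org/2000/01/rdf-schema#label> ?label }", model);
        return model;
    }
}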
/**
* Performs a SPARQL describe query against the knowledge base. The query may have
* an embedded graph identifier. If the query does not contain a graph identifier
@@ -100,7 +113,18 @@ public interface RDFService {
*
*/
public InputStream sparqlSelectQuery(String query, RDFService.ResultFormat resultFormat) throws RDFServiceException;
/**
* Performs a SPARQL select query against the knowledge base. The query may have
* an embedded graph identifier. If the query does not contain a graph identifier
* the query is executed against the union of all named and unnamed graphs in the
* store.
*
* @param query - the SPARQL query to be executed against the RDF store
* @param consumer - the ResultSetConsumer that will process the results of the query
*/
public void sparqlSelectQuery(String query, ResultSetConsumer consumer) throws RDFServiceException;
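A rough sketch of the streaming SELECT path; the calling class, query, and variable names are hypothetical:

import java.util.ArrayList;
import java.util.List;

import com.hp.hpl.jena.query.QuerySolution;

import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;

// Hypothetical caller - not part of this commit.
public class SelectWithConsumerExample {
    public static List<String> listSubjectUris(RDFService rdfService) throws RDFServiceException {
        final List<String> uris = new ArrayList<String>();
        rdfService.sparqlSelectQuery("SELECT ?s WHERE { ?s ?p ?o }", new ResultSetConsumer() {
            @Override
            protected void processQuerySolution(QuerySolution qs) {
                // Called once per row as the underlying ResultSet is iterated.
                if (qs.getResource("s") != null) {
                    uris.add(qs.getResource("s").getURI());
                }
            }
        });
        return uris;
    }
}

Each row is handed to the consumer as the ResultSet is iterated, so (at least in the Jena-backed implementations below) the results are not buffered into a byte stream and re-parsed.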
/**
* Performs a SPARQL ASK query against the knowledge base. The query may have
* an embedded graph identifier. If the query does not contain a graph identifier

View file

@@ -0,0 +1,144 @@
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
package edu.cornell.mannlib.vitro.webapp.rdfservice;
import com.hp.hpl.jena.query.QuerySolution;
import com.hp.hpl.jena.query.ResultSet;
import java.util.List;
/**
* Base class for creating consumers of ResultSets
*
* processQuerySolution MUST be overridden - it takes each QuerySolution in turn until the ResultSet is complete
*
* startProcessing and endProcessing may be overridden if the implementation needs to know when the processing starts,
* or when there are no more solutions left to process.
*/
public abstract class ResultSetConsumer {
private ResultSet resultSet;
public ResultSetConsumer() {
}
/**
* Method for processing each QuerySolution - must be overridden in each implementation
*
* @param qs - the current query solution
*/
protected abstract void processQuerySolution(QuerySolution qs);
/**
* Method to notify the consumer that a ResultSet is valid and is about to be processed
*/
protected void startProcessing() {
}
/**
* Method to notify the consumer that all QuerySolutions have been processed
*/
protected void endProcessing() {
}
/**
* Helper method that ensures the start / end processing is done correctly
*
* @param rs - the ResultSet to process
*/
public void processResultSet(ResultSet rs) {
if (rs != null) {
resultSet = rs;
startProcessing();
while (rs.hasNext()) {
processQuerySolution(rs.next());
}
endProcessing();
}
}
/**
* Helper method to allow an implementation to get the var names from the resultset
*
* @return list of result set var names
*/
protected final List<String> getResultVars() {
return resultSet.getResultVars();
}
/**
* Helper implementation of ResultSetConsumer that can be used to wrap another ResultSetConsumer
* - useful for filtering implementations
*/
public static abstract class Chaining extends ResultSetConsumer {
private ResultSetConsumer innerConsumer;
protected Chaining(ResultSetConsumer innerConsumer) {
this.innerConsumer = innerConsumer;
}
protected void startProcessing() {
chainStartProcessing();
}
protected void endProcessing() {
chainEndProcessing();
}
/**
* Helper method that calls processQuerySolution on an embedded ResultSetConsumer
* @param qs - the query solution to pass to the embedded consumer
*/
protected void chainProcessQuerySolution(QuerySolution qs) {
if (innerConsumer != null) {
innerConsumer.processQuerySolution(qs);
}
}
/**
* Helper method that calls startProcessing on an embedded ResultSetConsumer
*/
protected void chainStartProcessing() {
if (innerConsumer != null)
innerConsumer.startProcessing();
}
/**
* Helper method that calls endProcessing on an embedded ResultSetConsumer
*/
protected void chainEndProcessing() {
if (innerConsumer != null) {
innerConsumer.endProcessing();
}
}
}
/**
* Helper implementation that simply reports whether there were any results in the ResultSet
*/
public static class HasResult extends ResultSetConsumer {
private boolean hasResult = false;
/**
* Override the helper method for processing results to quickly check if there are any results
* @param rs - the ResultSet to process
*/
@Override
public void processResultSet(ResultSet rs) {
hasResult = (rs != null && rs.hasNext());
}
@Override
protected void processQuerySolution(QuerySolution qs) {
hasResult = true;
}
/**
* Were any results found?
* @return true if the ResultSet contained at least one solution
*/
public boolean hasResult() {
return hasResult;
}
}
}
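An illustration of the HasResult helper above; the calling class and query are hypothetical (the Chaining wrapper is exercised by the filtering services later in this commit):

import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;

// Hypothetical caller - not part of this commit.
public class HasResultExample {
    public static boolean hasAnyStatements(RDFService rdfService) throws RDFServiceException {
        ResultSetConsumer.HasResult check = new ResultSetConsumer.HasResult();
        // HasResult overrides processResultSet, so it only looks at rs.hasNext()
        // rather than iterating every solution.
        rdfService.sparqlSelectQuery("SELECT ?s WHERE { ?s ?p ?o } LIMIT 1", check);
        return check.hasResult();
    }
}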

View file

@@ -12,6 +12,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -85,6 +86,12 @@ public class LanguageFilteringRDFService implements RDFService {
return in;
}
@Override
public void sparqlConstructQuery(String query, Model model)
throws RDFServiceException {
s.sparqlConstructQuery(query, model);
}
@Override
public InputStream sparqlDescribeQuery(String query,
ModelSerializationFormat resultFormat)
@@ -237,7 +244,89 @@ public class LanguageFilteringRDFService implements RDFService {
}
return new ByteArrayInputStream(outputStream.toByteArray());
}
@Override
public void sparqlSelectQuery(String query, ResultSetConsumer consumer) throws RDFServiceException {
log.debug("sparqlSelectQuery: " + query.replaceAll("\\s+", " "));
s.sparqlSelectQuery(query, new ResultSetConsumer.Chaining(consumer) {
List<String> vars;
List<QuerySolution> solnList = new ArrayList<QuerySolution>();
@Override
protected void processQuerySolution(QuerySolution qs) {
solnList.add(qs);
}
@Override
protected void startProcessing() {
vars = getResultVars();
}
@Override
protected void endProcessing() {
chainStartProcessing();
// For each variable, group rows that are identical apart from this variable's
// literal, sort the competing literals by language preference, and null out the
// rows whose literal is in a less preferred language.
Iterator<String> varIt = vars.iterator();
while (varIt.hasNext()) {
String var = varIt.next();
for (int i = 0; i < solnList.size(); i++) {
QuerySolution s = solnList.get(i);
if (s == null) {
continue;
}
RDFNode node = s.get(var);
if (node == null || !node.isLiteral()) {
continue;
}
List<RowIndexedLiteral> candidatesForRemoval =
new ArrayList<RowIndexedLiteral>();
candidatesForRemoval.add(new RowIndexedLiteral(node.asLiteral(), i));
for (int j = i + 1; j < solnList.size(); j++) {
QuerySolution t = solnList.get(j);
if (t == null) {
continue;
}
if (matchesExceptForVar(s, t, var, vars)) {
candidatesForRemoval.add(
new RowIndexedLiteral(t.getLiteral(var), j));
}
}
if (candidatesForRemoval.size() == 1) {
continue;
}
Collections.sort(candidatesForRemoval, new RowIndexedLiteralSortByLang());
log.debug("sorted RowIndexedLiterals: " + showSortedRILs(candidatesForRemoval));
Iterator<RowIndexedLiteral> candIt = candidatesForRemoval.iterator();
// Keep the leading candidates that share the most preferred language; once a
// different language appears, null out that row and every remaining candidate's row.
String langRegister = null;
boolean chuckRemaining = false;
while(candIt.hasNext()) {
RowIndexedLiteral rlit = candIt.next();
if (chuckRemaining) {
solnList.set(rlit.getIndex(), null);
} else if (langRegister == null) {
langRegister = rlit.getLiteral().getLanguage();
} else if (!langRegister.equals(rlit.getLiteral().getLanguage())) {
chuckRemaining = true;
solnList.set(rlit.getIndex(), null);
}
}
}
}
Iterator<QuerySolution> solIt = solnList.iterator();
while(solIt.hasNext()) {
QuerySolution soln = solIt.next();
if (soln != null) {
chainProcessQuerySolution(soln);
}
}
chainEndProcessing();
}
});
}
private String showSortedRILs(List<RowIndexedLiteral> candidatesForRemoval) {
List<String> langstrings = new ArrayList<String>();
for (RowIndexedLiteral ril: candidatesForRemoval) {

View file

@@ -10,6 +10,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -104,8 +105,23 @@ public class SameAsFilteringRDFServiceFactory implements RDFServiceFactory {
resultFormat));
return new ByteArrayInputStream(out.toByteArray());
}
@Override
public void sparqlConstructQuery(String query, Model model)
throws RDFServiceException {
Model m = ModelFactory.createDefaultModel();
s.sparqlConstructQuery(query, m);
StmtIterator stmtIt = m.listStatements();
while (stmtIt.hasNext()) {
Statement stmt = stmtIt.nextStatement();
if (!isRedundant(stmt)) {
model.add(stmt);
}
}
}
@Override
public InputStream sparqlSelectQuery(String query, ResultFormat resultFormat)
throws RDFServiceException {
ResultSet rs = ResultSetFactory.load(
@@ -138,7 +154,21 @@ public class SameAsFilteringRDFServiceFactory implements RDFServiceFactory {
}
return new ByteArrayInputStream(outputStream.toByteArray());
}
@Override
public void sparqlSelectQuery(String query, ResultSetConsumer consumer)
throws RDFServiceException {
s.sparqlSelectQuery(query, new ResultSetConsumer.Chaining(consumer) {
@Override
public void processQuerySolution(QuerySolution qs) {
if (!isRedundant(qs)) {
chainProcessQuerySolution(qs);
}
}
});
}
private boolean isRedundant(Statement s) {
List<Resource> sameAsResources = getSameAsResources(s.getSubject());
if (sameAsResources.size() > 0 && !sameAsResources.get(0).equals(s.getSubject())) {

View file

@@ -6,11 +6,13 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import com.hp.hpl.jena.rdf.model.Model;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceFactory;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
import edu.cornell.mannlib.vitro.webapp.utils.logging.ToString;
/**
@@ -80,6 +82,12 @@ public class RDFServiceFactorySingle implements RDFServiceFactory {
return s.sparqlConstructQuery(query, resultFormat);
}
@Override
public void sparqlConstructQuery(String query, Model model)
throws RDFServiceException {
s.sparqlConstructQuery(query, model);
}
@Override
public InputStream sparqlDescribeQuery(String query,
ModelSerializationFormat resultFormat)
@@ -93,6 +101,11 @@ public class RDFServiceFactorySingle implements RDFServiceFactory {
return s.sparqlSelectQuery(query, resultFormat);
}
@Override
public void sparqlSelectQuery(String query, ResultSetConsumer consumer) throws RDFServiceException {
s.sparqlSelectQuery(query, consumer);
}
@Override
public boolean sparqlAskQuery(String query) throws RDFServiceException {
return s.sparqlAskQuery(query);

View file

@@ -12,6 +12,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -420,6 +421,23 @@ public abstract class RDFServiceJena extends RDFServiceImpl implements RDFServic
}
}
private void getRDFModel(String query, boolean construct, Model model) throws RDFServiceException {
DatasetWrapper dw = getDatasetWrapper();
try {
Dataset d = dw.getDataset();
Query q = createQuery(query);
QueryExecution qe = createQueryExecution(query, q, d);
try {
if (construct) {
qe.execConstruct(model);
} else {
qe.execDescribe(model);
}
} finally {
qe.close();
}
} finally {
dw.close();
}
}
private static final boolean CONSTRUCT = true;
private static final boolean DESCRIBE = false;
@@ -430,6 +448,10 @@ public abstract class RDFServiceJena extends RDFServiceImpl implements RDFServic
return getRDFResultStream(query, CONSTRUCT, resultFormat);
}
public void sparqlConstructQuery(String query, Model model) throws RDFServiceException {
getRDFModel(query, CONSTRUCT, model);
}
@Override
public InputStream sparqlDescribeQuery(String query,
ModelSerializationFormat resultFormat) throws RDFServiceException {
@@ -476,6 +498,24 @@ public abstract class RDFServiceJena extends RDFServiceImpl implements RDFServic
}
}
@Override
public void sparqlSelectQuery(String query, ResultSetConsumer consumer)
throws RDFServiceException {
DatasetWrapper dw = getDatasetWrapper();
try {
Dataset d = dw.getDataset();
Query q = createQuery(query);
QueryExecution qe = createQueryExecution(query, q, d);
try {
consumer.processResultSet(qe.execSelect());
} finally {
qe.close();
}
} finally {
dw.close();
}
}
@Override
public boolean sparqlAskQuery(String query) throws RDFServiceException {
DatasetWrapper dw = getDatasetWrapper();

View file

@@ -11,6 +11,8 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import com.hp.hpl.jena.rdf.model.Model;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -107,6 +109,16 @@ public class RDFServiceTDB extends RDFServiceJena {
}
}
@Override
public void sparqlConstructQuery(String query, Model model) throws RDFServiceException {
dataset.begin(ReadWrite.READ);
try {
super.sparqlConstructQuery(query, model);
} finally {
dataset.end();
}
}
@Override
public InputStream sparqlDescribeQuery(String query,
ModelSerializationFormat resultFormat) throws RDFServiceException {
@@ -129,6 +141,17 @@ public class RDFServiceTDB extends RDFServiceJena {
}
}
@Override
public void sparqlSelectQuery(String query, ResultSetConsumer consumer)
throws RDFServiceException {
dataset.begin(ReadWrite.READ);
try {
super.sparqlSelectQuery(query, consumer);
} finally {
dataset.end();
}
}
@Override
public boolean sparqlAskQuery(String query) throws RDFServiceException {
dataset.begin(ReadWrite.READ);

View file

@@ -6,10 +6,12 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import com.hp.hpl.jena.rdf.model.Model;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeListener;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
/**
* This RDFService wrapper adds instrumentation to the time-consuming methods of
@@ -44,6 +46,13 @@ public class LoggingRDFService implements RDFService {
}
}
@Override
public void sparqlConstructQuery(String query, Model model) throws RDFServiceException {
try (RDFServiceLogger l = new RDFServiceLogger(query)) {
innerService.sparqlConstructQuery(query, model);
}
}
@Override
public InputStream sparqlDescribeQuery(String query,
ModelSerializationFormat resultFormat) throws RDFServiceException {
@@ -60,6 +69,14 @@ public class LoggingRDFService implements RDFService {
}
}
@Override
public void sparqlSelectQuery(String query, ResultSetConsumer consumer)
throws RDFServiceException {
try (RDFServiceLogger l = new RDFServiceLogger(query)) {
innerService.sparqlSelectQuery(query, consumer);
}
}
@Override
public boolean sparqlAskQuery(String query) throws RDFServiceException {
try (RDFServiceLogger l = new RDFServiceLogger(query)) {

View file

@@ -15,6 +15,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -237,7 +238,21 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
InputStream result = new ByteArrayInputStream(serializedModel.toByteArray());
return result;
}
public void sparqlConstructQuery(String queryStr, Model model) throws RDFServiceException {
Query query = createQuery(queryStr);
QueryExecution qe = QueryExecutionFactory.sparqlService(readEndpointURI, query);
try {
qe.execConstruct(model);
} catch (Exception e) {
log.error("Error executing CONSTRUCT against remote endpoint: " + queryStr, e);
throw new RDFServiceException("Unable to execute CONSTRUCT against remote endpoint");
} finally {
qe.close();
}
}
/**
* Performs a SPARQL describe query against the knowledge base. The query may have
* an embedded graph identifier.
@@ -323,7 +338,32 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
throw new RuntimeException(e);
}
}
public void sparqlSelectQuery(String queryStr, ResultSetConsumer consumer) throws RDFServiceException {
try {
HttpGet meth = new HttpGet(new URIBuilder(readEndpointURI).addParameter("query", queryStr).build());
meth.addHeader("Accept", "application/sparql-results+xml");
HttpResponse response = httpClient.execute(meth);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode > 399) {
log.error("response " + statusCode + " to query. \n");
log.debug("update string: \n" + queryStr);
throw new RDFServiceException("Unable to perform SPARQL UPDATE");
}
try (InputStream in = response.getEntity().getContent()) {
consumer.processResultSet(ResultSetFactory.fromXML(in));
}
} catch (IOException ioe) {
throw new RuntimeException(ioe);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
/**
* Performs a SPARQL ASK query against the knowledge base. The query may have
* an embedded graph identifier.

View file

@@ -16,6 +16,7 @@ import com.hp.hpl.jena.sparql.engine.http.QueryEngineHTTP;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceImpl;
/**
@@ -72,7 +73,18 @@ public class RDFServiceSparqlHttp extends RDFServiceSparql {
InputStream result = new ByteArrayInputStream(serializedModel.toByteArray());
return result;
}
@Override
public void sparqlConstructQuery(String queryStr, Model model) throws RDFServiceException {
QueryEngineHTTP qeh = new QueryEngineHTTP(readEndpointURI, queryStr);
try {
qeh.execConstruct(model);
} finally {
qeh.close();
}
}
/**
* Performs a SPARQL describe query against the knowledge base. The query may have
* an embedded graph identifier.
@@ -145,7 +157,19 @@ public class RDFServiceSparqlHttp extends RDFServiceSparql {
qeh.close();
}
}
@Override
public void sparqlSelectQuery(String queryStr, ResultSetConsumer consumer) throws RDFServiceException {
QueryEngineHTTP qeh = new QueryEngineHTTP(readEndpointURI, queryStr);
try {
consumer.processResultSet(qeh.execSelect());
} finally {
qeh.close();
}
}
/**
* Performs a SPARQL ASK query against the knowledge base. The query may have
* an embedded graph identifier.