From 863536d6ee96352f870c126804e6679028621c6d Mon Sep 17 00:00:00 2001
From: brianjlowe
Date: Mon, 9 Jul 2012 19:14:03 +0000
Subject: [PATCH] merge r. 9953 from maint-rel-1.5 : NIHVIVO-3886 change to
 RDFServiceSparql to add blank node structures in appropriate chunks

---
 .../rdfservice/impl/RDFServiceImpl.java   | 28 +++++++
 .../impl/sparql/RDFServiceSparql.java     | 73 ++++++++++++++-----
 2 files changed, 82 insertions(+), 19 deletions(-)

diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/RDFServiceImpl.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/RDFServiceImpl.java
index ee1291ec4..af27d6222 100644
--- a/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/RDFServiceImpl.java
+++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/RDFServiceImpl.java
@@ -10,11 +10,14 @@ import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import com.hp.hpl.jena.graph.Graph;
 import com.hp.hpl.jena.graph.Node;
 import com.hp.hpl.jena.graph.Triple;
 import com.hp.hpl.jena.query.ResultSet;
 import com.hp.hpl.jena.rdf.model.Model;
 import com.hp.hpl.jena.rdf.model.ModelFactory;
+import com.hp.hpl.jena.rdf.model.Statement;
+import com.hp.hpl.jena.rdf.model.StmtIterator;
 import com.hp.hpl.jena.sparql.resultset.XMLInput;
 import com.hp.hpl.jena.vocabulary.RDF;
@@ -229,4 +232,29 @@ public abstract class RDFServiceImpl implements RDFService {
             sbuff.append(c) ;
         }
     }
+    
+    /**
+     * Returns a pair of models. The first contains all statements containing at
+     * least one blank node. The second contains all remaining statements.
+     * @param gm the model to separate
+     * @return a two-element array: [0] statements with blank nodes, [1] all other statements
+     */
+    
+    protected Model[] separateStatementsWithBlankNodes(Model gm) {
+        Model blankNodeModel = ModelFactory.createDefaultModel();
+        Model nonBlankNodeModel = ModelFactory.createDefaultModel();
+        StmtIterator sit = gm.listStatements();
+        while (sit.hasNext()) {
+            Statement stmt = sit.nextStatement();
+            if (!stmt.getSubject().isAnon() && !stmt.getObject().isAnon()) {
+                nonBlankNodeModel.add(stmt);
+            } else {
+                blankNodeModel.add(stmt);
+            }
+        }
+        Model[] result = new Model[2];
+        result[0] = blankNodeModel;
+        result[1] = nonBlankNodeModel;
+        return result;
+    }
 }
diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/sparql/RDFServiceSparql.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/sparql/RDFServiceSparql.java
index ab4697ace..153fce696 100644
--- a/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/sparql/RDFServiceSparql.java
+++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/rdfservice/impl/sparql/RDFServiceSparql.java
@@ -11,6 +11,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.commons.httpclient.NameValuePair;
 import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
 import org.apache.commons.httpclient.methods.PostMethod;
@@ -62,7 +63,11 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
     private String updateEndpointURI;
     private HTTPRepository readRepository;
     private HTTPRepository updateRepository;
+    private HttpClient httpClient;
     private boolean useSesameContextQuery = true;
+    // the number of triples to be
+    private static final int CHUNK_SIZE = 1000; // added/removed in a single
+    // SPARQL UPDATE
 
     /**
      * Returns an RDFService for a remote repository
@@ -82,6 +87,10 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
         this.updateRepository = new HTTPRepository(updateEndpointURI);
 
         testConnection();
+        
+        MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
+        mgr.getParams().setDefaultMaxConnectionsPerHost(10);
+        this.httpClient = new HttpClient(mgr);
     }
 
     private void testConnection() {
@@ -458,18 +467,21 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
     protected void executeUpdate(String updateString) throws RDFServiceException {
         try {
-            HttpClient httpClient = new HttpClient();
             PostMethod meth = new PostMethod(updateEndpointURI);
-            meth.addRequestHeader("Content-Type", "application/x-www-form-urlencoded");
-            NameValuePair[] body = new NameValuePair[1];
-            body[0] = new NameValuePair("update", updateString);
-            meth.setRequestBody(body);
-            int response = httpClient.executeMethod(meth);
-            if (response > 399) {
-                log.error("response " + response + " to update. \n");
-                log.debug("update string: \n" + updateString);
-                throw new RDFServiceException("Unable to perform SPARQL UPDATE");
-            }
+            try {
+                meth.addRequestHeader("Content-Type", "application/x-www-form-urlencoded");
+                NameValuePair[] body = new NameValuePair[1];
+                body[0] = new NameValuePair("update", updateString);
+                meth.setRequestBody(body);
+                int response = httpClient.executeMethod(meth);
+                if (response > 399) {
+                    log.error("response " + response + " to update. \n");
+                    log.debug("update string: \n" + updateString);
+                    throw new RDFServiceException("Unable to perform SPARQL UPDATE");
+                }
+            } finally {
+                meth.releaseConnection();
+            }
         } catch (Exception e) {
             throw new RDFServiceException("Unable to perform change set update", e);
         }
     }
@@ -485,14 +497,13 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
 
     private void verbModel(Model model, String graphURI, String verb) throws RDFServiceException {
         Model m = ModelFactory.createDefaultModel();
-        int testLimit = 1000;
         StmtIterator stmtIt = model.listStatements();
         int count = 0;
         try {
             while (stmtIt.hasNext()) {
                 count++;
                 m.add(stmtIt.nextStatement());
-                if (count % testLimit == 0 || !stmtIt.hasNext()) {
+                if (count % CHUNK_SIZE == 0 || !stmtIt.hasNext()) {
                     StringWriter sw = new StringWriter();
                     m.write(sw, "N-TRIPLE");
                     StringBuffer updateStringBuff = new StringBuffer();
@@ -585,7 +596,9 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
     private void performChange(ModelChange modelChange) throws RDFServiceException {
         Model model = parseModel(modelChange);
         if (modelChange.getOperation() == ModelChange.Operation.ADD) {
-            addModel(model, modelChange.getGraphURI());
+            Model[] separatedModel = separateStatementsWithBlankNodes(model);
+            addModel(separatedModel[1], modelChange.getGraphURI());
+            addBlankNodesWithSparqlUpdate(separatedModel[0], modelChange.getGraphURI());
         } else if (modelChange.getOperation() == ModelChange.Operation.REMOVE) {
             deleteModel(model, modelChange.getGraphURI());
             removeBlankNodesWithSparqlUpdate(model, modelChange.getGraphURI());
@@ -594,8 +607,21 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
         }
     }
 
+    private void addBlankNodesWithSparqlUpdate(Model model, String graphURI)
+            throws RDFServiceException {
+        updateBlankNodesWithSparqlUpdate(model, graphURI, ADD);
+    }
+
     private void removeBlankNodesWithSparqlUpdate(Model model, String graphURI)
             throws RDFServiceException {
+        updateBlankNodesWithSparqlUpdate(model, graphURI, REMOVE);
+    }
+
+    private static final boolean ADD = true;
+    private static final boolean REMOVE = false;
+
+    private void updateBlankNodesWithSparqlUpdate(Model model, String graphURI, boolean add)
+            throws RDFServiceException {
         List<Statement> blankNodeStatements = new ArrayList<Statement>();
         StmtIterator stmtIt = model.listStatements();
         while (stmtIt.hasNext()) {
@@ -612,10 +638,10 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
 
         Model blankNodeModel = ModelFactory.createDefaultModel();
         blankNodeModel.add(blankNodeStatements);
 
-        log.debug("removal model size " + model.size());
+        log.debug("update model size " + model.size());
         log.debug("blank node model size " + blankNodeModel.size());
-        if (blankNodeModel.size() == 1) {
+        if (!add && blankNodeModel.size() == 1) {
             log.warn("Deleting single triple with blank node: " + blankNodeModel);
             log.warn("This likely indicates a problem; excessive data may be deleted.");
         }
@@ -633,7 +659,11 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
             try {
                 Model tree = qee.execDescribe();
                 if (s.isAnon()) {
-                    removeUsingSparqlUpdate(tree, graphURI);
+                    if (add) {
+                        addModel(tree, graphURI);
+                    } else {
+                        removeUsingSparqlUpdate(tree, graphURI);
+                    }
                 } else {
                     StmtIterator sit = tree.listStatements(s, null, (RDFNode) null);
                     while (sit.hasNext()) {
@@ -655,7 +685,11 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
                         }
                     }
                     m2.add(stmt);
-                    removeUsingSparqlUpdate(m2, graphURI);
+                    if (add) {
+                        addModel(m2, graphURI);
+                    } else {
+                        removeUsingSparqlUpdate(m2, graphURI);
+                    }
                 }
             }
         } finally {
@@ -667,7 +701,8 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
         }
     }
 
-    private void removeUsingSparqlUpdate(Model model, String graphURI) throws RDFServiceException {
+    private void removeUsingSparqlUpdate(Model model, String graphURI)
+            throws RDFServiceException {
         StmtIterator stmtIt = model.listStatements();
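
For reference, the sketch below illustrates the approach this patch takes: statements involving blank nodes are separated from ground triples, and the ground triples are serialized as N-TRIPLES into SPARQL "INSERT DATA" requests of at most CHUNK_SIZE triples each. This is a minimal, illustrative sketch, not Vitro code; the class and method names (ChunkedUpdateSketch, separate, buildInsertDataRequests) are invented here for illustration, while the Jena calls mirror the ones used in the patch.

// Illustrative sketch only: ChunkedUpdateSketch is not a Vitro class.
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;

import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.Statement;
import com.hp.hpl.jena.rdf.model.StmtIterator;

public class ChunkedUpdateSketch {

    // same chunk size the patch introduces as CHUNK_SIZE
    private static final int CHUNK_SIZE = 1000;

    // Splits a model into [0] statements containing blank nodes and
    // [1] ground statements, mirroring separateStatementsWithBlankNodes().
    static Model[] separate(Model m) {
        Model blank = ModelFactory.createDefaultModel();
        Model ground = ModelFactory.createDefaultModel();
        StmtIterator it = m.listStatements();
        while (it.hasNext()) {
            Statement stmt = it.nextStatement();
            if (stmt.getSubject().isAnon() || stmt.getObject().isAnon()) {
                blank.add(stmt);
            } else {
                ground.add(stmt);
            }
        }
        return new Model[] { blank, ground };
    }

    // Serializes ground statements into "INSERT DATA" requests of at most
    // CHUNK_SIZE triples each, the way verbModel() builds its updates.
    static List<String> buildInsertDataRequests(Model ground, String graphURI) {
        List<String> requests = new ArrayList<String>();
        Model chunk = ModelFactory.createDefaultModel();
        StmtIterator it = ground.listStatements();
        int count = 0;
        while (it.hasNext()) {
            chunk.add(it.nextStatement());
            count++;
            if (count % CHUNK_SIZE == 0 || !it.hasNext()) {
                StringWriter sw = new StringWriter();
                chunk.write(sw, "N-TRIPLE");
                StringBuffer update = new StringBuffer()
                        .append("INSERT DATA { ")
                        .append((graphURI != null) ? "GRAPH <" + graphURI + "> { " : "")
                        .append(sw.toString())
                        .append((graphURI != null) ? " } " : "")
                        .append(" }");
                requests.add(update.toString());
                chunk.removeAll(); // start a fresh chunk
            }
        }
        return requests;
    }
}

Each string in the returned list could then be POSTed to the SPARQL update endpoint as the "update" form parameter, which is what executeUpdate() does with its pooled HttpClient.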