merge r. 9953 from maint-rel-1.5 : NIHVIVO-3886 change to RDFServiceSparql to add blank node structures in appropriate chunks

This commit is contained in:
brianjlowe 2012-07-09 19:14:03 +00:00
parent ef0afcdffa
commit 863536d6ee
2 changed files with 82 additions and 19 deletions

View file

@ -10,11 +10,14 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.hp.hpl.jena.graph.Graph;
import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.graph.Triple;
import com.hp.hpl.jena.query.ResultSet;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.Statement;
import com.hp.hpl.jena.rdf.model.StmtIterator;
import com.hp.hpl.jena.sparql.resultset.XMLInput;
import com.hp.hpl.jena.vocabulary.RDF;
@ -229,4 +232,29 @@ public abstract class RDFServiceImpl implements RDFService {
sbuff.append(c) ;
}
}
/**
* Returns a pair of models. The first contains any statement containing at
* least one blank node. The second contains all remaining statements.
* @param g
* @return
*/
protected Model[] separateStatementsWithBlankNodes(Model gm) {
Model blankNodeModel = ModelFactory.createDefaultModel();
Model nonBlankNodeModel = ModelFactory.createDefaultModel();
StmtIterator sit = gm.listStatements();
while (sit.hasNext()) {
Statement stmt = sit.nextStatement();
if (!stmt.getSubject().isAnon() && !stmt.getObject().isAnon()) {
nonBlankNodeModel.add(stmt);
} else {
blankNodeModel.add(stmt);
}
}
Model[] result = new Model[2];
result[0] = blankNodeModel;
result[1] = nonBlankNodeModel;
return result;
}
}

View file

@ -11,6 +11,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.NameValuePair;
import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
import org.apache.commons.httpclient.methods.PostMethod;
@ -62,7 +63,11 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
private String updateEndpointURI;
private HTTPRepository readRepository;
private HTTPRepository updateRepository;
private HttpClient httpClient;
private boolean useSesameContextQuery = true;
// the number of triples to be
private static final int CHUNK_SIZE = 1000; // added/removed in a single
// SPARQL UPDATE
/**
* Returns an RDFService for a remote repository
@ -82,6 +87,10 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
this.updateRepository = new HTTPRepository(updateEndpointURI);
testConnection();
MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
mgr.getParams().setDefaultMaxConnectionsPerHost(10);
this.httpClient = new HttpClient(mgr);
}
private void testConnection() {
@ -458,18 +467,21 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
protected void executeUpdate(String updateString) throws RDFServiceException {
try {
HttpClient httpClient = new HttpClient();
PostMethod meth = new PostMethod(updateEndpointURI);
meth.addRequestHeader("Content-Type", "application/x-www-form-urlencoded");
NameValuePair[] body = new NameValuePair[1];
body[0] = new NameValuePair("update", updateString);
meth.setRequestBody(body);
int response = httpClient.executeMethod(meth);
if (response > 399) {
log.error("response " + response + " to update. \n");
log.debug("update string: \n" + updateString);
throw new RDFServiceException("Unable to perform SPARQL UPDATE");
}
try {
meth.addRequestHeader("Content-Type", "application/x-www-form-urlencoded");
NameValuePair[] body = new NameValuePair[1];
body[0] = new NameValuePair("update", updateString);
meth.setRequestBody(body);
int response = httpClient.executeMethod(meth);
if (response > 399) {
log.error("response " + response + " to update. \n");
log.debug("update string: \n" + updateString);
throw new RDFServiceException("Unable to perform SPARQL UPDATE");
}
} finally {
meth.releaseConnection();
}
} catch (Exception e) {
throw new RDFServiceException("Unable to perform change set update", e);
}
@ -485,14 +497,13 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
private void verbModel(Model model, String graphURI, String verb) throws RDFServiceException {
Model m = ModelFactory.createDefaultModel();
int testLimit = 1000;
StmtIterator stmtIt = model.listStatements();
int count = 0;
try {
while (stmtIt.hasNext()) {
count++;
m.add(stmtIt.nextStatement());
if (count % testLimit == 0 || !stmtIt.hasNext()) {
if (count % CHUNK_SIZE == 0 || !stmtIt.hasNext()) {
StringWriter sw = new StringWriter();
m.write(sw, "N-TRIPLE");
StringBuffer updateStringBuff = new StringBuffer();
@ -585,7 +596,9 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
private void performChange(ModelChange modelChange) throws RDFServiceException {
Model model = parseModel(modelChange);
if (modelChange.getOperation() == ModelChange.Operation.ADD) {
addModel(model, modelChange.getGraphURI());
Model[] separatedModel = separateStatementsWithBlankNodes(model);
addModel(separatedModel[1], modelChange.getGraphURI());
addBlankNodesWithSparqlUpdate(separatedModel[0], modelChange.getGraphURI());
} else if (modelChange.getOperation() == ModelChange.Operation.REMOVE) {
deleteModel(model, modelChange.getGraphURI());
removeBlankNodesWithSparqlUpdate(model, modelChange.getGraphURI());
@ -594,8 +607,21 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
}
}
private void addBlankNodesWithSparqlUpdate(Model model, String graphURI)
throws RDFServiceException {
updateBlankNodesWithSparqlUpdate(model, graphURI, ADD);
}
private void removeBlankNodesWithSparqlUpdate(Model model, String graphURI)
throws RDFServiceException {
updateBlankNodesWithSparqlUpdate(model, graphURI, REMOVE);
}
private static final boolean ADD = true;
private static final boolean REMOVE = false;
private void updateBlankNodesWithSparqlUpdate(Model model, String graphURI, boolean add)
throws RDFServiceException {
List<Statement> blankNodeStatements = new ArrayList<Statement>();
StmtIterator stmtIt = model.listStatements();
while (stmtIt.hasNext()) {
@ -612,10 +638,10 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
Model blankNodeModel = ModelFactory.createDefaultModel();
blankNodeModel.add(blankNodeStatements);
log.debug("removal model size " + model.size());
log.debug("update model size " + model.size());
log.debug("blank node model size " + blankNodeModel.size());
if (blankNodeModel.size() == 1) {
if (!add && blankNodeModel.size() == 1) {
log.warn("Deleting single triple with blank node: " + blankNodeModel);
log.warn("This likely indicates a problem; excessive data may be deleted.");
}
@ -633,7 +659,11 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
try {
Model tree = qee.execDescribe();
if (s.isAnon()) {
removeUsingSparqlUpdate(tree, graphURI);
if (add) {
addModel(tree, graphURI);
} else {
removeUsingSparqlUpdate(tree, graphURI);
}
} else {
StmtIterator sit = tree.listStatements(s, null, (RDFNode) null);
while (sit.hasNext()) {
@ -655,7 +685,11 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
}
}
m2.add(stmt);
removeUsingSparqlUpdate(m2, graphURI);
if (add) {
addModel(m2, graphURI);
} else {
removeUsingSparqlUpdate(m2, graphURI);
}
}
}
} finally {
@ -667,7 +701,8 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
}
}
private void removeUsingSparqlUpdate(Model model, String graphURI) throws RDFServiceException {
private void removeUsingSparqlUpdate(Model model, String graphURI)
throws RDFServiceException {
StmtIterator stmtIt = model.listStatements();