From ec06bfd240a7598d97b14ebb25ecadd6e6788df9 Mon Sep 17 00:00:00 2001 From: brianjlowe Date: Wed, 3 Oct 2012 17:22:16 +0000 Subject: [PATCH] NIHVIVO-3978 chunking statement iteration to avoid OutOfMemoryErrors when recomputing inferences --- .../vitro/webapp/reasoner/ABoxRecomputer.java | 123 +++++++++++++++--- 1 file changed, 102 insertions(+), 21 deletions(-) diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java index 3f459a6db..78244216b 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/ABoxRecomputer.java @@ -18,6 +18,7 @@ import com.hp.hpl.jena.query.QuerySolution; import com.hp.hpl.jena.query.ResultSet; import com.hp.hpl.jena.query.ResultSetFactory; import com.hp.hpl.jena.rdf.model.Model; +import com.hp.hpl.jena.rdf.model.ModelFactory; import com.hp.hpl.jena.rdf.model.RDFNode; import com.hp.hpl.jena.rdf.model.Resource; import com.hp.hpl.jena.rdf.model.ResourceFactory; @@ -30,6 +31,9 @@ import com.hp.hpl.jena.vocabulary.RDF; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; +import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils; +import edu.cornell.mannlib.vitro.webapp.servlet.setup.JenaDataSourceSetupBase; +import edu.cornell.mannlib.vitro.webapp.servlet.setup.SimpleReasonerSetup; public class ABoxRecomputer { @@ -278,6 +282,7 @@ public class ABoxRecomputer { log.error("Exception while reconciling the current and recomputed ABox inference model for class subsumption inferences. Halting processing." , e); } } catch (Exception e) { + e.printStackTrace(); log.error("Exception while recomputing ABox inferences. Halting processing.", e); } finally { inferenceRebuildModel.removeAll(); @@ -362,20 +367,21 @@ public class ABoxRecomputer { * reconcile a set of inferences into the application inference model */ protected boolean updateInferenceModel(Model inferenceRebuildModel) { - + log.info("Updating ABox inference model"); - StmtIterator iter = null; + Iterator iter = null; // Remove everything from the current inference model that is not // in the recomputed inference model int num = 0; scratchpadModel.enterCriticalSection(Lock.WRITE); scratchpadModel.removeAll(); + log.info("Updating ABox inference model (checking for outdated inferences)"); try { inferenceModel.enterCriticalSection(Lock.READ); + try { - iter = inferenceModel.listStatements(); - + iter = listModelStatements(inferenceModel, JenaDataSourceSetupBase.JENA_INF_MODEL); while (iter.hasNext()) { Statement stmt = iter.next(); if (!inferenceRebuildModel.contains(stmt)) { @@ -384,7 +390,7 @@ public class ABoxRecomputer { num++; if ((num % 10000) == 0) { - log.info("Still updating ABox inference model (removing outdated inferences)..."); + log.info("Still updating ABox inference model (checking for outdated inferences)..."); } if (stopRequested) { @@ -392,14 +398,14 @@ public class ABoxRecomputer { } } } finally { - if (iter != null) { - iter.close(); - } +// if (iter != null) { +// iter.close(); +// } inferenceModel.leaveCriticalSection(); } try { - iter = scratchpadModel.listStatements(); + iter = listModelStatements(scratchpadModel, SimpleReasonerSetup.JENA_INF_MODEL_SCRATCHPAD); while (iter.hasNext()) { Statement stmt = iter.next(); @@ -411,16 +417,17 @@ public class ABoxRecomputer { } } } finally { - if (iter != null) { - iter.close(); - } +// if (iter != null) { +// iter.close(); +// } } // Add everything from the recomputed inference model that is not already // in the current inference model to the current inference model. try { scratchpadModel.removeAll(); - iter = inferenceRebuildModel.listStatements(); + log.info("Updating ABox inference model (adding any new inferences)"); + iter = listModelStatements(inferenceRebuildModel, SimpleReasonerSetup.JENA_INF_MODEL_REBUILD); while (iter.hasNext()) { Statement stmt = iter.next(); @@ -436,7 +443,7 @@ public class ABoxRecomputer { num++; if ((num % 10000) == 0) { - log.info("Still updating ABox inference model (adding new inferences)..."); + log.info("Still updating ABox inference model (adding any new inferences)..."); } if (stopRequested) { @@ -444,12 +451,12 @@ public class ABoxRecomputer { } } } finally { - if (iter != null) { - iter.close(); - } +// if (iter != null) { +// iter.close(); +// } } - iter = scratchpadModel.listStatements(); + iter = listModelStatements(scratchpadModel, SimpleReasonerSetup.JENA_INF_MODEL_SCRATCHPAD); try { while (iter.hasNext()) { Statement stmt = iter.next(); @@ -462,9 +469,9 @@ public class ABoxRecomputer { } } } finally { - if (iter != null) { - iter.close(); - } +// if (iter != null) { +// iter.close(); +// } } } finally { scratchpadModel.removeAll(); @@ -475,6 +482,80 @@ public class ABoxRecomputer { return false; } + private Iterator listModelStatements(Model model, String graphURI) { + // the RDFServices supplied by the unit tests won't have the right + // named graphs. So if the graphURI-based chunked iterator is empty, + // we'll try listStatements() on the model instead. + Iterator it = new ChunkedStatementIterator(graphURI); + if (it.hasNext()) { + return it; + } else { + return model.listStatements(); + } + } + + // avoids OutOfMemory errors by retrieving triples in batches + private class ChunkedStatementIterator implements Iterator { + + final int CHUNK_SIZE = 50000; + private int offset = 0; + + private String queryString; + + private Model temp = ModelFactory.createDefaultModel(); + private StmtIterator tempIt; + + public ChunkedStatementIterator(String graphURI) { + this.queryString = "CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <" + + graphURI + "> { ?s ?p ?o } }"; + } + + public Statement next() { + if (tempIt.hasNext()) { + return tempIt.nextStatement(); + } else { + return null; + } + } + + public void remove() { + throw new UnsupportedOperationException(this.getClass().getName() + + " does not support .remove()"); + } + + public boolean hasNext() { + if (tempIt != null && tempIt.hasNext()) { + return true; + } else { + getNextChunk(); + if (temp.size() > 0) { + tempIt = temp.listStatements(); + return true; + } else { + return false; + } + } + } + + private void getNextChunk() { + + String chunkQueryString = queryString + " LIMIT " + CHUNK_SIZE + " OFFSET " + offset; + offset += CHUNK_SIZE; + + try { + InputStream in = rdfService.sparqlConstructQuery( + chunkQueryString, RDFService.ModelSerializationFormat.NTRIPLE); + temp.removeAll(); + temp.add(RDFServiceUtils.parseModel( + in, RDFService.ModelSerializationFormat.NTRIPLE)); + } catch (RDFServiceException e) { + throw new RuntimeException(e); + } + } + + } + + /** * This is called when the application shuts down. */