NIHVIVO-3978 chunking statement iteration to avoid OutOfMemoryErrors when recomputing inferences

This commit is contained in:
brianjlowe 2012-10-03 17:22:16 +00:00
parent b7e2c71cc1
commit ec06bfd240

View file

@ -18,6 +18,7 @@ import com.hp.hpl.jena.query.QuerySolution;
import com.hp.hpl.jena.query.ResultSet;
import com.hp.hpl.jena.query.ResultSetFactory;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.RDFNode;
import com.hp.hpl.jena.rdf.model.Resource;
import com.hp.hpl.jena.rdf.model.ResourceFactory;
@ -30,6 +31,9 @@ import com.hp.hpl.jena.vocabulary.RDF;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService;
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils;
import edu.cornell.mannlib.vitro.webapp.servlet.setup.JenaDataSourceSetupBase;
import edu.cornell.mannlib.vitro.webapp.servlet.setup.SimpleReasonerSetup;
public class ABoxRecomputer {
@ -278,6 +282,7 @@ public class ABoxRecomputer {
log.error("Exception while reconciling the current and recomputed ABox inference model for class subsumption inferences. Halting processing." , e);
}
} catch (Exception e) {
e.printStackTrace();
log.error("Exception while recomputing ABox inferences. Halting processing.", e);
} finally {
inferenceRebuildModel.removeAll();
@ -362,20 +367,21 @@ public class ABoxRecomputer {
* reconcile a set of inferences into the application inference model
*/
protected boolean updateInferenceModel(Model inferenceRebuildModel) {
log.info("Updating ABox inference model");
StmtIterator iter = null;
Iterator<Statement> iter = null;
// Remove everything from the current inference model that is not
// in the recomputed inference model
int num = 0;
scratchpadModel.enterCriticalSection(Lock.WRITE);
scratchpadModel.removeAll();
log.info("Updating ABox inference model (checking for outdated inferences)");
try {
inferenceModel.enterCriticalSection(Lock.READ);
try {
iter = inferenceModel.listStatements();
iter = listModelStatements(inferenceModel, JenaDataSourceSetupBase.JENA_INF_MODEL);
while (iter.hasNext()) {
Statement stmt = iter.next();
if (!inferenceRebuildModel.contains(stmt)) {
@ -384,7 +390,7 @@ public class ABoxRecomputer {
num++;
if ((num % 10000) == 0) {
log.info("Still updating ABox inference model (removing outdated inferences)...");
log.info("Still updating ABox inference model (checking for outdated inferences)...");
}
if (stopRequested) {
@ -392,14 +398,14 @@ public class ABoxRecomputer {
}
}
} finally {
if (iter != null) {
iter.close();
}
// if (iter != null) {
// iter.close();
// }
inferenceModel.leaveCriticalSection();
}
try {
iter = scratchpadModel.listStatements();
iter = listModelStatements(scratchpadModel, SimpleReasonerSetup.JENA_INF_MODEL_SCRATCHPAD);
while (iter.hasNext()) {
Statement stmt = iter.next();
@ -411,16 +417,17 @@ public class ABoxRecomputer {
}
}
} finally {
if (iter != null) {
iter.close();
}
// if (iter != null) {
// iter.close();
// }
}
// Add everything from the recomputed inference model that is not already
// in the current inference model to the current inference model.
try {
scratchpadModel.removeAll();
iter = inferenceRebuildModel.listStatements();
log.info("Updating ABox inference model (adding any new inferences)");
iter = listModelStatements(inferenceRebuildModel, SimpleReasonerSetup.JENA_INF_MODEL_REBUILD);
while (iter.hasNext()) {
Statement stmt = iter.next();
@ -436,7 +443,7 @@ public class ABoxRecomputer {
num++;
if ((num % 10000) == 0) {
log.info("Still updating ABox inference model (adding new inferences)...");
log.info("Still updating ABox inference model (adding any new inferences)...");
}
if (stopRequested) {
@ -444,12 +451,12 @@ public class ABoxRecomputer {
}
}
} finally {
if (iter != null) {
iter.close();
}
// if (iter != null) {
// iter.close();
// }
}
iter = scratchpadModel.listStatements();
iter = listModelStatements(scratchpadModel, SimpleReasonerSetup.JENA_INF_MODEL_SCRATCHPAD);
try {
while (iter.hasNext()) {
Statement stmt = iter.next();
@ -462,9 +469,9 @@ public class ABoxRecomputer {
}
}
} finally {
if (iter != null) {
iter.close();
}
// if (iter != null) {
// iter.close();
// }
}
} finally {
scratchpadModel.removeAll();
@ -475,6 +482,80 @@ public class ABoxRecomputer {
return false;
}
private Iterator<Statement> listModelStatements(Model model, String graphURI) {
// the RDFServices supplied by the unit tests won't have the right
// named graphs. So if the graphURI-based chunked iterator is empty,
// we'll try listStatements() on the model instead.
Iterator<Statement> it = new ChunkedStatementIterator(graphURI);
if (it.hasNext()) {
return it;
} else {
return model.listStatements();
}
}
// avoids OutOfMemory errors by retrieving triples in batches
private class ChunkedStatementIterator implements Iterator<Statement> {
final int CHUNK_SIZE = 50000;
private int offset = 0;
private String queryString;
private Model temp = ModelFactory.createDefaultModel();
private StmtIterator tempIt;
public ChunkedStatementIterator(String graphURI) {
this.queryString = "CONSTRUCT { ?s ?p ?o } WHERE { GRAPH <" +
graphURI + "> { ?s ?p ?o } }";
}
public Statement next() {
if (tempIt.hasNext()) {
return tempIt.nextStatement();
} else {
return null;
}
}
public void remove() {
throw new UnsupportedOperationException(this.getClass().getName() +
" does not support .remove()");
}
public boolean hasNext() {
if (tempIt != null && tempIt.hasNext()) {
return true;
} else {
getNextChunk();
if (temp.size() > 0) {
tempIt = temp.listStatements();
return true;
} else {
return false;
}
}
}
private void getNextChunk() {
String chunkQueryString = queryString + " LIMIT " + CHUNK_SIZE + " OFFSET " + offset;
offset += CHUNK_SIZE;
try {
InputStream in = rdfService.sparqlConstructQuery(
chunkQueryString, RDFService.ModelSerializationFormat.NTRIPLE);
temp.removeAll();
temp.add(RDFServiceUtils.parseModel(
in, RDFService.ModelSerializationFormat.NTRIPLE));
} catch (RDFServiceException e) {
throw new RuntimeException(e);
}
}
}
/**
* This is called when the application shuts down.
*/