From c3b33b9ccac97ef94c917897e1af3e9176c77928 Mon Sep 17 00:00:00 2001 From: stellamit Date: Thu, 1 Mar 2012 18:59:26 +0000 Subject: [PATCH] DeltaComputer timing issues (plus unrelated logging update to TBoxListener) --- .../vitro/webapp/reasoner/SimpleReasoner.java | 336 ++++++++++-------- .../reasoner/SimpleReasonerTBoxListener.java | 2 +- 2 files changed, 179 insertions(+), 159 deletions(-) diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java index 43fa26341..6d2e1db9e 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasoner.java @@ -150,27 +150,31 @@ public class SimpleReasoner extends StatementListener { public void removedStatement(Statement stmt) { try { - - if (!isInterestedInRemovedStatement(stmt)) return; - - if (batchMode1) { - aBoxDeltaModeler1.removedStatement(stmt); - } else if (batchMode2) { - aBoxDeltaModeler2.removedStatement(stmt); - } else { - if (stmt.getPredicate().equals(RDF.type)) { - removedABoxTypeAssertion(stmt, inferenceModel); - setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet()); - } - - doPlugins(ModelUpdate.Operation.RETRACT,stmt); - } + if (!isInterestedInRemovedStatement(stmt)) { return; } + + handleRemovedStatement(stmt); + } catch (Exception e) { // don't stop the edit if there's an exception log.error("Exception while retracting inferences: ", e); } } + public synchronized void handleRemovedStatement(Statement stmt) { + + if (batchMode1) { + aBoxDeltaModeler1.removedStatement(stmt); + } else if (batchMode2) { + aBoxDeltaModeler2.removedStatement(stmt); + } else { + if (stmt.getPredicate().equals(RDF.type)) { + removedABoxTypeAssertion(stmt, inferenceModel); + setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet()); + } + doPlugins(ModelUpdate.Operation.RETRACT,stmt); + } + } + /* * Performs incremental selected ABox reasoning based * on changes to the class hierarchy. @@ -651,9 +655,6 @@ public class SimpleReasoner extends StatementListener { } } } - - - } finally { inferenceModel.leaveCriticalSection(); aboxModel.leaveCriticalSection(); @@ -920,144 +921,7 @@ public class SimpleReasoner extends StatementListener { } } - @Override - public synchronized void notifyEvent(Model model, Object event) { - - if (event instanceof BulkUpdateEvent) { - if (((BulkUpdateEvent) event).getBegin()) { - - log.info("received BulkUpdateEvent(begin)"); - - if (batchMode1 || batchMode2) { - log.info("received a BulkUpdateEvent(begin) while already in batch update mode; this event will be ignored."); - return; - } else { - batchMode1 = true; - batchMode2 = false; - aBoxDeltaModeler1.getRetractions().removeAll(); - log.info("started processing retractions in batch mode"); - } - } else { - if (!batchMode1 && !batchMode2) { - log.warn("SimpleReasoner received an end batch mode request when not currently in batch mode. No action was taken"); - return; - } - log.info("received BulkUpdateEvent(end)"); - - if (!deltaComputerProcessing) { - deltaComputerProcessing = true; - new Thread(new DeltaComputer(),"DeltaComputer").start(); - } - } - } - } - - private volatile boolean deltaComputerProcessing = false; - - private class DeltaComputer extends Thread { - public DeltaComputer() { - } - - @Override - public void run() { - - log.info("starting DeltaComputer.run"); - Model retractions = aBoxDeltaModeler1.getRetractions(); - boolean finished = (retractions.size() == 0); - boolean abort = false; - String qualifier = "(1)"; - - while (!finished && !stopRequested) { - retractions.enterCriticalSection(Lock.READ); - StmtIterator iter = null; - - try { - log.info("run: started computing inferences for batch " + qualifier + " update"); - iter = retractions.listStatements(); - - int num = 0; - while (iter.hasNext() && !stopRequested) { - Statement stmt = iter.next(); - - try { - if (stmt.getPredicate().equals(RDF.type)) { - removedABoxTypeAssertion(stmt, inferenceModel); - } - setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet()); - doPlugins(ModelUpdate.Operation.RETRACT,stmt); - } catch (NullPointerException npe) { - abort = true; - break; - } catch (Exception e) { - log.error("exception in batch mode ",e); - } - - num++; - if ((num % 6000) == 0) { - log.info("still computing inferences for batch " + qualifier + " update..."); - } - - if (stopRequested) { - log.info("a stopRequested signal was received during DeltaComputer.run. Halting Processing."); - return; - } - } - } finally { - iter.close(); - retractions.removeAll(); - retractions.leaveCriticalSection(); - } - - if (stopRequested) { - log.info("a stopRequested signal was received during DeltaComputer.run. Halting Processing."); - return; - } - - if (abort) { - log.error("a NullPointerException was received while computing inferences in batch " + qualifier + " mode. Halting inference computation."); - return; - } - - log.info("finished computing inferences for batch " + qualifier + " update"); - - if (batchMode1 && (aBoxDeltaModeler2.getRetractions().size() > 0)) { - batchMode2 = true; - batchMode1 = false; - retractions = aBoxDeltaModeler2.getRetractions(); - qualifier = "(2)"; - log.info("switching from batch mode 1 to batch mode 2"); - } else if (batchMode2 && (aBoxDeltaModeler1.getRetractions().size() > 0)) { - batchMode1 = true; - batchMode2 = false; - retractions = aBoxDeltaModeler1.getRetractions(); - qualifier = "(1)"; - log.info("switching from batch mode 2 to batch mode 1"); - } else { - deltaComputerProcessing = false; - finished = true; - batchMode1 = false; - batchMode2 = false; - log.info("finished processing retractions in batch mode"); - } - } - - deltaComputerProcessing = false; - - if (batchMode1 || batchMode2) { - log.warn("Unexpected condition at the end of DeltaComputer.run method: batchMode1=" + batchMode1 + ", batchMode2 =" + batchMode2 + ". (both should be false)" ); - batchMode1 = false; - batchMode2 = false; - } - - if (aBoxDeltaModeler1.getRetractions().size() > 0) { - log.warn("Unexpected condition: the aBoxDeltaModeler1 retractions model was not empty at the end of the DeltaComputer.run method"); - } - if (aBoxDeltaModeler2.getRetractions().size() > 0) { - log.warn("Unexpected condition: the aBoxDeltaModeler2 retractions model was not empty at the end of the DeltaComputer.run method"); - } - } - } public ArrayList getAllIndividualURIs() { @@ -1096,9 +960,6 @@ public class SimpleReasoner extends StatementListener { return individuals; } - /** - * - */ protected void doPlugins(ModelUpdate.Operation op, Statement stmt) { for (ReasonerPlugin plugin : getPluginList()) { @@ -1133,6 +994,11 @@ public class SimpleReasoner extends StatementListener { return false; } + //TODO remove this for 1.5 + public synchronized void computeMostSpecificType() { + + } + /** * This is called when the system shuts down. */ @@ -1145,5 +1011,159 @@ public class SimpleReasoner extends StatementListener { "] [property = " + statement.getPredicate().getURI() + "] [object = " + (statement.getObject().isLiteral() ? ((Literal)statement.getObject()).getLexicalForm() + " (Literal)" : ((Resource)statement.getObject()).getURI() + " (Resource)") + "]"; + } + + private volatile boolean deltaComputerProcessing = false; + private int eventCount = 0; + + @Override + public synchronized void notifyEvent(Model model, Object event) { + + if (event instanceof BulkUpdateEvent) { + if (((BulkUpdateEvent) event).getBegin()) { + eventCount++; + log.info("received a bulk update begin event."); + if (batchMode1 || batchMode2) { + log.info("received a bulk update begin event while already in batch update mode; this event will be ignored."); + return; + } else { + batchMode1 = true; + batchMode2 = false; + + if (aBoxDeltaModeler1.getRetractions().size() > 0) { + log.warn("Unexpected condition: the aBoxDeltaModeler1 retractions model was not empty when entering batch mode."); + } + + if (aBoxDeltaModeler2.getRetractions().size() > 0) { + log.warn("Unexpected condition: the aBoxDeltaModeler2 retractions model was not empty when entering batch mode."); + } + + log.info("initializing batch mode 1"); + } + } else { + eventCount--; + log.info("received a bulk update end event"); + if (!deltaComputerProcessing) { + deltaComputerProcessing = true; + new Thread(new DeltaComputer(),"DeltaComputer").start(); + } else { + log.info("received a bulk update end event while currently processing in aynchronous mode"); + } + } + } + } + + private synchronized boolean switchBatchModes() { + + if (batchMode1) { + aBoxDeltaModeler2.getRetractions().removeAll(); + + if (aBoxDeltaModeler1.getRetractions().size() > 0) { + batchMode2 = true; + batchMode1 = false; + log.info("entering batch mode 2"); + } else { + deltaComputerProcessing = false; + } + } else if (batchMode2) { + aBoxDeltaModeler1.getRetractions().removeAll(); + + if (aBoxDeltaModeler2.getRetractions().size() > 0) { + batchMode1 = true; + batchMode2 = false; + log.info("entering batch mode 1"); + } else { + deltaComputerProcessing = false; + } + } else { + log.warn("unexpected condition, invoked when batchMode1 and batchMode2 were both false"); + deltaComputerProcessing = false; + } + + return deltaComputerProcessing; + } + + private class DeltaComputer extends Thread { + public DeltaComputer() { + } + + @Override + public void run() { + log.info("starting DeltaComputer.run"); + boolean abort = false; + Model retractions = ModelFactory.createDefaultModel(); + String qualifier = ""; + + while (deltaComputerProcessing && !stopRequested) { + + if (switchBatchModes()) { + if (batchMode1) { + qualifier = "2"; + retractions = aBoxDeltaModeler2.getRetractions(); + } else if (batchMode2) { + qualifier = "1"; + retractions = aBoxDeltaModeler1.getRetractions(); + } + } else { + break; + } + + retractions.enterCriticalSection(Lock.READ); + StmtIterator iter = null; + int num = 0; + + try { + log.info("started computing inferences for batch " + qualifier + " updates"); + iter = retractions.listStatements(); + + while (iter.hasNext() && !stopRequested) { + Statement stmt = iter.next(); + + try { + if (stmt.getPredicate().equals(RDF.type)) { + removedABoxTypeAssertion(stmt, inferenceModel); + } + setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet()); + doPlugins(ModelUpdate.Operation.RETRACT,stmt); + } catch (NullPointerException npe) { + abort = true; + break; + } catch (Exception e) { + log.error("exception in batch mode ",e); + } + + num++; + if ((num % 6000) == 0) { + log.info("still computing inferences for batch " + qualifier + " update..."); + } + + if (stopRequested) { + log.info("a stopRequested signal was received during DeltaComputer.run. Halting Processing."); + return; + } + } + } finally { + iter.close(); + retractions.removeAll(); + retractions.leaveCriticalSection(); + } + + if (stopRequested) { + log.info("a stopRequested signal was received during DeltaComputer.run. Halting Processing."); + deltaComputerProcessing = false; + return; + } + + if (abort) { + log.error("a NullPointerException was received while computing inferences in batch " + qualifier + " mode. Halting inference computation."); + deltaComputerProcessing = false; + return; + } + + log.info("finished computing inferences for batch " + qualifier + " updates. Processed " + num + " statements."); + } + + log.info("ending DeltaComputer.run"); + } } } diff --git a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasonerTBoxListener.java b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasonerTBoxListener.java index 340d78396..e71c98a8c 100644 --- a/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasonerTBoxListener.java +++ b/webapp/src/edu/cornell/mannlib/vitro/webapp/reasoner/SimpleReasonerTBoxListener.java @@ -62,7 +62,7 @@ public class SimpleReasonerTBoxListener extends StatementListener { private synchronized void processUpdate(ModelUpdate mu) { if (!processingUpdates && (modelUpdates.peek() != null)) { - log.error("TBoxProcessor thread was not running and work queue is not empty. size = " + modelUpdates.size()); + log.warn("TBoxProcessor thread was not running and work queue is not empty. size = " + modelUpdates.size() + " The work will be processed now."); } modelUpdates.add(mu);