DeltaComputer timing issues (plus unrelated logging update to TBoxListener)

This commit is contained in:
stellamit 2012-03-01 18:59:26 +00:00
parent 6ad2d16c94
commit c3b33b9cca
2 changed files with 179 additions and 159 deletions

View file

@ -150,27 +150,31 @@ public class SimpleReasoner extends StatementListener {
public void removedStatement(Statement stmt) { public void removedStatement(Statement stmt) {
try { try {
if (!isInterestedInRemovedStatement(stmt)) { return; }
if (!isInterestedInRemovedStatement(stmt)) return;
handleRemovedStatement(stmt);
if (batchMode1) {
aBoxDeltaModeler1.removedStatement(stmt);
} else if (batchMode2) {
aBoxDeltaModeler2.removedStatement(stmt);
} else {
if (stmt.getPredicate().equals(RDF.type)) {
removedABoxTypeAssertion(stmt, inferenceModel);
setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet<String>());
}
doPlugins(ModelUpdate.Operation.RETRACT,stmt);
}
} catch (Exception e) { } catch (Exception e) {
// don't stop the edit if there's an exception // don't stop the edit if there's an exception
log.error("Exception while retracting inferences: ", e); log.error("Exception while retracting inferences: ", e);
} }
} }
public synchronized void handleRemovedStatement(Statement stmt) {
if (batchMode1) {
aBoxDeltaModeler1.removedStatement(stmt);
} else if (batchMode2) {
aBoxDeltaModeler2.removedStatement(stmt);
} else {
if (stmt.getPredicate().equals(RDF.type)) {
removedABoxTypeAssertion(stmt, inferenceModel);
setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet<String>());
}
doPlugins(ModelUpdate.Operation.RETRACT,stmt);
}
}
/* /*
* Performs incremental selected ABox reasoning based * Performs incremental selected ABox reasoning based
* on changes to the class hierarchy. * on changes to the class hierarchy.
@ -651,9 +655,6 @@ public class SimpleReasoner extends StatementListener {
} }
} }
} }
} finally { } finally {
inferenceModel.leaveCriticalSection(); inferenceModel.leaveCriticalSection();
aboxModel.leaveCriticalSection(); aboxModel.leaveCriticalSection();
@ -920,144 +921,7 @@ public class SimpleReasoner extends StatementListener {
} }
} }
@Override
public synchronized void notifyEvent(Model model, Object event) {
if (event instanceof BulkUpdateEvent) {
if (((BulkUpdateEvent) event).getBegin()) {
log.info("received BulkUpdateEvent(begin)");
if (batchMode1 || batchMode2) {
log.info("received a BulkUpdateEvent(begin) while already in batch update mode; this event will be ignored.");
return;
} else {
batchMode1 = true;
batchMode2 = false;
aBoxDeltaModeler1.getRetractions().removeAll();
log.info("started processing retractions in batch mode");
}
} else {
if (!batchMode1 && !batchMode2) {
log.warn("SimpleReasoner received an end batch mode request when not currently in batch mode. No action was taken");
return;
}
log.info("received BulkUpdateEvent(end)");
if (!deltaComputerProcessing) {
deltaComputerProcessing = true;
new Thread(new DeltaComputer(),"DeltaComputer").start();
}
}
}
}
private volatile boolean deltaComputerProcessing = false;
private class DeltaComputer extends Thread {
public DeltaComputer() {
}
@Override
public void run() {
log.info("starting DeltaComputer.run");
Model retractions = aBoxDeltaModeler1.getRetractions();
boolean finished = (retractions.size() == 0);
boolean abort = false;
String qualifier = "(1)";
while (!finished && !stopRequested) {
retractions.enterCriticalSection(Lock.READ);
StmtIterator iter = null;
try {
log.info("run: started computing inferences for batch " + qualifier + " update");
iter = retractions.listStatements();
int num = 0;
while (iter.hasNext() && !stopRequested) {
Statement stmt = iter.next();
try {
if (stmt.getPredicate().equals(RDF.type)) {
removedABoxTypeAssertion(stmt, inferenceModel);
}
setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet<String>());
doPlugins(ModelUpdate.Operation.RETRACT,stmt);
} catch (NullPointerException npe) {
abort = true;
break;
} catch (Exception e) {
log.error("exception in batch mode ",e);
}
num++;
if ((num % 6000) == 0) {
log.info("still computing inferences for batch " + qualifier + " update...");
}
if (stopRequested) {
log.info("a stopRequested signal was received during DeltaComputer.run. Halting Processing.");
return;
}
}
} finally {
iter.close();
retractions.removeAll();
retractions.leaveCriticalSection();
}
if (stopRequested) {
log.info("a stopRequested signal was received during DeltaComputer.run. Halting Processing.");
return;
}
if (abort) {
log.error("a NullPointerException was received while computing inferences in batch " + qualifier + " mode. Halting inference computation.");
return;
}
log.info("finished computing inferences for batch " + qualifier + " update");
if (batchMode1 && (aBoxDeltaModeler2.getRetractions().size() > 0)) {
batchMode2 = true;
batchMode1 = false;
retractions = aBoxDeltaModeler2.getRetractions();
qualifier = "(2)";
log.info("switching from batch mode 1 to batch mode 2");
} else if (batchMode2 && (aBoxDeltaModeler1.getRetractions().size() > 0)) {
batchMode1 = true;
batchMode2 = false;
retractions = aBoxDeltaModeler1.getRetractions();
qualifier = "(1)";
log.info("switching from batch mode 2 to batch mode 1");
} else {
deltaComputerProcessing = false;
finished = true;
batchMode1 = false;
batchMode2 = false;
log.info("finished processing retractions in batch mode");
}
}
deltaComputerProcessing = false;
if (batchMode1 || batchMode2) {
log.warn("Unexpected condition at the end of DeltaComputer.run method: batchMode1=" + batchMode1 + ", batchMode2 =" + batchMode2 + ". (both should be false)" );
batchMode1 = false;
batchMode2 = false;
}
if (aBoxDeltaModeler1.getRetractions().size() > 0) {
log.warn("Unexpected condition: the aBoxDeltaModeler1 retractions model was not empty at the end of the DeltaComputer.run method");
}
if (aBoxDeltaModeler2.getRetractions().size() > 0) {
log.warn("Unexpected condition: the aBoxDeltaModeler2 retractions model was not empty at the end of the DeltaComputer.run method");
}
}
}
public ArrayList<String> getAllIndividualURIs() { public ArrayList<String> getAllIndividualURIs() {
@ -1096,9 +960,6 @@ public class SimpleReasoner extends StatementListener {
return individuals; return individuals;
} }
/**
*
*/
protected void doPlugins(ModelUpdate.Operation op, Statement stmt) { protected void doPlugins(ModelUpdate.Operation op, Statement stmt) {
for (ReasonerPlugin plugin : getPluginList()) { for (ReasonerPlugin plugin : getPluginList()) {
@ -1133,6 +994,11 @@ public class SimpleReasoner extends StatementListener {
return false; return false;
} }
//TODO remove this for 1.5
public synchronized void computeMostSpecificType() {
}
/** /**
* This is called when the system shuts down. * This is called when the system shuts down.
*/ */
@ -1145,5 +1011,159 @@ public class SimpleReasoner extends StatementListener {
"] [property = " + statement.getPredicate().getURI() + "] [property = " + statement.getPredicate().getURI() +
"] [object = " + (statement.getObject().isLiteral() ? ((Literal)statement.getObject()).getLexicalForm() + " (Literal)" "] [object = " + (statement.getObject().isLiteral() ? ((Literal)statement.getObject()).getLexicalForm() + " (Literal)"
: ((Resource)statement.getObject()).getURI() + " (Resource)") + "]"; : ((Resource)statement.getObject()).getURI() + " (Resource)") + "]";
}
private volatile boolean deltaComputerProcessing = false;
private int eventCount = 0;
@Override
public synchronized void notifyEvent(Model model, Object event) {
if (event instanceof BulkUpdateEvent) {
if (((BulkUpdateEvent) event).getBegin()) {
eventCount++;
log.info("received a bulk update begin event.");
if (batchMode1 || batchMode2) {
log.info("received a bulk update begin event while already in batch update mode; this event will be ignored.");
return;
} else {
batchMode1 = true;
batchMode2 = false;
if (aBoxDeltaModeler1.getRetractions().size() > 0) {
log.warn("Unexpected condition: the aBoxDeltaModeler1 retractions model was not empty when entering batch mode.");
}
if (aBoxDeltaModeler2.getRetractions().size() > 0) {
log.warn("Unexpected condition: the aBoxDeltaModeler2 retractions model was not empty when entering batch mode.");
}
log.info("initializing batch mode 1");
}
} else {
eventCount--;
log.info("received a bulk update end event");
if (!deltaComputerProcessing) {
deltaComputerProcessing = true;
new Thread(new DeltaComputer(),"DeltaComputer").start();
} else {
log.info("received a bulk update end event while currently processing in aynchronous mode");
}
}
}
}
private synchronized boolean switchBatchModes() {
if (batchMode1) {
aBoxDeltaModeler2.getRetractions().removeAll();
if (aBoxDeltaModeler1.getRetractions().size() > 0) {
batchMode2 = true;
batchMode1 = false;
log.info("entering batch mode 2");
} else {
deltaComputerProcessing = false;
}
} else if (batchMode2) {
aBoxDeltaModeler1.getRetractions().removeAll();
if (aBoxDeltaModeler2.getRetractions().size() > 0) {
batchMode1 = true;
batchMode2 = false;
log.info("entering batch mode 1");
} else {
deltaComputerProcessing = false;
}
} else {
log.warn("unexpected condition, invoked when batchMode1 and batchMode2 were both false");
deltaComputerProcessing = false;
}
return deltaComputerProcessing;
}
private class DeltaComputer extends Thread {
public DeltaComputer() {
}
@Override
public void run() {
log.info("starting DeltaComputer.run");
boolean abort = false;
Model retractions = ModelFactory.createDefaultModel();
String qualifier = "";
while (deltaComputerProcessing && !stopRequested) {
if (switchBatchModes()) {
if (batchMode1) {
qualifier = "2";
retractions = aBoxDeltaModeler2.getRetractions();
} else if (batchMode2) {
qualifier = "1";
retractions = aBoxDeltaModeler1.getRetractions();
}
} else {
break;
}
retractions.enterCriticalSection(Lock.READ);
StmtIterator iter = null;
int num = 0;
try {
log.info("started computing inferences for batch " + qualifier + " updates");
iter = retractions.listStatements();
while (iter.hasNext() && !stopRequested) {
Statement stmt = iter.next();
try {
if (stmt.getPredicate().equals(RDF.type)) {
removedABoxTypeAssertion(stmt, inferenceModel);
}
setMostSpecificTypes(stmt.getSubject(), inferenceModel, new HashSet<String>());
doPlugins(ModelUpdate.Operation.RETRACT,stmt);
} catch (NullPointerException npe) {
abort = true;
break;
} catch (Exception e) {
log.error("exception in batch mode ",e);
}
num++;
if ((num % 6000) == 0) {
log.info("still computing inferences for batch " + qualifier + " update...");
}
if (stopRequested) {
log.info("a stopRequested signal was received during DeltaComputer.run. Halting Processing.");
return;
}
}
} finally {
iter.close();
retractions.removeAll();
retractions.leaveCriticalSection();
}
if (stopRequested) {
log.info("a stopRequested signal was received during DeltaComputer.run. Halting Processing.");
deltaComputerProcessing = false;
return;
}
if (abort) {
log.error("a NullPointerException was received while computing inferences in batch " + qualifier + " mode. Halting inference computation.");
deltaComputerProcessing = false;
return;
}
log.info("finished computing inferences for batch " + qualifier + " updates. Processed " + num + " statements.");
}
log.info("ending DeltaComputer.run");
}
} }
} }

View file

@ -62,7 +62,7 @@ public class SimpleReasonerTBoxListener extends StatementListener {
private synchronized void processUpdate(ModelUpdate mu) { private synchronized void processUpdate(ModelUpdate mu) {
if (!processingUpdates && (modelUpdates.peek() != null)) { if (!processingUpdates && (modelUpdates.peek() != null)) {
log.error("TBoxProcessor thread was not running and work queue is not empty. size = " + modelUpdates.size()); log.warn("TBoxProcessor thread was not running and work queue is not empty. size = " + modelUpdates.size() + " The work will be processed now.");
} }
modelUpdates.add(mu); modelUpdates.add(mu);