From 478ad12f154faaff8e4310f6b86d731f220e8d6e Mon Sep 17 00:00:00 2001 From: Graham Triggs Date: Fri, 9 Sep 2016 17:54:47 +0100 Subject: [PATCH] [VIVO-1270] Replacement for BulkUpdateHandler - still need to test Union --- .../adapters/AbstractBulkUpdater.java | 51 +++++ .../adapters/BulkUpdatingModel.java | 211 ++++++++++++++++++ .../adapters/BulkUpdatingOntModel.java | 206 +++++++++++++++++ .../adapters/RDFServiceBulkUpdater.java | 102 +++++++++ .../adapters/SparqlBulkUpdater.java | 83 +++++++ .../adapters/VitroModelFactory.java | 23 +- 6 files changed, 664 insertions(+), 12 deletions(-) create mode 100644 api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/AbstractBulkUpdater.java create mode 100644 api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/BulkUpdatingModel.java create mode 100644 api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/BulkUpdatingOntModel.java create mode 100644 api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/RDFServiceBulkUpdater.java create mode 100644 api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/SparqlBulkUpdater.java diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/AbstractBulkUpdater.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/AbstractBulkUpdater.java new file mode 100644 index 000000000..1ddd8d26d --- /dev/null +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/AbstractBulkUpdater.java @@ -0,0 +1,51 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.rdfservice.adapters; + +import org.apache.jena.graph.Graph; +import org.apache.jena.rdf.model.Model; +import org.apache.jena.rdf.model.ModelFactory; +import org.apache.jena.rdf.model.Statement; +import org.apache.jena.rdf.model.StmtIterator; + +public abstract class AbstractBulkUpdater { + public void add(Graph g) { + Model[] model = separateStatementsWithBlankNodes(g); + performAddModel(model[1] /* nonBlankNodeModel */); + // replace following call with different method + performAddModel(model[0] /*blankNodeModel*/); + } + + public void remove(Graph g) { + performRemoveModel(ModelFactory.createModelForGraph(g)); + } + + public void removeAll() { + performRemoveAll(); + } + + protected abstract void performAddModel(Model model); + + protected abstract void performRemoveModel(Model model); + + protected abstract void performRemoveAll(); + + private Model[] separateStatementsWithBlankNodes(Graph g) { + Model gm = ModelFactory.createModelForGraph(g); + Model blankNodeModel = ModelFactory.createDefaultModel(); + Model nonBlankNodeModel = ModelFactory.createDefaultModel(); + StmtIterator sit = gm.listStatements(); + while (sit.hasNext()) { + Statement stmt = sit.nextStatement(); + if (!stmt.getSubject().isAnon() && !stmt.getObject().isAnon()) { + nonBlankNodeModel.add(stmt); + } else { + blankNodeModel.add(stmt); + } + } + Model[] result = new Model[2]; + result[0] = blankNodeModel; + result[1] = nonBlankNodeModel; + return result; + } +} diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/BulkUpdatingModel.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/BulkUpdatingModel.java new file mode 100644 index 000000000..5476cd1aa --- /dev/null +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/BulkUpdatingModel.java @@ -0,0 +1,211 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.rdfservice.adapters; + +import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceGraph; +import edu.cornell.mannlib.vitro.webapp.dao.jena.SparqlGraph; +import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; +import org.apache.jena.graph.Graph; +import org.apache.jena.graph.Triple; +import org.apache.jena.rdf.model.Model; +import org.apache.jena.rdf.model.ModelFactory; +import org.apache.jena.rdf.model.RDFReaderF; +import org.apache.jena.rdf.model.Statement; +import org.apache.jena.rdf.model.StmtIterator; +import org.apache.jena.rdf.model.impl.RDFReaderFImpl; +import org.apache.jena.rdf.model.impl.StatementImpl; +import org.apache.jena.shared.WrappedIOException; +import org.apache.jena.sparql.graph.GraphFactory; +import org.apache.jena.util.iterator.Map1; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.net.URL; +import java.util.Iterator; +import java.util.List; + +public class BulkUpdatingModel extends AbstractModelDecorator { + private static final RDFReaderF readerFactory = new RDFReaderFImpl(); + private Graph graph; + private AbstractBulkUpdater updater; + + protected BulkUpdatingModel(Model m) { + super(m); + graph = m.getGraph(); + if (graph instanceof RDFServiceGraph) { + updater = new RDFServiceBulkUpdater((RDFServiceGraph)graph); + } else if (graph instanceof SparqlGraph) { + updater = new SparqlBulkUpdater((SparqlGraph)graph); + } else { + updater = null; + } + } + + @Override + public Model add(StmtIterator iter) { + if (updater != null && iter != null) { + Graph g = GraphFactory.createPlainGraph(); + while (iter.hasNext()) { + g.add(iter.nextStatement().asTriple()); + } + updater.add(g); + } else { + super.add(iter); + } + return this; + } + + @Override + public Model add(Model m) { + if (updater != null && m != null) { + updater.add(m.getGraph()); + } else { + super.add(m); + } + return this; + } + + @Override + public Model add(Statement[] statements) { + if (updater != null && statements != null) { + Graph g = GraphFactory.createPlainGraph(); + for (Statement s : statements) { + g.add(s.asTriple()); + } + updater.add(g); + } else { + super.add(statements); + } + return this; + } + + @Override + public Model add(List statements) { + add(statements.toArray(new Statement[statements.size()])); + return this; + } + + @Override + public Model read(String url) { + Model m = ModelFactory.createDefaultModel(); + readerFactory.getReader().read(m, url); + return add(m); + } + + @Override + public Model read(Reader reader, String base) { + Model m = ModelFactory.createDefaultModel(); + readerFactory.getReader().read(m, reader, base); + return add(m); + } + + @Override + public Model read(InputStream reader, String base) { + Model m = ModelFactory.createDefaultModel(); + readerFactory.getReader().read(m, reader, base); + return add(m); + } + + @Override + public Model read(String url, String lang) { + Model m = ModelFactory.createDefaultModel(); + readerFactory.getReader(lang).read(m, url); + return add(m); + } + + @Override + public Model read(String url, String base, String lang) { + try { + InputStream is = new URL(url).openStream(); + try { + read(is, base, lang); + } finally { + if (null != is) { + is.close(); + } + } + } catch (IOException e) { + throw new WrappedIOException(e); + } + return this; + } + + @Override + public Model read(Reader reader, String base, String lang) { + Model m = ModelFactory.createDefaultModel(); + readerFactory.getReader(lang).read(m, reader, base); + return add(m); + } + + @Override + public Model read(InputStream reader, String base, String lang) { + Model m = ModelFactory.createDefaultModel(); + readerFactory.getReader(lang).read(m, reader, base); + return add(m); + } + + @Override + public Model remove(StmtIterator iter) { + if (updater != null && iter != null) { + Graph g = GraphFactory.createPlainGraph(); + while (iter.hasNext()) { + g.add(iter.nextStatement().asTriple()); + } + updater.remove(g); + } else { + super.remove(iter); + } + return this; + } + + @Override + public Model remove(Model m) { + if (updater != null && m != null) { + updater.remove(m.getGraph()); + } else { + super.remove(m); + } + return this; + } + + @Override + public Model remove(Statement[] statements) { + if (updater != null && statements != null) { + Graph g = GraphFactory.createPlainGraph(); + for (Statement s : statements) { + g.add(s.asTriple()); + } + updater.remove(g); + } else { + super.remove(statements); + } + return this; + } + + @Override + public Model remove(List statements) { + if (updater != null && statements != null) { + Graph g = GraphFactory.createPlainGraph(); + for (Statement s : statements) { + g.add(s.asTriple()); + } + updater.remove(g); + } else { + super.remove(statements); + } + return this; + } + + @Override + public Model removeAll() { + if (updater != null) { + updater.removeAll(); + } else { + super.removeAll(); + } + return this; + } +} + + diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/BulkUpdatingOntModel.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/BulkUpdatingOntModel.java new file mode 100644 index 000000000..c7fe01d98 --- /dev/null +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/BulkUpdatingOntModel.java @@ -0,0 +1,206 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.rdfservice.adapters; + +import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceGraph; +import edu.cornell.mannlib.vitro.webapp.dao.jena.SparqlGraph; +import org.apache.jena.graph.Graph; +import org.apache.jena.ontology.OntModel; +import org.apache.jena.rdf.model.Model; +import org.apache.jena.rdf.model.ModelFactory; +import org.apache.jena.rdf.model.RDFReaderF; +import org.apache.jena.rdf.model.Statement; +import org.apache.jena.rdf.model.StmtIterator; +import org.apache.jena.rdf.model.impl.RDFReaderFImpl; +import org.apache.jena.rdf.model.impl.StatementImpl; +import org.apache.jena.shared.WrappedIOException; +import org.apache.jena.sparql.graph.GraphFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.net.URL; +import java.util.List; + +public class BulkUpdatingOntModel extends AbstractOntModelDecorator { + private static final RDFReaderF readerFactory = new RDFReaderFImpl(); + private Graph graph; + private AbstractBulkUpdater updater; + + protected BulkUpdatingOntModel(OntModel m) { + super(m); + graph = m.getGraph(); + if (graph instanceof RDFServiceGraph) { + updater = new RDFServiceBulkUpdater((RDFServiceGraph)graph); + } else if (graph instanceof SparqlGraph) { + updater = new SparqlBulkUpdater((SparqlGraph)graph); + } else { + updater = null; + } + } + + @Override + public Model add(StmtIterator iter) { + if (updater != null && iter != null) { + Graph g = GraphFactory.createPlainGraph(); + while (iter.hasNext()) { + g.add(iter.nextStatement().asTriple()); + } + updater.add(g); + } else { + super.add(iter); + } + return this; + } + + @Override + public Model add(Model m) { + if (updater != null && m != null) { + updater.add(m.getGraph()); + } else { + super.add(m); + } + return this; + } + + @Override + public Model add(Statement[] statements) { + if (updater != null && statements != null) { + Graph g = GraphFactory.createPlainGraph(); + for (Statement s : statements) { + g.add(s.asTriple()); + } + updater.add(g); + } else { + super.add(statements); + } + return this; + } + + @Override + public Model add(List statements) { + add(statements.toArray(new Statement[statements.size()])); + return this; + } + + @Override + public Model read(String url) { + Model m = ModelFactory.createDefaultModel(); + readerFactory.getReader().read(m, url); + return add(m); + } + + @Override + public Model read(Reader reader, String base) { + Model m = ModelFactory.createDefaultModel(); + readerFactory.getReader().read(m, reader, base); + return add(m); + } + + @Override + public Model read(InputStream reader, String base) { + Model m = ModelFactory.createDefaultModel(); + readerFactory.getReader().read(m, reader, base); + return add(m); + } + + @Override + public Model read(String url, String lang) { + Model m = ModelFactory.createDefaultModel(); + readerFactory.getReader(lang).read(m, url); + return add(m); + } + + @Override + public Model read(String url, String base, String lang) { + try { + InputStream is = new URL(url).openStream(); + try { + read(is, base, lang); + } finally { + if (null != is) { + is.close(); + } + } + } catch (IOException e) { + throw new WrappedIOException(e); + } + return this; + } + + @Override + public Model read(Reader reader, String base, String lang) { + Model m = ModelFactory.createDefaultModel(); + readerFactory.getReader(lang).read(m, reader, base); + return add(m); + } + + @Override + public Model read(InputStream reader, String base, String lang) { + Model m = ModelFactory.createDefaultModel(); + readerFactory.getReader(lang).read(m, reader, base); + return add(m); + } + + @Override + public Model remove(StmtIterator iter) { + if (updater != null && iter != null) { + Graph g = GraphFactory.createPlainGraph(); + while (iter.hasNext()) { + g.add(iter.nextStatement().asTriple()); + } + updater.remove(g); + } else { + super.remove(iter); + } + return this; + } + + @Override + public Model remove(Model m) { + if (updater != null && m != null) { + updater.remove(m.getGraph()); + } else { + super.remove(m); + } + return this; + } + + @Override + public Model remove(Statement[] statements) { + if (updater != null && statements != null) { + Graph g = GraphFactory.createPlainGraph(); + for (Statement s : statements) { + g.add(s.asTriple()); + } + updater.remove(g); + } else { + super.remove(statements); + } + return this; + } + + @Override + public Model remove(List statements) { + if (updater != null && statements != null) { + Graph g = GraphFactory.createPlainGraph(); + for (Statement s : statements) { + g.add(s.asTriple()); + } + updater.remove(g); + } else { + super.remove(statements); + } + return this; + } + + @Override + public Model removeAll() { + if (updater != null) { + updater.removeAll(); + } else { + super.removeAll(); + } + return this; + } +} diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/RDFServiceBulkUpdater.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/RDFServiceBulkUpdater.java new file mode 100644 index 000000000..81f6f8693 --- /dev/null +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/RDFServiceBulkUpdater.java @@ -0,0 +1,102 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.rdfservice.adapters; + +import edu.cornell.mannlib.vitro.webapp.dao.jena.RDFServiceGraph; +import edu.cornell.mannlib.vitro.webapp.rdfservice.ChangeSet; +import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFService; +import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException; +import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.RDFServiceUtils; +import org.apache.jena.graph.GraphEvents; +import org.apache.jena.graph.Node; +import org.apache.jena.rdf.model.Model; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +public class RDFServiceBulkUpdater extends AbstractBulkUpdater { + RDFServiceGraph graph; + + public RDFServiceBulkUpdater(RDFServiceGraph graph) { + this.graph = graph; + } + + @Override + protected void performAddModel(Model model) { + ChangeSet changeSet = graph.getRDFService().manufactureChangeSet(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + model.write(out, "N-TRIPLE"); + changeSet.addAddition(new ByteArrayInputStream( + out.toByteArray()), RDFService.ModelSerializationFormat.N3, + graph.getGraphURI()); + try { + graph.getRDFService().changeSetUpdate(changeSet); + } catch (RDFServiceException rdfse) { + throw new RuntimeException(rdfse); + } + } + + @Override + protected void performRemoveModel(Model model) { + ChangeSet changeSet = graph.getRDFService().manufactureChangeSet(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + model.write(out, "N-TRIPLE"); + changeSet.addRemoval(new ByteArrayInputStream( + out.toByteArray()), RDFService.ModelSerializationFormat.N3, + graph.getGraphURI()); + try { + graph.getRDFService().changeSetUpdate(changeSet); + } catch (RDFServiceException rdfse) { + throw new RuntimeException(rdfse); + } + } + + @Override + protected void performRemoveAll() { + String graphURI = graph.getGraphURI(); + + String findPattern = "?s ?p ?o"; + + StringBuffer findQuery = new StringBuffer("CONSTRUCT { ") + .append(findPattern) + .append(" } WHERE { \n"); + if (graphURI != null) { + findQuery.append(" GRAPH <" + graphURI + "> { "); + } + findQuery.append(findPattern); + if (graphURI != null) { + findQuery.append(" } "); + } + findQuery.append("\n}"); + + String queryString = findQuery.toString(); + + int chunkSize = 50000; + boolean done = false; + + while (!done) { + String chunkQueryString = queryString + " LIMIT " + chunkSize; + + try { + Model chunkToRemove = RDFServiceUtils.parseModel( + graph.getRDFService().sparqlConstructQuery( + chunkQueryString, RDFService.ModelSerializationFormat.N3), + RDFService.ModelSerializationFormat.N3); + if (chunkToRemove.size() > 0) { + ChangeSet cs = graph.getRDFService().manufactureChangeSet(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + chunkToRemove.write(out, "N-TRIPLE"); + cs.addRemoval(new ByteArrayInputStream(out.toByteArray()), + RDFService.ModelSerializationFormat.N3, graphURI); + graph.getRDFService().changeSetUpdate(cs); + } else { + done = true; + } + } catch (RDFServiceException e) { + throw new RuntimeException(e); + } + } + + graph.getEventManager().notifyEvent(graph, GraphEvents.removeAll); + } +} diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/SparqlBulkUpdater.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/SparqlBulkUpdater.java new file mode 100644 index 000000000..5435db12b --- /dev/null +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/SparqlBulkUpdater.java @@ -0,0 +1,83 @@ +/* $This file is distributed under the terms of the license in /doc/license.txt$ */ + +package edu.cornell.mannlib.vitro.webapp.rdfservice.adapters; + +import edu.cornell.mannlib.vitro.webapp.dao.jena.SparqlGraph; +import org.apache.jena.graph.GraphEvents; +import org.apache.jena.graph.GraphUtil; +import org.apache.jena.graph.Triple; +import org.apache.jena.rdf.model.Model; +import org.apache.jena.rdf.model.ModelFactory; +import org.apache.jena.rdf.model.StmtIterator; +import org.apache.jena.util.iterator.ExtendedIterator; + +import java.io.StringWriter; + +public class SparqlBulkUpdater extends AbstractBulkUpdater { + private SparqlGraph graph; + + public SparqlBulkUpdater(SparqlGraph graph) { + this.graph = graph; + } + + @Override + protected void performAddModel(Model model) { + verbModel(model, "INSERT"); + } + + @Override + protected void performRemoveModel(Model model) { + verbModel(model, "DELETE"); + } + + private void verbModel(Model model, String verb) { + Model m = ModelFactory.createDefaultModel(); + int testLimit = 1000; + StmtIterator stmtIt = model.listStatements(); + int count = 0; + try { + while (stmtIt.hasNext()) { + count++; + m.add(stmtIt.nextStatement()); + if (count % testLimit == 0 || !stmtIt.hasNext()) { + StringWriter sw = new StringWriter(); + m.write(sw, "N-TRIPLE"); + StringBuffer updateStringBuff = new StringBuffer(); + String graphURI = graph.getGraphURI(); + updateStringBuff.append(verb + " DATA { " + ((graphURI != null) ? "GRAPH <" + graphURI + "> { " : "" )); + updateStringBuff.append(sw); + updateStringBuff.append(((graphURI != null) ? " } " : "") + " }"); + + String updateString = updateStringBuff.toString(); + + //log.info(updateString); + + graph.executeUpdate(updateString); + + m.removeAll(); + } + } + } finally { + stmtIt.close(); + } + } + + @Override + protected void performRemoveAll() { + ExtendedIterator it = GraphUtil.findAll(graph); + try { + while (it.hasNext()) { + Triple t = it.next(); + graph.delete(t); + it.remove(); + } + } finally { + it.close(); + } + + // get rid of remaining blank nodes using a SPARQL DELETE + graph.removeAll(); + + graph.getEventManager().notifyEvent(graph, GraphEvents.removeAll); + } +} diff --git a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/VitroModelFactory.java b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/VitroModelFactory.java index eaf7db882..df2cff649 100644 --- a/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/VitroModelFactory.java +++ b/api/src/main/java/edu/cornell/mannlib/vitro/webapp/rdfservice/adapters/VitroModelFactory.java @@ -35,33 +35,34 @@ public class VitroModelFactory { Graph graph = model.getGraph(); Model bareModel = new ModelCom(graph); OntModel ontModel = new OntModelImpl(OWL_MEM, bareModel); - return ontModel; + return new BulkUpdatingOntModel(ontModel); } public static Model createUnion(Model baseModel, Model plusModel) { Graph baseGraph = baseModel.getGraph(); Graph plusGraph = plusModel.getGraph(); - BulkUpdatingUnion unionGraph = new BulkUpdatingUnion(baseGraph, - plusGraph); + + BulkUpdatingUnion unionGraph = new BulkUpdatingUnion(baseGraph, plusGraph); Model unionModel = ModelFactory.createModelForGraph(unionGraph); - return unionModel; + + return new BulkUpdatingModel(unionModel); } public static OntModel createUnion(OntModel baseModel, OntModel plusModel) { Graph baseGraph = baseModel.getGraph(); Graph plusGraph = plusModel.getGraph(); - BulkUpdatingUnion unionGraph = new BulkUpdatingUnion(baseGraph, - plusGraph); + + BulkUpdatingUnion unionGraph = new BulkUpdatingUnion(baseGraph, plusGraph); Model unionModel = ModelFactory.createModelForGraph(unionGraph); - OntModel unionOntModel = ModelFactory.createOntologyModel(OWL_MEM, - unionModel); - return unionOntModel; + OntModel unionOntModel = ModelFactory.createOntologyModel(OWL_MEM, unionModel); + + return new BulkUpdatingOntModel(unionOntModel); } public static Model createModelForGraph(Graph g) { - return ModelFactory.createModelForGraph(g); + return new BulkUpdatingModel(ModelFactory.createModelForGraph(g)); } private static class BulkUpdatingUnion extends Union { @@ -76,7 +77,5 @@ public class VitroModelFactory { + ToString.graphToString(L) + ", R=" + ToString.graphToString(R) + "]"; } - - } }