[VIVO-1122] Fix to the way resources are consumed in the SPARQL clients, so that it doesn't leak / tie up HTTP connections.
This commit is contained in:
parent
046d445639
commit
817e90716c
4 changed files with 122 additions and 74 deletions
|
@ -7,6 +7,7 @@ import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import edu.cornell.mannlib.vitro.webapp.utils.http.HttpClientFactory;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.http.HttpResponse;
|
import org.apache.http.HttpResponse;
|
||||||
|
@ -44,6 +45,7 @@ import com.hp.hpl.jena.util.iterator.SingletonIterator;
|
||||||
import com.hp.hpl.jena.util.iterator.WrappedIterator;
|
import com.hp.hpl.jena.util.iterator.WrappedIterator;
|
||||||
|
|
||||||
import edu.cornell.mannlib.vitro.webapp.utils.logging.ToString;
|
import edu.cornell.mannlib.vitro.webapp.utils.logging.ToString;
|
||||||
|
import org.apache.http.util.EntityUtils;
|
||||||
|
|
||||||
public class SparqlGraph implements GraphWithPerform {
|
public class SparqlGraph implements GraphWithPerform {
|
||||||
|
|
||||||
|
@ -73,9 +75,7 @@ public class SparqlGraph implements GraphWithPerform {
|
||||||
this.endpointURI = endpointURI;
|
this.endpointURI = endpointURI;
|
||||||
this.graphURI = graphURI;
|
this.graphURI = graphURI;
|
||||||
|
|
||||||
PoolingClientConnectionManager cm = new PoolingClientConnectionManager();
|
this.httpClient = HttpClientFactory.getHttpClient();
|
||||||
cm.setDefaultMaxPerRoute(50);
|
|
||||||
this.httpClient = new DefaultHttpClient(cm);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getEndpointURI() {
|
public String getEndpointURI() {
|
||||||
|
@ -92,20 +92,26 @@ public class SparqlGraph implements GraphWithPerform {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void executeUpdate(String updateString) {
|
public void executeUpdate(String updateString) {
|
||||||
|
HttpPost meth = new HttpPost(endpointURI);
|
||||||
try {
|
try {
|
||||||
HttpPost meth = new HttpPost(endpointURI);
|
|
||||||
meth.addHeader("Content-Type", "application/x-www-form-urlencoded");
|
meth.addHeader("Content-Type", "application/x-www-form-urlencoded");
|
||||||
meth.setEntity(new UrlEncodedFormEntity(Arrays.asList(
|
meth.setEntity(new UrlEncodedFormEntity(Arrays.asList(
|
||||||
new BasicNameValuePair("update", updateString))));
|
new BasicNameValuePair("update", updateString))));
|
||||||
HttpResponse response = httpClient.execute(meth);
|
HttpResponse response = httpClient.execute(meth);
|
||||||
int statusCode = response.getStatusLine().getStatusCode();
|
try {
|
||||||
if (statusCode > 399) {
|
int statusCode = response.getStatusLine().getStatusCode();
|
||||||
log.error("response " + statusCode + " to update. \n");
|
if (statusCode > 399) {
|
||||||
throw new RuntimeException("Unable to perform SPARQL UPDATE: \n"
|
log.error("response " + statusCode + " to update. \n");
|
||||||
+ updateString);
|
throw new RuntimeException("Unable to perform SPARQL UPDATE: \n"
|
||||||
|
+ updateString);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
EntityUtils.consume(response.getEntity());
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException("Unable to perform SPARQL UPDATE", e);
|
throw new RuntimeException("Unable to perform SPARQL UPDATE", e);
|
||||||
|
} finally {
|
||||||
|
meth.abort();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,10 +16,12 @@ import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
|
||||||
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
|
import edu.cornell.mannlib.vitro.webapp.rdfservice.ResultSetConsumer;
|
||||||
|
import edu.cornell.mannlib.vitro.webapp.utils.http.HttpClientFactory;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.http.HttpResponse;
|
import org.apache.http.HttpResponse;
|
||||||
|
import org.apache.http.client.HttpClient;
|
||||||
import org.apache.http.client.entity.UrlEncodedFormEntity;
|
import org.apache.http.client.entity.UrlEncodedFormEntity;
|
||||||
import org.apache.http.client.methods.HttpGet;
|
import org.apache.http.client.methods.HttpGet;
|
||||||
import org.apache.http.client.methods.HttpPost;
|
import org.apache.http.client.methods.HttpPost;
|
||||||
|
@ -27,6 +29,7 @@ import org.apache.http.client.utils.URIBuilder;
|
||||||
import org.apache.http.impl.client.DefaultHttpClient;
|
import org.apache.http.impl.client.DefaultHttpClient;
|
||||||
import org.apache.http.impl.conn.PoolingClientConnectionManager;
|
import org.apache.http.impl.conn.PoolingClientConnectionManager;
|
||||||
import org.apache.http.message.BasicNameValuePair;
|
import org.apache.http.message.BasicNameValuePair;
|
||||||
|
import org.apache.http.util.EntityUtils;
|
||||||
import org.apache.jena.riot.RDFDataMgr;
|
import org.apache.jena.riot.RDFDataMgr;
|
||||||
|
|
||||||
import com.hp.hpl.jena.graph.Triple;
|
import com.hp.hpl.jena.graph.Triple;
|
||||||
|
@ -70,12 +73,13 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
|
||||||
private static final Log log = LogFactory.getLog(RDFServiceImpl.class);
|
private static final Log log = LogFactory.getLog(RDFServiceImpl.class);
|
||||||
protected String readEndpointURI;
|
protected String readEndpointURI;
|
||||||
protected String updateEndpointURI;
|
protected String updateEndpointURI;
|
||||||
protected DefaultHttpClient httpClient;
|
|
||||||
// the number of triples to be
|
// the number of triples to be
|
||||||
private static final int CHUNK_SIZE = 1000; // added/removed in a single
|
private static final int CHUNK_SIZE = 1000; // added/removed in a single
|
||||||
// SPARQL UPDATE
|
// SPARQL UPDATE
|
||||||
|
|
||||||
/**
|
protected HttpClient httpClient;
|
||||||
|
|
||||||
|
/**
|
||||||
* Returns an RDFService for a remote repository
|
* Returns an RDFService for a remote repository
|
||||||
* @param String - URI of the read SPARQL endpoint for the knowledge base
|
* @param String - URI of the read SPARQL endpoint for the knowledge base
|
||||||
* @param String - URI of the update SPARQL endpoint for the knowledge base
|
* @param String - URI of the update SPARQL endpoint for the knowledge base
|
||||||
|
@ -89,10 +93,7 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
|
||||||
public RDFServiceSparql(String readEndpointURI, String updateEndpointURI, String defaultWriteGraphURI) {
|
public RDFServiceSparql(String readEndpointURI, String updateEndpointURI, String defaultWriteGraphURI) {
|
||||||
this.readEndpointURI = readEndpointURI;
|
this.readEndpointURI = readEndpointURI;
|
||||||
this.updateEndpointURI = updateEndpointURI;
|
this.updateEndpointURI = updateEndpointURI;
|
||||||
|
httpClient = HttpClientFactory.getHttpClient();
|
||||||
PoolingClientConnectionManager cm = new PoolingClientConnectionManager();
|
|
||||||
cm.setDefaultMaxPerRoute(50);
|
|
||||||
this.httpClient = new DefaultHttpClient(cm);
|
|
||||||
|
|
||||||
testConnection();
|
testConnection();
|
||||||
}
|
}
|
||||||
|
@ -302,35 +303,39 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
|
||||||
HttpGet meth = new HttpGet(new URIBuilder(readEndpointURI).addParameter("query", queryStr).build());
|
HttpGet meth = new HttpGet(new URIBuilder(readEndpointURI).addParameter("query", queryStr).build());
|
||||||
meth.addHeader("Accept", "application/sparql-results+xml");
|
meth.addHeader("Accept", "application/sparql-results+xml");
|
||||||
HttpResponse response = httpClient.execute(meth);
|
HttpResponse response = httpClient.execute(meth);
|
||||||
int statusCode = response.getStatusLine().getStatusCode();
|
try {
|
||||||
if (statusCode > 399) {
|
int statusCode = response.getStatusLine().getStatusCode();
|
||||||
log.error("response " + statusCode + " to query. \n");
|
if (statusCode > 399) {
|
||||||
log.debug("update string: \n" + queryStr);
|
log.error("response " + statusCode + " to query. \n");
|
||||||
throw new RDFServiceException("Unable to perform SPARQL UPDATE");
|
log.debug("update string: \n" + queryStr);
|
||||||
}
|
throw new RDFServiceException("Unable to perform SPARQL UPDATE");
|
||||||
|
|
||||||
try (InputStream in = response.getEntity().getContent()) {
|
|
||||||
ResultSet resultSet = ResultSetFactory.fromXML(in);
|
|
||||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
|
||||||
switch (resultFormat) {
|
|
||||||
case CSV:
|
|
||||||
ResultSetFormatter.outputAsCSV(outputStream, resultSet);
|
|
||||||
break;
|
|
||||||
case TEXT:
|
|
||||||
ResultSetFormatter.out(outputStream, resultSet);
|
|
||||||
break;
|
|
||||||
case JSON:
|
|
||||||
ResultSetFormatter.outputAsJSON(outputStream, resultSet);
|
|
||||||
break;
|
|
||||||
case XML:
|
|
||||||
ResultSetFormatter.outputAsXML(outputStream, resultSet);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
throw new RDFServiceException("unrecognized result format");
|
|
||||||
}
|
}
|
||||||
InputStream result = new ByteArrayInputStream(
|
|
||||||
outputStream.toByteArray());
|
try (InputStream in = response.getEntity().getContent()) {
|
||||||
return result;
|
ResultSet resultSet = ResultSetFactory.fromXML(in);
|
||||||
|
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||||
|
switch (resultFormat) {
|
||||||
|
case CSV:
|
||||||
|
ResultSetFormatter.outputAsCSV(outputStream, resultSet);
|
||||||
|
break;
|
||||||
|
case TEXT:
|
||||||
|
ResultSetFormatter.out(outputStream, resultSet);
|
||||||
|
break;
|
||||||
|
case JSON:
|
||||||
|
ResultSetFormatter.outputAsJSON(outputStream, resultSet);
|
||||||
|
break;
|
||||||
|
case XML:
|
||||||
|
ResultSetFormatter.outputAsXML(outputStream, resultSet);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new RDFServiceException("unrecognized result format");
|
||||||
|
}
|
||||||
|
InputStream result = new ByteArrayInputStream(
|
||||||
|
outputStream.toByteArray());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
EntityUtils.consume(response.getEntity());
|
||||||
}
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
throw new RuntimeException(ioe);
|
throw new RuntimeException(ioe);
|
||||||
|
@ -347,15 +352,19 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
|
||||||
HttpGet meth = new HttpGet(new URIBuilder(readEndpointURI).addParameter("query", queryStr).build());
|
HttpGet meth = new HttpGet(new URIBuilder(readEndpointURI).addParameter("query", queryStr).build());
|
||||||
meth.addHeader("Accept", "application/sparql-results+xml");
|
meth.addHeader("Accept", "application/sparql-results+xml");
|
||||||
HttpResponse response = httpClient.execute(meth);
|
HttpResponse response = httpClient.execute(meth);
|
||||||
int statusCode = response.getStatusLine().getStatusCode();
|
try {
|
||||||
if (statusCode > 399) {
|
int statusCode = response.getStatusLine().getStatusCode();
|
||||||
log.error("response " + statusCode + " to query. \n");
|
if (statusCode > 399) {
|
||||||
log.debug("update string: \n" + queryStr);
|
log.error("response " + statusCode + " to query. \n");
|
||||||
throw new RDFServiceException("Unable to perform SPARQL UPDATE");
|
log.debug("update string: \n" + queryStr);
|
||||||
}
|
throw new RDFServiceException("Unable to perform SPARQL UPDATE");
|
||||||
|
}
|
||||||
|
|
||||||
try (InputStream in = response.getEntity().getContent()) {
|
try (InputStream in = response.getEntity().getContent()) {
|
||||||
consumer.processResultSet(ResultSetFactory.fromXML(in));
|
consumer.processResultSet(ResultSetFactory.fromXML(in));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
EntityUtils.consume(response.getEntity());
|
||||||
}
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
throw new RuntimeException(ioe);
|
throw new RuntimeException(ioe);
|
||||||
|
@ -498,13 +507,17 @@ public class RDFServiceSparql extends RDFServiceImpl implements RDFService {
|
||||||
HttpPost meth = new HttpPost(updateEndpointURI);
|
HttpPost meth = new HttpPost(updateEndpointURI);
|
||||||
meth.addHeader("Content-Type", "application/x-www-form-urlencoded");
|
meth.addHeader("Content-Type", "application/x-www-form-urlencoded");
|
||||||
meth.setEntity(new UrlEncodedFormEntity(Arrays.asList(new BasicNameValuePair("update", updateString))));
|
meth.setEntity(new UrlEncodedFormEntity(Arrays.asList(new BasicNameValuePair("update", updateString))));
|
||||||
HttpResponse response = httpClient.execute(meth);
|
HttpResponse response = httpClient.execute(meth);
|
||||||
int statusCode = response.getStatusLine().getStatusCode();
|
try {
|
||||||
if (statusCode > 399) {
|
int statusCode = response.getStatusLine().getStatusCode();
|
||||||
log.error("response " + response.getStatusLine() + " to update. \n");
|
if (statusCode > 399) {
|
||||||
//log.debug("update string: \n" + updateString);
|
log.error("response " + response.getStatusLine() + " to update. \n");
|
||||||
throw new RDFServiceException("Unable to perform SPARQL UPDATE");
|
//log.debug("update string: \n" + updateString);
|
||||||
}
|
throw new RDFServiceException("Unable to perform SPARQL UPDATE");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
EntityUtils.consume(response.getEntity());
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RDFServiceException("Unable to perform change set update", e);
|
throw new RDFServiceException("Unable to perform change set update", e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
|
|
||||||
|
import org.apache.commons.httpclient.HttpMethod;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -23,6 +24,7 @@ import org.apache.http.protocol.HttpContext;
|
||||||
|
|
||||||
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
|
import edu.cornell.mannlib.vitro.webapp.rdfservice.RDFServiceException;
|
||||||
import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.sparql.RDFServiceSparql;
|
import edu.cornell.mannlib.vitro.webapp.rdfservice.impl.sparql.RDFServiceSparql;
|
||||||
|
import org.apache.http.util.EntityUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For now, at least, it is just like an RDFServiceSparql except:
|
* For now, at least, it is just like an RDFServiceSparql except:
|
||||||
|
@ -78,22 +80,27 @@ public class RDFServiceVirtuoso extends RDFServiceSparql {
|
||||||
log.debug("UPDATE STRING: " + updateString);
|
log.debug("UPDATE STRING: " + updateString);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
HttpPost request = createHttpRequest(updateString);
|
||||||
HttpResponse response = httpClient.execute(
|
HttpResponse response = httpClient.execute(
|
||||||
createHttpRequest(updateString), createHttpContext());
|
request, createHttpContext());
|
||||||
int statusCode = response.getStatusLine().getStatusCode();
|
try {
|
||||||
if (statusCode > 399) {
|
int statusCode = response.getStatusLine().getStatusCode();
|
||||||
log.error("response " + response.getStatusLine()
|
if (statusCode > 399) {
|
||||||
+ " to update. \n");
|
log.error("response " + response.getStatusLine()
|
||||||
|
+ " to update. \n");
|
||||||
|
|
||||||
try (InputStream content = response.getEntity().getContent()) {
|
try (InputStream content = response.getEntity().getContent()) {
|
||||||
for (String line : IOUtils.readLines(content)) {
|
for (String line : IOUtils.readLines(content)) {
|
||||||
log.error("response-line >>" + line);
|
log.error("response-line >>" + line);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
throw new RDFServiceException(
|
throw new RDFServiceException(
|
||||||
"Unable to perform SPARQL UPDATE: status code = "
|
"Unable to perform SPARQL UPDATE: status code = "
|
||||||
+ statusCode);
|
+ statusCode);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
EntityUtils.consume(response.getEntity());
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Failed to update: " + updateString, e);
|
log.error("Failed to update: " + updateString, e);
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
/* $This file is distributed under the terms of the license in /doc/license.txt$ */
|
||||||
|
|
||||||
|
package edu.cornell.mannlib.vitro.webapp.utils.http;
|
||||||
|
|
||||||
|
import org.apache.http.client.HttpClient;
|
||||||
|
import org.apache.http.impl.client.DefaultHttpClient;
|
||||||
|
import org.apache.http.impl.conn.PoolingClientConnectionManager;
|
||||||
|
|
||||||
|
public final class HttpClientFactory {
|
||||||
|
private static final DefaultHttpClient httpClient;
|
||||||
|
|
||||||
|
static {
|
||||||
|
PoolingClientConnectionManager cm = new PoolingClientConnectionManager();
|
||||||
|
cm.setDefaultMaxPerRoute(50);
|
||||||
|
cm.setMaxTotal(300);
|
||||||
|
httpClient = new DefaultHttpClient(cm);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HttpClient getHttpClient() {
|
||||||
|
return httpClient;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue