diff --git a/contrib/pom.xml b/contrib/pom.xml index 2667cbcda..18586aaaf 100644 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -78,6 +78,11 @@ gson 2.8.6 + + org.xerial + sqlite-jdbc + 3.34.0 + diff --git a/contrib/src/main/java/org/archive/modules/postprocessor/TroughCrawlLogFeed.java b/contrib/src/main/java/org/archive/modules/postprocessor/TroughCrawlLogFeed.java index a2e3e1bf2..91001dd45 100644 --- a/contrib/src/main/java/org/archive/modules/postprocessor/TroughCrawlLogFeed.java +++ b/contrib/src/main/java/org/archive/modules/postprocessor/TroughCrawlLogFeed.java @@ -18,10 +18,12 @@ */ package org.archive.modules.postprocessor; + import java.net.MalformedURLException; import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.Date; -import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -89,8 +91,12 @@ public class TroughCrawlLogFeed extends Processor implements Lifecycle { protected static final Logger logger = Logger.getLogger(TroughCrawlLogFeed.class.getName()); - protected static final int BATCH_MAX_TIME_MS = 20 * 1000; + protected static final int BATCH_MAX_TIME_MS = 60 * 1000; protected static final int BATCH_MAX_SIZE = 400; + protected static final String CRAWLED_BATCH = "crawled"; + protected static final String UNCRAWLED_BATCH = "uncrawled"; + protected AtomicInteger crawledBatchSize = new AtomicInteger(0); + protected AtomicInteger uncrawledBatchSize = new AtomicInteger(0); protected KeyedProperties kp = new KeyedProperties(); public KeyedProperties getKeyedProperties() { @@ -119,15 +125,15 @@ public String getRethinkUrl() { protected TroughClient troughClient() throws MalformedURLException { if (troughClient == null) { - troughClient = new TroughClient(getRethinkUrl(), 60 * 60); + troughClient = new TroughClient(getRethinkUrl(), null); troughClient.start(); } return troughClient; } - protected List crawledBatch = new ArrayList(); + protected ConcurrentLinkedQueue crawledBatch = new ConcurrentLinkedQueue(); protected long crawledBatchLastTime = System.currentTimeMillis(); - protected List uncrawledBatch = new ArrayList(); + protected ConcurrentLinkedQueue uncrawledBatch = new ConcurrentLinkedQueue(); protected long uncrawledBatchLastTime = System.currentTimeMillis(); protected Frontier frontier; @@ -157,6 +163,22 @@ protected boolean shouldProcess(CrawlURI curi) { return false; } } + + protected boolean dumpPendingAtClose = true; + public boolean getDumpPendingAtClose() { + return dumpPendingAtClose; + } + public void setDumpPendingAtClose(boolean dumpPendingAtClose) { + this.dumpPendingAtClose = dumpPendingAtClose; + } + + protected boolean forceBatchPosting = false; + protected boolean getForceBatchPosting() { + return forceBatchPosting; + } + protected void setForceBatchPosting(boolean forceBatchPosting) { + this.forceBatchPosting = forceBatchPosting; + } @Override public synchronized void stop() { @@ -164,7 +186,9 @@ public synchronized void stop() { return; } if (!crawledBatch.isEmpty()) { - postCrawledBatch(); + setForceBatchPosting(true); + postBatch(crawledBatch, crawledBatchSize, CRAWLED_BATCH); + setForceBatchPosting(false); } if (frontier instanceof BdbFrontier) { @@ -177,9 +201,14 @@ public void execute(Object o) { } }; - logger.info("dumping " + frontier.queuedUriCount() + " queued urls to trough feed"); - ((BdbFrontier) frontier).forAllPendingDo(closure); - logger.info("dumped " + frontier.queuedUriCount() + " queued urls to trough feed"); + if(getDumpPendingAtClose()) { + logger.info("dumping " + frontier.queuedUriCount() + " queued urls to trough feed"); + setForceBatchPosting(true); + ((BdbFrontier) frontier).forAllPendingDo(closure); + postBatch(uncrawledBatch, uncrawledBatchSize, UNCRAWLED_BATCH); + setForceBatchPosting(false); + logger.info("dumped " + frontier.queuedUriCount() + " queued urls to trough feed"); + } } else { logger.warning("frontier is not a BdbFrontier, cannot dump queued urls to trough feed"); } @@ -223,12 +252,10 @@ protected void innerProcess(CrawlURI curi) throws InterruptedException { serverCache.getHostFor(curi.getUURI()).getHostName(), }; - synchronized (crawledBatch) { - crawledBatch.add(values); - } + crawledBatch.add(values); - if (crawledBatch.size() >= BATCH_MAX_SIZE || System.currentTimeMillis() - crawledBatchLastTime > BATCH_MAX_TIME_MS) { - postCrawledBatch(); + if (crawledBatchSize.incrementAndGet() >= BATCH_MAX_SIZE || System.currentTimeMillis() - crawledBatchLastTime > BATCH_MAX_TIME_MS) { + postBatch(crawledBatch, crawledBatchSize, CRAWLED_BATCH); } } else { Object[] values = new Object[] { @@ -241,72 +268,78 @@ protected void innerProcess(CrawlURI curi) throws InterruptedException { serverCache.getHostFor(curi.getUURI()).getHostName(), }; - synchronized (uncrawledBatch) { - uncrawledBatch.add(values); - } - - if (uncrawledBatch.size() >= BATCH_MAX_SIZE || System.currentTimeMillis() - uncrawledBatchLastTime > BATCH_MAX_TIME_MS) { - postUncrawledBatch(); + uncrawledBatch.add(values); + if (uncrawledBatchSize.incrementAndGet() >= BATCH_MAX_SIZE || System.currentTimeMillis() - uncrawledBatchLastTime > BATCH_MAX_TIME_MS) { + postBatch(uncrawledBatch, uncrawledBatchSize, UNCRAWLED_BATCH); } } } - protected void postCrawledBatch() { - logger.info("posting batch of " + crawledBatch.size() + " crawled urls trough segment " + getSegmentId()); - synchronized (crawledBatch) { - if (!crawledBatch.isEmpty()) { - StringBuffer sqlTmpl = new StringBuffer(); - sqlTmpl.append("insert into crawled_url (" - + "timestamp, status_code, size, payload_size, url, hop_path, is_seed_redirect, " - + "via, mimetype, content_digest, seed, is_duplicate, warc_filename, " - + "warc_offset, warc_content_bytes, host) values " - + "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"); - for (int i = 1; i < crawledBatch.size(); i++) { - sqlTmpl.append(", (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"); + protected void postBatch(ConcurrentLinkedQueue batch, AtomicInteger batchSize, String batchType) { + ArrayList crawlLogLines = new ArrayList(); + synchronized (batch) { + //read and remove log lines from batch into local variable so we can exit the synchronized block asap + if (batchSize.get() >= BATCH_MAX_SIZE || getForceBatchPosting()) { + while(!batch.isEmpty()) { + Object[] crawlLine = batch.poll(); + if(crawlLine != null) + crawlLogLines.add(crawlLine); + else + break; } - - Object[] flattenedValues = new Object[16 * crawledBatch.size()]; - for (int i = 0; i < crawledBatch.size(); i++) { - System.arraycopy(crawledBatch.get(i), 0, flattenedValues, 16 * i, 16); - } - - try { - troughClient().write(getSegmentId(), sqlTmpl.toString(), flattenedValues); - } catch (Exception e) { - logger.log(Level.WARNING, "problem posting batch of " + crawledBatch.size() + " crawled urls to trough segment " + getSegmentId(), e); - } - - crawledBatchLastTime = System.currentTimeMillis(); - crawledBatch.clear(); + batchSize.getAndSet(batch.size()); //size() is O(n). Use sparingly. Should be near zero right now. + logger.info("posting batch of " + crawlLogLines.size() + " " + batchType + " urls to trough segment " + getSegmentId()); } } - } - protected void postUncrawledBatch() { - logger.info("posting batch of " + uncrawledBatch.size() + " uncrawled urls trough segment " + getSegmentId()); - synchronized (uncrawledBatch) { - if (!uncrawledBatch.isEmpty()) { - StringBuffer sqlTmpl = new StringBuffer(); - sqlTmpl.append( - "insert into uncrawled_url (timestamp, url, hop_path, status_code, via, seed, host)" - + " values (%s, %s, %s, %s, %s, %s, %s)"); - - for (int i = 1; i < uncrawledBatch.size(); i++) { - sqlTmpl.append(", (%s, %s, %s, %s, %s, %s, %s)"); - } + if( crawlLogLines != null && crawlLogLines.size() > 0){ + StringBuffer sqlTmpl = new StringBuffer(); + int numCols=0; + switch(batchType){ + case CRAWLED_BATCH: + sqlTmpl.append("insert into crawled_url (" + + "timestamp, status_code, size, payload_size, url, hop_path, is_seed_redirect, " + + "via, mimetype, content_digest, seed, is_duplicate, warc_filename, " + + "warc_offset, warc_content_bytes, host) values " + + "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"); + for (int i = 1; i < crawlLogLines.size(); i++) { + sqlTmpl.append(", (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"); + } + numCols=16; + break; + case UNCRAWLED_BATCH: + sqlTmpl.append( + "insert into uncrawled_url (timestamp, url, hop_path, status_code, via, seed, host)" + + " values (%s, %s, %s, %s, %s, %s, %s)"); + + for (int i = 1; i < crawlLogLines.size(); i++) { + sqlTmpl.append(", (%s, %s, %s, %s, %s, %s, %s)"); + } + numCols=7; + break; + } - Object[] flattenedValues = new Object[7 * uncrawledBatch.size()]; - for (int i = 0; i < uncrawledBatch.size(); i++) { - System.arraycopy(uncrawledBatch.get(i), 0, flattenedValues, 7 * i, 7); - } + /* + Trough takes a single object array for the values we insert, so an array of size (N columns * X rows). + We read each row in column by column, then repeat for each row. + */ + Object[] flattenedValues = new Object[numCols * crawlLogLines.size()]; + for (int i = 0; i < crawlLogLines.size(); i++) { + System.arraycopy(crawlLogLines.get(i),0,flattenedValues, numCols * i,numCols); + } - try { - troughClient().write(getSegmentId(), sqlTmpl.toString(), flattenedValues); - } catch (Exception e) { - logger.log(Level.WARNING, "problem posting batch of " + uncrawledBatch.size() + " uncrawled urls to trough segment " + getSegmentId(), e); - } + try { + troughClient().write(getSegmentId(), sqlTmpl.toString(), flattenedValues); + } catch (Exception e) { + logger.log(Level.WARNING, "problem posting batch of " + crawlLogLines.size() + " " + batchType + " urls to trough segment " + getSegmentId(), e); + } - uncrawledBatchLastTime = System.currentTimeMillis(); - uncrawledBatch.clear(); + switch(batchType) { + case CRAWLED_BATCH: + crawledBatchLastTime = System.currentTimeMillis(); + break; + case UNCRAWLED_BATCH: + uncrawledBatchLastTime = System.currentTimeMillis(); + break; } } } diff --git a/contrib/src/main/java/org/archive/modules/recrawl/TroughContentDigestHistory.java b/contrib/src/main/java/org/archive/modules/recrawl/TroughContentDigestHistory.java index 7bf0b37db..f8b3cc1b0 100644 --- a/contrib/src/main/java/org/archive/modules/recrawl/TroughContentDigestHistory.java +++ b/contrib/src/main/java/org/archive/modules/recrawl/TroughContentDigestHistory.java @@ -5,6 +5,14 @@ import static org.archive.modules.recrawl.RecrawlAttributeConstants.A_WARC_RECORD_ID; import java.net.MalformedURLException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.ConcurrentHashMap; +import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -65,11 +73,30 @@ public String getRethinkUrl() { return (String) kp.get("rethinkUrl"); } + + { + setTroughSyncMaxBatchSize(400); + } + /** + * @param troughSyncMaxBatchSize number of dedup rows to buffer before syncing to Trough. + */ + public void setTroughSyncMaxBatchSize(int troughSyncMaxBatchSize) { kp.put("troughSyncMaxBatchSize", troughSyncMaxBatchSize); } + public int getTroughSyncMaxBatchSize() { return (int) kp.get("troughSyncMaxBatchSize"); } + + { + setTroughPromotionInterval(3600); + } + /** + * @param troughPromotionInterval number of seconds between runs of the Trough Client promoter thread. + */ + public void setTroughPromotionInterval(int troughPromotionInterval) { kp.put("troughPromotionInterval", troughPromotionInterval); } + public int getTroughPromotionInterval() { return (int) kp.get("troughPromotionInterval"); } + protected TroughClient troughClient = null; protected TroughClient troughClient() throws MalformedURLException { if (troughClient == null) { - troughClient = new TroughClient(getRethinkUrl(), 60 * 60); + troughClient = new TroughClient(getRethinkUrl(), getTroughPromotionInterval()); troughClient.start(); } return troughClient; @@ -81,12 +108,27 @@ protected TroughClient troughClient() throws MalformedURLException { + " url varchar(2100) not null,\n" + " date varchar(100) not null,\n" + " id varchar(100));\n"; // warc record id + protected static final String WRITE_SQL_TMPL = + "insert or ignore into dedup (digest_key, url, date, id)"; + + // row count that avoids table scan if no deletes + protected static final String COUNT_SQL = + "SELECT MAX(_ROWID_) FROM dedup LIMIT 1;"; + protected static final String DROP_TABLE_DEDUP_SQL = + "DROP table dedup;"; + protected static final String SELECT_ALL_SQL = + "SELECT * FROM dedup;"; + protected static final String DEDUP_QUERY_SQL = + "SELECT * FROM dedup WHERE digest_key = ? LIMIT 1"; + protected ConcurrentHashMap segmentCache = new ConcurrentHashMap(); + protected Connection dedupDbConnection = null; @Override public void onApplicationEvent(CrawlStateEvent event) { switch(event.getState()) { case PREPARING: try { + dedupDbConnection = DriverManager.getConnection("jdbc:sqlite::memory:"); // initializes TroughClient and starts promoter thread as a side effect troughClient().registerSchema(SCHEMA_ID, SCHEMA_SQL); } catch (Exception e) { @@ -106,10 +148,17 @@ public void onApplicationEvent(CrawlStateEvent event) { * necessarily finished processing.) */ if (troughClient != null) { + //force all in memory segments to sync to trough before final promotion + Enumeration segments = segmentCache.keys(); + while(segments.hasMoreElements()){ + String segmentId = segments.nextElement(); + syncToTrough(segmentId,true); + } troughClient.stop(); troughClient.promoteDirtySegments(); troughClient = null; } + dedupDbConnection = null; break; default: @@ -122,49 +171,177 @@ public void load(CrawlURI curi) { // WARCWriterProcessor knows it should put the info in there HashMap contentDigestHistory = curi.getContentDigestHistory(); - try { - String sql = "select * from dedup where digest_key = %s"; - List> results = troughClient().read(getSegmentId(), sql, new String[] {persistKeyFor(curi)}); - if (!results.isEmpty()) { - Map hist = new HashMap(); - hist.put(A_ORIGINAL_URL, results.get(0).get("url")); - hist.put(A_ORIGINAL_DATE, results.get(0).get("date")); - hist.put(A_WARC_RECORD_ID, results.get(0).get("id")); - - if (logger.isLoggable(Level.FINER)) { - logger.finer("loaded history by digest " + persistKeyFor(curi) - + " for uri " + curi + " - " + hist); + //Check in-memory segment first + boolean memoryDedupHit = false; + if(segmentCache.containsKey(getSegmentId())) { + String segmentDedupQuerySql = segmentizeDedupTableName(getSegmentId(), DEDUP_QUERY_SQL); + try(PreparedStatement dedupQueryStatement = dedupDbConnection.prepareStatement(segmentDedupQuerySql)) { + dedupQueryStatement.setString(1, persistKeyFor(curi)); + ResultSet rs = dedupQueryStatement.executeQuery(); + while(rs.next()) { + Map hist = new HashMap(); + hist.put(A_ORIGINAL_URL, rs.getString("url")); + hist.put(A_ORIGINAL_DATE, rs.getString("date")); + hist.put(A_WARC_RECORD_ID, rs.getString("id")); + if (logger.isLoggable(Level.FINER)) { + logger.finer("loaded in-memory history by digest " + persistKeyFor(curi) + + " for uri " + curi + " - " + hist); + } + contentDigestHistory.putAll(hist); + memoryDedupHit=true; + break; } - contentDigestHistory.putAll(hist); + } catch (Exception e) { + logger.log(Level.WARNING, "problem querying in-memory dedup in " + getSegmentId() + " for url " + curi + " sql: "+segmentDedupQuerySql, e); } - } catch (TroughNoReadUrlException e) { - // this is totally normal at the beginning of the crawl, for example - logger.log(Level.FINE, "problem retrieving dedup info from trough segment " + getSegmentId() + " for url " + curi, e); - } catch (Exception e) { - logger.log(Level.WARNING, "problem retrieving dedup info from trough segment " + getSegmentId() + " for url " + curi, e); } - } + if(!memoryDedupHit) { + try { + String sql = "select * from dedup where digest_key = %s limit 1"; + List> results = troughClient().read(getSegmentId(), sql, new String[]{persistKeyFor(curi)}); + if (!results.isEmpty()) { + Map hist = new HashMap(); + hist.put(A_ORIGINAL_URL, results.get(0).get("url")); + hist.put(A_ORIGINAL_DATE, results.get(0).get("date")); + hist.put(A_WARC_RECORD_ID, results.get(0).get("id")); - protected static final String WRITE_SQL_TMPL = - "insert or ignore into dedup (digest_key, url, date, id) values (%s, %s, %s, %s);"; + if (logger.isLoggable(Level.FINER)) { + logger.finer("loaded history by digest " + persistKeyFor(curi) + + " for uri " + curi + " - " + hist); + } + contentDigestHistory.putAll(hist); + } + } catch (TroughNoReadUrlException e) { + // this is totally normal at the beginning of the crawl, for example + logger.log(Level.FINE, "problem retrieving dedup info from trough segment " + getSegmentId() + " for url " + curi, e); + } catch (Exception e) { + logger.log(Level.WARNING, "problem retrieving dedup info from trough segment " + getSegmentId() + " for url " + curi, e); + } + } + } @Override public void store(CrawlURI curi) { if (!curi.hasContentDigestHistory() || curi.getContentDigestHistory().isEmpty()) { return; } - Map hist = curi.getContentDigestHistory(); - - try { - String digestKey = persistKeyFor(curi); - Object url = hist.get(A_ORIGINAL_URL); - Object date = hist.get(A_ORIGINAL_DATE); - Object recordId = hist.get(A_WARC_RECORD_ID); - Object[] values = new Object[] { digestKey, url, date, recordId }; - troughClient().write(getSegmentId(), WRITE_SQL_TMPL, values, SCHEMA_ID); - } catch (Exception e) { - logger.log(Level.WARNING, "problem writing dedup info to trough segment " + getSegmentId() + " for url " + curi, e); + if (getSegmentId().isEmpty()) { + logger.log(Level.WARNING, "no segment id found for url " + curi); + return; + } + + if (!segmentCache.containsKey(getSegmentId())) { + synchronized (getSegmentId()) { + if (!segmentCache.containsKey(getSegmentId())) { + segmentCache.putIfAbsent(getSegmentId(),new Object()); + createDedupCacheDbTable(getSegmentId()); + } + } + } + + insertSQLiteCacheDB(getSegmentId(), curi); + } + protected String segmentizeDedupTableName(String segmentId, String sql) + { + String tableName = "dedup_" + segmentId.replaceAll("-","_"); + return sql.replace(" dedup"," \""+tableName+"\""); + } + public void syncToTrough(String segmentId, boolean forceSync) { + logger.log(Level.FINE, "syncing local sqlite db to trough " + segmentId); + + int totalRows=0; + ResultSet dedupRows = null; + int rowsRead=0; + Object[] flattenedValues = null; + synchronized (segmentCache.get(segmentId)) { + String segmentCountSql = segmentizeDedupTableName(segmentId, COUNT_SQL); + try (PreparedStatement rowCountStatement = dedupDbConnection.prepareStatement(segmentCountSql)) { + ResultSet rs = rowCountStatement.executeQuery(); + rs.next(); + totalRows = rs.getInt(1); + } catch (SQLException e) { + logger.log(Level.WARNING, "problem reading row count info from local sqlite segment " + segmentId, e); + } + if (forceSync || totalRows > getTroughSyncMaxBatchSize()) { + /* + Trough takes a single object array for the values we insert, so an array of size (N columns * X rows). + We read each row in column by column, then repeat for each row. + */ + flattenedValues = new Object[4 * totalRows]; + String segmentSelectAllSql = segmentizeDedupTableName(segmentId, SELECT_ALL_SQL); + try (PreparedStatement dedupReadSelect = dedupDbConnection.prepareStatement(segmentSelectAllSql)) { + dedupRows = dedupReadSelect.executeQuery(); + while (dedupRows.next()) { + for (int i=0; i<4; i++) { + flattenedValues[(4 * rowsRead) + i] = dedupRows.getObject(i+1); + } + rowsRead++; + } + } catch (SQLException e) { + logger.log(Level.WARNING, "problem reading dedup info from local sqlite segment " + segmentId, e); + } + //Delete the cache db and rebuild it + String segmentDropTableSql = segmentizeDedupTableName(segmentId, DROP_TABLE_DEDUP_SQL); + try { + PreparedStatement dropTableStatement = dedupDbConnection.prepareStatement(segmentDropTableSql); + dropTableStatement.execute(); + createDedupCacheDbTable(segmentId); + } catch (SQLException e) { + logger.log(Level.WARNING, "problem removing cache db table " + segmentId + " sql: " + segmentDropTableSql, e); + } + } + } + + if(rowsRead > 0) { + //Store dedup values into Trough + StringBuffer sqlTmpl = new StringBuffer(); + //don't alter the table name since we're dealing with trough now + sqlTmpl.append(WRITE_SQL_TMPL + " values (%s, %s, %s, %s)"); + for(int i=1; i < rowsRead; i++){ + sqlTmpl.append(", (%s, %s, %s, %s)"); + } + try { + troughClient().write(segmentId, sqlTmpl.toString(), flattenedValues, SCHEMA_ID); + } catch (Exception e) { + logger.log(Level.WARNING, "problem posting batch of " + rowsRead + " dedup urls to trough segment " + segmentId, e); + } + } + } + public void insertSQLiteCacheDB(String segmentId, CrawlURI curi) { + Map hist = curi.getContentDigestHistory(); + int totalRows=0; + String segmentWriteSqlTemplate = segmentizeDedupTableName(segmentId, WRITE_SQL_TMPL); + synchronized (segmentCache.get(segmentId)) { + + try (PreparedStatement pstmt = dedupDbConnection.prepareStatement(segmentWriteSqlTemplate + " values (?, ?, ?, ?);")) { + pstmt.setString(1, persistKeyFor(curi)); + pstmt.setObject(2, hist.get(A_ORIGINAL_URL)); + pstmt.setObject(3, hist.get(A_ORIGINAL_DATE)); + pstmt.setObject(4, hist.get(A_WARC_RECORD_ID)); + pstmt.executeUpdate(); + } catch (Exception e) { + logger.log(Level.WARNING, "problem writing dedup info to local sqlite segment " + segmentId + " for url " + curi + " sql: " + segmentWriteSqlTemplate, e); + } + } + String segmentCountSql = segmentizeDedupTableName(segmentId, COUNT_SQL); + try(PreparedStatement rowCountStatement = dedupDbConnection.prepareStatement(segmentCountSql)) { + ResultSet rs = rowCountStatement.executeQuery(); + rs.next(); + totalRows = rs.getInt(1); + } catch (Exception e) { + logger.log(Level.WARNING, "problem getting row count after dedup insert " + segmentId + " for url " + curi + " sql: "+segmentCountSql, e); + } + if(totalRows > getTroughSyncMaxBatchSize()) { + syncToTrough(getSegmentId(), false); + } + } + public void createDedupCacheDbTable(String segmentId) { + String segmentSchema = segmentizeDedupTableName(segmentId, SCHEMA_SQL); + try (Statement stmt = dedupDbConnection.createStatement()) { + stmt.execute(segmentSchema); + } catch (SQLException e) { + logger.log(Level.WARNING, "Problem creating SQLite dedup db shard " + segmentId + " with schema "+ segmentSchema, e); } } } diff --git a/contrib/src/main/java/org/archive/trough/TroughClient.java b/contrib/src/main/java/org/archive/trough/TroughClient.java index 7a14c74b9..726f38028 100644 --- a/contrib/src/main/java/org/archive/trough/TroughClient.java +++ b/contrib/src/main/java/org/archive/trough/TroughClient.java @@ -8,11 +8,11 @@ import java.net.URL; import java.util.Date; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Matcher; @@ -60,9 +60,9 @@ public static String sqlValue(Object o) { protected static final String SQL_MIMETYPE = "application/sql"; protected Random rand = new Random(); - protected Map writeUrlCache; - protected Map readUrlCache; - protected Set dirtySegments; + protected ConcurrentHashMap writeUrlCache; + protected ConcurrentHashMap readUrlCache; + protected ConcurrentSkipListSet dirtySegments; protected String[] rethinkServers; protected String rethinkDb; protected Integer promotionInterval; @@ -240,9 +240,9 @@ protected String segmentManagerUrl() throws TroughException { */ public TroughClient(String rethinkdbTroughUrl, Integer promotionInterval) throws MalformedURLException { parseRethinkdbUrl(rethinkdbTroughUrl); - writeUrlCache = new HashMap(); - readUrlCache = new HashMap(); - dirtySegments = new HashSet(); + writeUrlCache = new ConcurrentHashMap(); + readUrlCache = new ConcurrentHashMap(); + dirtySegments = new ConcurrentSkipListSet(); this.promotionInterval = promotionInterval; } @@ -308,24 +308,28 @@ public List> read(String segmentId, String sqlTmpl, Object[] Object result = new JSONParser().parse(new InputStreamReader(connection.getInputStream(), "UTF-8")); return (List>) result; } catch (IOException e) { - synchronized (readUrlCache) { - readUrlCache.remove(segmentId); - } + readUrlCache.remove(segmentId); throw e; } catch (ParseException e) { - synchronized (readUrlCache) { - readUrlCache.remove(segmentId); - } + readUrlCache.remove(segmentId); throw new TroughException("problem parsing json response from " + readUrl, e); } } protected String readUrl(String segmentId) throws TroughException { if (readUrlCache.get(segmentId) == null) { - synchronized (readUrlCache) { - if (readUrlCache.get(segmentId) == null) { - String url = readUrlNoCache(segmentId); - readUrlCache.put(segmentId, url); + try { + readUrlCache.computeIfAbsent(segmentId, k -> { + try { + return readUrlNoCache(k); + } catch (TroughException e) { + throw new RuntimeException(e); + } + }); + } + catch(RuntimeException e) { + if(e.getCause() instanceof TroughException){ + throw (TroughException)e.getCause(); } } logger.info("segment " + segmentId + " read url is " + readUrlCache.get(segmentId)); @@ -377,26 +381,28 @@ public void write(String segmentId, String sqlTmpl, Object[] values, String sche throw new TroughException("unexpected response " + connection.getResponseCode() + " " + connection.getResponseMessage() + ": " + responsePayload(connection) + " from " + url + " to query: " + sql); - } - if (!dirtySegments.contains(segmentId)) { - synchronized (dirtySegments) { - dirtySegments.add(segmentId); - } } + dirtySegments.add(segmentId); } catch (IOException e) { - synchronized (writeUrlCache) { - writeUrlCache.remove(segmentId); - } + writeUrlCache.remove(segmentId); throw e; } } protected String writeUrl(String segmentId, String schemaId) throws IOException { if (writeUrlCache.get(segmentId) == null) { - synchronized (writeUrlCache) { - if (writeUrlCache.get(segmentId) == null) { - String url = writeUrlNoCache(segmentId, schemaId); - writeUrlCache.put(segmentId, url); + try { + writeUrlCache.computeIfAbsent(segmentId, k -> { + try { + return writeUrlNoCache(k, schemaId); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + catch(RuntimeException e) { + if(e.getCause() instanceof IOException){ + throw (IOException)e.getCause(); } } logger.info("segment " + segmentId + " write url is " + writeUrlCache.get(segmentId));