NetDB: SubDB lifecycle fixes (Gitlab #406)

This commit is contained in:
zzz
2023-10-24 02:35:52 +00:00
committed by idk
parent a23e09ca20
commit bf1b4cf502
8 changed files with 87 additions and 68 deletions
@@ -30,7 +30,6 @@ public interface DataStore {
/** @since 0.8.3 */
public Set<Map.Entry<Hash, DatabaseEntry>> getMapEntries();
public void stop();
public void restart();
public void rescan();
public int countLeaseSets();
@@ -108,10 +108,6 @@ class FloodfillMonitorJob extends JobImpl {
if (getContext().router().isHidden())
return false;
// Client subDb should not be a floodfill
if (_facade.isClientDb())
return false;
String enabled = getContext().getProperty(PROP_FLOODFILL_PARTICIPANT, "auto");
if ("true".equals(enabled))
return true;
@@ -60,9 +60,19 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
private static final long NEXT_RKEY_LS_ADVANCE_TIME = 10*60*1000;
private static final int NEXT_FLOOD_QTY = 2;
/**
* Main DB
*/
public FloodfillNetworkDatabaseFacade(RouterContext context) {
this(context, FloodfillNetworkDatabaseSegmentor.MAIN_DBID);
}
/**
* Sub DBs
*
* @param dbid null for main DB
* @since 0.9.60
*/
public FloodfillNetworkDatabaseFacade(RouterContext context, Hash dbid) {
super(context, dbid);
_activeFloodQueries = new HashMap<Hash, FloodSearchJob>();
@@ -85,7 +95,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
// for ISJ
_context.statManager().createRateStat("netDb.RILookupDirect", "Was an iterative RI lookup sent directly?", "NetworkDatabase", rate);
// No need to start the FloodfillMonitorJob for client subDb.
if (!super.isMainDb())
if (!isMainDb())
_ffMonitor = null;
else
_ffMonitor = new FloodfillMonitorJob(_context, this);
@@ -97,10 +107,8 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
super.startup();
if (_ffMonitor != null)
_context.jobQueue().addJob(_ffMonitor);
if (!super.isMainDb()){
if (!isMainDb()) {
isFF = false;
_lookupThrottler = null;
_lookupThrottlerBurst = null;
} else {
isFF = _context.getBooleanProperty(FloodfillMonitorJob.PROP_FLOODFILL_PARTICIPANT);
_lookupThrottler = new LookupThrottler();
@@ -108,7 +116,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
}
long down = _context.router().getEstimatedDowntime();
if (!_context.commSystem().isDummy() &&
if (!_context.commSystem().isDummy() && isMainDb() &&
(down == 0 || (!isFF && down > 30*60*1000) || (isFF && down > 24*60*60*1000))) {
// refresh old routers
Job rrj = new RefreshRoutersJob(_context, this);
@@ -120,7 +128,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
@Override
protected void createHandlers() {
// Only initialize the handlers for the flooodfill netDb.
if (super.isMainDb()) {
if (isMainDb()) {
if (_log.shouldInfo())
_log.info("[dbid: " + super._dbid + "] Initializing the message handlers");
_context.inNetMessagePool().registerHandlerJobBuilder(DatabaseLookupMessage.MESSAGE_TYPE, new FloodfillDatabaseLookupMessageHandler(_context, this));
@@ -66,7 +66,7 @@ public abstract class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacad
/** where the data store is pushing the data */
private String _dbDir;
// set of Hash objects that we should search on (to fill up a bucket, not to get data)
private final Set<Hash> _exploreKeys = new ConcurrentHashSet<Hash>(64);
private final Set<Hash> _exploreKeys;
private boolean _initialized;
/** Clock independent time of when we started up */
private long _started;
@@ -82,6 +82,7 @@ public abstract class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacad
private final BlindCache _blindCache;
protected final Hash _dbid;
private Hash _localKey;
private final Job _elj, _erj;
/**
* Map of Hash to RepublishLeaseSetJob for leases we'realready managing.
@@ -183,11 +184,16 @@ public abstract class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacad
if (!isMainDb()) {
_reseedChecker = null;
_blindCache = null;
_exploreKeys = null;
} else {
_reseedChecker = new ReseedChecker(context);
_blindCache = new BlindCache(context);
_exploreKeys = new ConcurrentHashSet<Hash>(64);
}
_localKey = null;
_elj = new ExpireLeasesJob(_context, this);
// We don't have a comm system here to check for ctx.commSystem().isDummy()
// we'll check before starting in startup()
_erj = new ExpireRoutersJob(_context, this);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Created KademliaNetworkDatabaseFacade for id: " + dbid);
context.statManager().createRateStat("netDb.lookupDeferred", "how many lookups are deferred?", "NetworkDatabase", new long[] { 60*60*1000 });
@@ -250,59 +256,61 @@ public abstract class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacad
/** @return unmodifiable set */
public Set<Hash> getExploreKeys() {
if (!_initialized)
if (!_initialized || !isMainDb())
return Collections.emptySet();
return Collections.unmodifiableSet(_exploreKeys);
}
public void removeFromExploreKeys(Collection<Hash> toRemove) {
if (!_initialized) return;
if (!_initialized || !isMainDb())
return;
_exploreKeys.removeAll(toRemove);
_context.statManager().addRateData("netDb.exploreKeySet", _exploreKeys.size());
}
public void queueForExploration(Collection<Hash> keys) {
if (!_initialized) return;
if (!_initialized || !isMainDb())
return;
for (Iterator<Hash> iter = keys.iterator(); iter.hasNext() && _exploreKeys.size() < MAX_EXPLORE_QUEUE; ) {
_exploreKeys.add(iter.next());
}
_context.statManager().addRateData("netDb.exploreKeySet", _exploreKeys.size());
}
/**
* Cannot be restarted.
*/
public synchronized void shutdown() {
_initialized = false;
if (!_context.commSystem().isDummy() &&
if (!_context.commSystem().isDummy() && isMainDb() &&
_context.router().getUptime() > ROUTER_INFO_EXPIRATION_FLOODFILL + 10*60*1000 + 60*1000) {
// expire inline before saving RIs in _ds.stop()
Job erj = new ExpireRoutersJob(_context, this);
erj.runJob();
}
_context.jobQueue().removeJob(_elj);
_context.jobQueue().removeJob(_erj);
if (_kb != null)
_kb.clear();
if (_ds != null)
_ds.stop();
_exploreKeys.clear();
if (_exploreKeys != null)
_exploreKeys.clear();
if (_negativeCache != null)
_negativeCache.clear();
_negativeCache.stop();
if (isMainDb())
blindCache().shutdown();
}
/**
* Unsupported, do not use
*
* @throws UnsupportedOperationException always
* @deprecated
*/
@Deprecated
public synchronized void restart() {
_dbDir = getDbDir();
if (_dbDir == null) {
_log.info("No DB dir specified [" + PROP_DB_DIR + "], using [" + DEFAULT_DB_DIR + "]");
_dbDir = DEFAULT_DB_DIR;
}
_ds.restart();
_exploreKeys.clear();
if (isMainDb())
blindCache().startup();
_initialized = true;
RouterInfo ri = _context.router().getRouterInfo();
publish(ri);
throw new UnsupportedOperationException();
}
@Override
@@ -311,17 +319,13 @@ public abstract class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacad
_ds.rescan();
}
String getDbDir() {
if (_dbDir == null) {
String dbDir = _context.getProperty(PROP_DB_DIR, DEFAULT_DB_DIR);
if (_dbid != FloodfillNetworkDatabaseSegmentor.MAIN_DBID) {
File subDir = new File(dbDir, _dbid.toBase32());
dbDir = subDir.toString();
}
return dbDir;
}
return _dbDir;
}
/**
* For the main DB only.
* Sub DBs are not persisted and must not access this directory.
*
* @return null before startup() is called; non-null thereafter, even for subdbs.
*/
String getDbDir() { return _dbDir; }
/**
* Check if the database is a client DB.
@@ -374,18 +378,19 @@ public abstract class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacad
public synchronized void startup() {
_log.info("Starting up the kademlia network database");
RouterInfo ri = _context.router().getRouterInfo();
String dbDir = _context.getProperty(PROP_DB_DIR, DEFAULT_DB_DIR);
_kb = new KBucketSet<Hash>(_context, ri.getIdentity().getHash(),
BUCKET_SIZE, KAD_B, new RejectTrimmer<Hash>());
_dbDir = getDbDir();
try {
if (isMainDb()) {
_ds = new PersistentDataStore(_context, _dbDir, this);
_ds = new PersistentDataStore(_context, dbDir, this);
} else {
_ds = new TransientDataStore(_context);
}
} catch (IOException ioe) {
throw new RuntimeException("Unable to initialize netdb storage", ioe);
}
_dbDir = dbDir;
_negativeCache = new NegativeLookupCache(_context);
if (isMainDb())
blindCache().startup();
@@ -396,22 +401,20 @@ public abstract class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacad
_started = System.currentTimeMillis();
// expire old leases
Job elj = new ExpireLeasesJob(_context, this);
long now = _context.clock().now();
elj.getTiming().setStartAfter(now + 11*60*1000);
_context.jobQueue().addJob(elj);
_elj.getTiming().setStartAfter(now + 11*60*1000);
_context.jobQueue().addJob(_elj);
//// expire some routers
// Don't run until after RefreshRoutersJob has run, and after validate() will return invalid for old routers.
if (!_context.commSystem().isDummy()) {
Job erj = new ExpireRoutersJob(_context, this);
boolean isFF = _context.getBooleanProperty(FloodfillMonitorJob.PROP_FLOODFILL_PARTICIPANT);
long down = _context.router().getEstimatedDowntime();
long delay = (down == 0 || (!isFF && down > 30*60*1000) || (isFF && down > 24*60*60*1000)) ?
ROUTER_INFO_EXPIRATION_FLOODFILL + 10*60*1000 :
10*60*1000;
erj.getTiming().setStartAfter(now + delay);
_context.jobQueue().addJob(erj);
_erj.getTiming().setStartAfter(now + delay);
_context.jobQueue().addJob(_erj);
}
if (!QUIET) {
@@ -749,7 +752,8 @@ public abstract class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacad
key = blindCache().getHash(key);
fail(key);
// this was an interesting key, so either refetch it or simply explore with it
_exploreKeys.add(key);
if (_exploreKeys != null)
_exploreKeys.add(key);
return null;
}
} else {
@@ -7,7 +7,6 @@ import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
import net.i2p.util.LHMCache;
import net.i2p.util.ObjectCounter;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
@@ -19,6 +18,8 @@ class NegativeLookupCache {
private final ObjectCounter<Hash> counter;
private final Map<Hash, Destination> badDests;
private final int _maxFails;
private final SimpleTimer2.TimedEvent cleaner;
private final long cleanTime;
static final int MAX_FAILS = 3;
private static final int MAX_BAD_DESTS = 128;
@@ -28,8 +29,8 @@ class NegativeLookupCache {
this.counter = new ObjectCounter<Hash>();
this.badDests = new LHMCache<Hash, Destination>(MAX_BAD_DESTS);
this._maxFails = context.getProperty("netdb.negativeCache.maxFails",MAX_FAILS);
final long cleanTime = context.getProperty("netdb.negativeCache.cleanupInterval", CLEAN_TIME);
SimpleTimer2.getInstance().addPeriodicEvent(new Cleaner(), cleanTime);
cleanTime = context.getProperty("netdb.negativeCache.cleanupInterval", CLEAN_TIME);
cleaner = new Cleaner(context.simpleTimer2());
}
public void lookupFailed(Hash h) {
@@ -88,9 +89,29 @@ class NegativeLookupCache {
}
}
private class Cleaner implements SimpleTimer.TimedEvent {
/**
* Stops the timer. May not be restarted.
*
* @since 0.9.60
*/
public void stop() {
clear();
cleaner.cancel();
}
private class Cleaner extends SimpleTimer2.TimedEvent {
/**
* Schedules itself.
*
* @since 0.9.60
*/
public Cleaner(SimpleTimer2 pool) {
super(pool, cleanTime);
}
public void timeReached() {
NegativeLookupCache.this.counter.clear();
counter.clear();
reschedule(cleanTime);
}
}
}
@@ -99,11 +99,6 @@ public class PersistentDataStore extends TransientDataStore {
_writer.flush();
}
@Override
public void restart() {
super.restart();
}
@Override
public void rescan() {
if (_initialized)
@@ -45,10 +45,6 @@ class TransientDataStore implements DataStore {
_data.clear();
}
public void restart() {
stop();
}
public void rescan() {}
/**
@@ -34,7 +34,7 @@ public class Restarter implements Runnable {
log.logAlways(Log.WARN, "Restarted the tunnel manager");
//try { _context.peerManager().restart(); } catch (Throwable t) { log.log(Log.CRIT, "Error restarting the peer manager", t); }
//try { _context.netDbSegmentor().restart(); } catch (Throwable t) { log.log(Log.CRIT, "Error restarting the networkDb", t); }
//try { _context.netDb().restart(); } catch (Throwable t) { log.log(Log.CRIT, "Error restarting the networkDb", t); }
//try { _context.jobQueue().restart(); } catch (Throwable t) { log.log(Log.CRIT, "Error restarting the job queue", t); }
try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}