Compare commits

...

1 Commits

Author SHA1 Message Date
junderw
d195b81d3b
WIP: Full compaction every 100k blocks during initial sync only 2024-10-06 13:38:24 +09:00

View File

@ -209,6 +209,8 @@ pub struct ChainQuery {
// TODO: &[Block] should be an iterator / a queue.
impl Indexer {
const FINISHED_SYNCING_KEY: &'static [u8] = &[b'F'];
pub fn open(store: Arc<Store>, from: FetchFrom, config: &Config, metrics: &Metrics) -> Self {
Indexer {
store,
@ -246,15 +248,21 @@ impl Indexer {
}
fn start_auto_compactions(&self, db: &DB) {
let key = b"F".to_vec();
if db.get(&key).is_none() {
if db.get(Self::FINISHED_SYNCING_KEY).is_none() {
db.full_compaction();
db.put_sync(&key, b"");
assert!(db.get(&key).is_some());
db.put_sync(Self::FINISHED_SYNCING_KEY, b"");
assert!(db.get(Self::FINISHED_SYNCING_KEY).is_some());
}
db.enable_auto_compaction();
}
fn compact_if_not_finished_syncing(&self, db: &DB) {
if db.get(Self::FINISHED_SYNCING_KEY).is_none() {
debug!("Running (non-final) periodic compaction on {:?}", db);
db.full_compaction();
}
}
fn get_new_headers(&self, daemon: &Daemon, tip: &BlockHash) -> Result<Vec<HeaderEntry>> {
let headers = self.store.indexed_headers.read().unwrap();
let new_headers = daemon.get_new_headers(&headers, tip)?;
@ -277,7 +285,12 @@ impl Indexer {
to_add.len(),
self.from
);
start_fetcher(self.from, &daemon, to_add)?.map(|blocks| self.add(&blocks));
start_fetcher(self.from, &daemon, to_add)?.map(|blocks| {
self.add(&blocks);
if needs_periodic_compact(&blocks, 100_000) {
self.compact_if_not_finished_syncing(&self.store.txstore_db);
}
});
self.start_auto_compactions(&self.store.txstore_db);
let to_index = self.headers_to_index(&new_headers);
@ -286,7 +299,12 @@ impl Indexer {
to_index.len(),
self.from
);
start_fetcher(self.from, &daemon, to_index)?.map(|blocks| self.index(&blocks));
start_fetcher(self.from, &daemon, to_index)?.map(|blocks| {
self.index(&blocks);
if needs_periodic_compact(&blocks, 100_000) {
self.compact_if_not_finished_syncing(&self.store.history_db);
}
});
self.start_auto_compactions(&self.store.history_db);
if let DBFlush::Disable = self.flush {
@ -1135,6 +1153,19 @@ fn load_blockheaders(db: &DB) -> HashMap<BlockHash, BlockHeader> {
.collect()
}
fn needs_periodic_compact(blocks: &[BlockEntry], interval: usize) -> bool {
if let (Some(start), Some(end)) = (
blocks.first().map(|b| b.entry.height()),
blocks.last().map(|b| b.entry.height()),
) {
if start / interval != end / interval {
// start and end are on differing intervals, so compact
return true;
}
};
false
}
fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<DBRow> {
// persist individual transactions:
// T{txid} → {rawtx}