Compare commits

...

1 Commits

Author SHA1 Message Date
junderw
7b622e9efd
Fix reorgs permanently for reorgs up to 6 blocks 2024-12-14 19:40:36 +09:00
4 changed files with 200 additions and 24 deletions

View File

@ -516,7 +516,7 @@ where
// save updated stats to cache // save updated stats to cache
if let Some(lastblock) = lastblock { if let Some(lastblock) = lastblock {
chain.store().cache_db().write( chain.store().cache_db().write_nocache(
vec![asset_cache_row(asset_id, &newstats, &lastblock)], vec![asset_cache_row(asset_id, &newstats, &lastblock)],
DBFlush::Enable, DBFlush::Enable,
); );

View File

@ -1,6 +1,10 @@
use bounded_vec_deque::BoundedVecDeque;
use rocksdb; use rocksdb;
use std::collections::HashSet;
use std::path::Path; use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::RwLock;
use crate::config::Config; use crate::config::Config;
use crate::util::{bincode_util, Bytes}; use crate::util::{bincode_util, Bytes};
@ -134,11 +138,23 @@ impl<'a> Iterator for ReverseScanGroupIterator<'a> {
} }
} }
type SingleBlockCache = HashSet<Vec<u8>>;
type TipsCache = BoundedVecDeque<SingleBlockCache>;
#[derive(Debug)] #[derive(Debug)]
pub struct DB { pub struct DB {
db: rocksdb::DB, db: rocksdb::DB,
// BoundedVecDeque of the most recent blocks.
// Each entry is the set (HashSet) of rocksdb keys written for one block, to be removed if that block is reorged.
// A key is a Vec<u8>.
// It will automatically drop the oldest "blocks" once they go over the bound.
rollback_cache: RwLock<TipsCache>,
rollback_active: AtomicBool,
} }
// 6 blocks should be enough
const CACHE_CAPACITY: usize = 6;
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
pub enum DBFlush { pub enum DBFlush {
Disable, Disable,
@ -147,8 +163,13 @@ pub enum DBFlush {
impl DB { impl DB {
pub fn open(path: &Path, config: &Config) -> DB { pub fn open(path: &Path, config: &Config) -> DB {
let mut rollback_cache = BoundedVecDeque::with_capacity(CACHE_CAPACITY, CACHE_CAPACITY);
rollback_cache.push_back(HashSet::new()); // last HashSet is "current block"
let db = DB { let db = DB {
db: open_raw_db(path), db: open_raw_db(path),
// TODO: Make the number of blocks configurable? 6 should be fine for mainnet
rollback_cache: RwLock::new(rollback_cache),
rollback_active: AtomicBool::new(false),
}; };
db.verify_compatibility(config); db.verify_compatibility(config);
db db
@ -220,17 +241,123 @@ impl DB {
ReverseScanGroupIterator::new(iters, value_offset) ReverseScanGroupIterator::new(iters, value_offset)
} }
pub fn write(&self, mut rows: Vec<DBRow>, flush: DBFlush) { fn fill_cache(&self, key: &[u8]) {
// Single letter keys tend to be related to versioning and tips
// So do not cache as they don't need to be rolled back
if key.len() < 2 {
return;
}
self.with_cache(|cache| {
cache.insert(key.to_owned());
});
}
/// Runs `func` against the key set of the "current block", i.e. the last
/// entry of the rollback cache.
///
/// The cache's write lock is held for the duration of `func`.
///
/// # Panics
/// Panics if the lock is poisoned or if the cache holds no entry at all.
fn with_cache<F>(&self, func: F)
where
    F: FnOnce(&mut SingleBlockCache),
{
    let mut tips = self.rollback_cache.write().unwrap();
    let current_block = tips.back_mut().expect("Always one block");
    func(current_block)
}
/// Starts a new "current block" by appending an empty key set to the
/// rollback cache.
///
/// The cache is bounded, so appending past its capacity automatically
/// discards the oldest block's key set.
pub fn tick_next_block(&self) {
    let mut tips = self.rollback_cache.write().unwrap();
    tips.push_back(HashSet::new());
}
/// Rolls back up to `count` cached blocks, then starts a fresh "current block".
///
/// Key sets are popped newest-first and every key in a popped set is deleted
/// from rocksdb. Deletion errors are logged and otherwise ignored.
/// Calling this with `count == 0` does nothing at all.
///
/// Returns how many of the requested blocks could NOT be rolled back because
/// the cache ran out of entries (0 when everything requested was popped).
// NOTE(review): the first set popped is the "current block" set at the back of
// the deque, and it counts towards `count` — confirm this is the intent.
pub fn rollback(&self, mut count: usize) -> usize {
    if count == 0 {
        return 0;
    }
    let mut tips = self.rollback_cache.write().unwrap();
    while count > 0 {
        let Some(keys) = tips.pop_back() else {
            // Cache exhausted before `count` blocks were undone.
            break;
        };
        debug!(
            "Rolling back DB cached block with {} entries @ {:?}",
            keys.len(),
            self.db.path()
        );
        for key in keys {
            // Best effort: a failed delete is logged, never propagated.
            if let Err(err) = self.db.delete(key) {
                warn!("Error when deleting rocksdb rollback cache: {err}");
            }
        }
        count -= 1;
    }
    // Re-establish the "always one block" invariant that `with_cache` relies on.
    tips.push_back(HashSet::new());
    count
}
/// Returns the current value of the `rollback_active` flag (acquire load).
pub fn rollbacks_enabled(&self) -> bool {
    use std::sync::atomic::Ordering;

    self.rollback_active.load(Ordering::Acquire)
}
/// Clears the `rollback_active` flag (release store).
pub fn disable_rollbacks(&self) {
    use std::sync::atomic::Ordering;

    self.rollback_active.store(false, Ordering::Release);
}
/// Sets the `rollback_active` flag (release store).
pub fn enable_rollbacks(&self) {
    use std::sync::atomic::Ordering;

    self.rollback_active.store(true, Ordering::Release);
}
/// Writes `rows` as a single group via `write_blocks_nocache`, i.e. without
/// recording their keys in the rollback cache.
pub fn write_nocache(&self, rows: Vec<DBRow>, flush: DBFlush) {
    let single_block = vec![rows];
    self.write_blocks_nocache(single_block, flush);
}
/// Writes rows grouped per block, leaving the rollback cache eligible to
/// record their keys (`skip_cache` is `false`).
pub fn write_blocks(&self, blocks: Vec<Vec<DBRow>>, flush: DBFlush) {
    let skip_cache = false;
    self.write_blocks_inner(blocks, flush, skip_cache)
}
/// Writes rows grouped per block while bypassing the rollback cache
/// (`skip_cache` is `true`).
pub fn write_blocks_nocache(&self, blocks: Vec<Vec<DBRow>>, flush: DBFlush) {
    let skip_cache = true;
    self.write_blocks_inner(blocks, flush, skip_cache)
}
#[inline]
fn write_blocks_inner(&self, blocks: Vec<Vec<DBRow>>, flush: DBFlush, skip_cache: bool) {
debug!( debug!(
"writing {} rows to {:?}, flush={:?}", "writing {} rows to {:?}, flush={:?}",
rows.len(), blocks.iter().map(|b| b.len()).sum::<usize>(),
self.db, self.db,
flush flush
); );
rows.sort_unstable_by(|a, b| a.key.cmp(&b.key));
let mut batch = rocksdb::WriteBatch::default(); let mut batch = rocksdb::WriteBatch::default();
for row in rows { for mut rows in blocks {
batch.put(&row.key, &row.value); rows.sort_unstable_by(|a, b| a.key.cmp(&b.key));
if !skip_cache
&& self
.rollback_active
.load(std::sync::atomic::Ordering::Acquire)
{
self.with_cache(|cache| {
for row in &rows {
cache.insert(row.key.clone());
batch.put(&row.key, &row.value);
}
});
// Special case: we should tick forward blocks
self.tick_next_block();
} else {
for row in &rows {
batch.put(&row.key, &row.value);
}
}
} }
let do_flush = match flush { let do_flush = match flush {
DBFlush::Enable => true, DBFlush::Enable => true,
@ -247,10 +374,22 @@ impl DB {
} }
pub fn put(&self, key: &[u8], value: &[u8]) { pub fn put(&self, key: &[u8], value: &[u8]) {
if self
.rollback_active
.load(std::sync::atomic::Ordering::Acquire)
{
self.fill_cache(key);
}
self.db.put(key, value).unwrap(); self.db.put(key, value).unwrap();
} }
pub fn put_sync(&self, key: &[u8], value: &[u8]) { pub fn put_sync(&self, key: &[u8], value: &[u8]) {
if self
.rollback_active
.load(std::sync::atomic::Ordering::Acquire)
{
self.fill_cache(key);
}
let mut opts = rocksdb::WriteOptions::new(); let mut opts = rocksdb::WriteOptions::new();
opts.set_sync(true); opts.set_sync(true);
self.db.put_opt(key, value, &opts).unwrap(); self.db.put_opt(key, value, &opts).unwrap();

View File

@ -86,6 +86,27 @@ impl Store {
} }
} }
/// Advances the rollback cache of every DB (txstore, history, cache) by one block.
pub fn tick_next_block(&self) {
    for db in [&self.txstore_db, &self.history_db, &self.cache_db] {
        db.tick_next_block();
    }
}
/// Turns on rollbacks for every DB (txstore, history, cache).
pub fn enable_rollback_cache(&self) {
    for db in [&self.txstore_db, &self.history_db, &self.cache_db] {
        db.enable_rollbacks();
    }
}
/// Rolls back `count` blocks in the txstore, history and cache DBs, in that order.
///
/// Each DB's `rollback` returns how many of the requested blocks it could not
/// roll back. If any DB fell short, a warning is logged with the largest
/// per-DB shortfall next to the requested count. (Previously the message
/// printed the requested `count` as if it were the number of missed blocks.)
pub fn rollback(&self, count: usize) {
    // Receiver first, then arguments: txstore, history, cache — same call
    // order as before.
    let missed = self
        .txstore_db
        .rollback(count)
        .max(self.history_db.rollback(count))
        .max(self.cache_db.rollback(count));
    if missed > 0 {
        warn!("Rolling back all DB caches missed {missed} of {count} blocks. Re-orged duplicates might still be active in the DB.");
    }
}
pub fn txstore_db(&self) -> &DB { pub fn txstore_db(&self) -> &DB {
&self.txstore_db &self.txstore_db
} }
@ -273,6 +294,18 @@ impl Indexer {
let tip = daemon.getbestblockhash()?; let tip = daemon.getbestblockhash()?;
let new_headers = self.get_new_headers(&daemon, &tip)?; let new_headers = self.get_new_headers(&daemon, &tip)?;
// Deal with re-orgs before indexing
let headers_len = {
let mut headers = self.store.indexed_headers.write().unwrap();
let reorged = headers.apply(&new_headers);
assert_eq!(tip, *headers.tip());
// reorg happened
if !reorged.is_empty() {
self.store.rollback(reorged.len());
}
headers.len()
};
let to_add = self.headers_to_add(&new_headers); let to_add = self.headers_to_add(&new_headers);
debug!( debug!(
"adding transactions from {} blocks using {:?}", "adding transactions from {} blocks using {:?}",
@ -301,16 +334,20 @@ impl Indexer {
// update the synced tip *after* the new data is flushed to disk // update the synced tip *after* the new data is flushed to disk
debug!("updating synced tip to {:?}", tip); debug!("updating synced tip to {:?}", tip);
self.store.txstore_db.put_sync(b"t", &serialize(&tip)); self.store.txstore_db.put_sync(b"t", &serialize(&tip));
// Ticking cache DB "block every time we update"
let mut headers = self.store.indexed_headers.write().unwrap(); // This means that each "block" essentially contains the
headers.apply(new_headers); // cache updates between each update() call, and we will
assert_eq!(tip, *headers.tip()); // rollback more cache_db than other DBs when rolling back
// but this is just a cache anyway.
// We'd rather not have bad data in the cache.
self.store.cache_db.tick_next_block();
if let FetchFrom::BlkFiles = self.from { if let FetchFrom::BlkFiles = self.from {
self.store.enable_rollback_cache();
self.from = FetchFrom::Bitcoind; self.from = FetchFrom::Bitcoind;
} }
self.tip_metric.set(headers.len() as i64 - 1); self.tip_metric.set(headers_len as i64 - 1);
Ok(tip) Ok(tip)
} }
@ -324,7 +361,7 @@ impl Indexer {
}; };
{ {
let _timer = self.start_timer("add_write"); let _timer = self.start_timer("add_write");
self.store.txstore_db.write(rows, self.flush); self.store.txstore_db.write_blocks(rows, self.flush);
} }
self.store self.store
@ -360,7 +397,7 @@ impl Indexer {
} }
index_blocks(blocks, &previous_txos_map, &self.iconfig) index_blocks(blocks, &previous_txos_map, &self.iconfig)
}; };
self.store.history_db.write(rows, self.flush); self.store.history_db.write_blocks(rows, self.flush);
} }
} }
@ -820,7 +857,7 @@ impl ChainQuery {
// save updated utxo set to cache // save updated utxo set to cache
if let Some(lastblock) = lastblock { if let Some(lastblock) = lastblock {
if had_cache || processed_items > MIN_HISTORY_ITEMS_TO_CACHE { if had_cache || processed_items > MIN_HISTORY_ITEMS_TO_CACHE {
self.store.cache_db.write( self.store.cache_db.write_nocache(
vec![UtxoCacheRow::new(scripthash, &newutxos, &lastblock).into_row()], vec![UtxoCacheRow::new(scripthash, &newutxos, &lastblock).into_row()],
flush, flush,
); );
@ -928,7 +965,7 @@ impl ChainQuery {
// save updated stats to cache // save updated stats to cache
if let Some(lastblock) = lastblock { if let Some(lastblock) = lastblock {
if newstats.funded_txo_count + newstats.spent_txo_count > MIN_HISTORY_ITEMS_TO_CACHE { if newstats.funded_txo_count + newstats.spent_txo_count > MIN_HISTORY_ITEMS_TO_CACHE {
self.store.cache_db.write( self.store.cache_db.write_nocache(
vec![StatsCacheRow::new(scripthash, &newstats, &lastblock).into_row()], vec![StatsCacheRow::new(scripthash, &newstats, &lastblock).into_row()],
flush, flush,
); );
@ -1256,7 +1293,7 @@ fn load_blockheaders(db: &DB) -> HashMap<BlockHash, BlockHeader> {
.collect() .collect()
} }
fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<DBRow> { fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<Vec<DBRow>> {
// persist individual transactions: // persist individual transactions:
// T{txid} → {rawtx} // T{txid} → {rawtx}
// C{txid}{blockhash}{height} → // C{txid}{blockhash}{height} →
@ -1284,7 +1321,6 @@ fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<DBRo
rows.push(BlockRow::new_done(blockhash).into_row()); // mark block as "added" rows.push(BlockRow::new_done(blockhash).into_row()); // mark block as "added"
rows rows
}) })
.flatten()
.collect() .collect()
} }
@ -1370,7 +1406,7 @@ fn index_blocks(
block_entries: &[BlockEntry], block_entries: &[BlockEntry],
previous_txos_map: &HashMap<OutPoint, TxOut>, previous_txos_map: &HashMap<OutPoint, TxOut>,
iconfig: &IndexerConfig, iconfig: &IndexerConfig,
) -> Vec<DBRow> { ) -> Vec<Vec<DBRow>> {
block_entries block_entries
.par_iter() // serialization is CPU-intensive .par_iter() // serialization is CPU-intensive
.map(|b| { .map(|b| {
@ -1389,7 +1425,6 @@ fn index_blocks(
rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed" rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed"
rows rows
}) })
.flatten()
.collect() .collect()
} }

View File

@ -111,7 +111,7 @@ impl HeaderList {
); );
let mut headers = HeaderList::empty(); let mut headers = HeaderList::empty();
headers.apply(headers.order(headers_chain)); headers.apply(&headers.order(headers_chain));
headers headers
} }
@ -155,7 +155,8 @@ impl HeaderList {
.collect() .collect()
} }
pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) { /// Returns any re-orged headers
pub fn apply(&mut self, new_headers: &[HeaderEntry]) -> Vec<HeaderEntry> {
// new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip) // new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip)
for i in 1..new_headers.len() { for i in 1..new_headers.len() {
assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height()); assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height());
@ -175,21 +176,22 @@ impl HeaderList {
assert_eq!(entry.header().prev_blockhash, expected_prev_blockhash); assert_eq!(entry.header().prev_blockhash, expected_prev_blockhash);
height height
} }
None => return, None => return vec![],
}; };
debug!( debug!(
"applying {} new headers from height {}", "applying {} new headers from height {}",
new_headers.len(), new_headers.len(),
new_height new_height
); );
let _removed = self.headers.split_off(new_height); // keep [0..new_height) entries let removed = self.headers.split_off(new_height); // keep [0..new_height) entries
for new_header in new_headers { for new_header in new_headers {
let height = new_header.height(); let height = new_header.height();
assert_eq!(height, self.headers.len()); assert_eq!(height, self.headers.len());
self.tip = *new_header.hash(); self.tip = *new_header.hash();
self.headers.push(new_header); self.headers.push(new_header.clone());
self.heights.insert(self.tip, height); self.heights.insert(self.tip, height);
} }
removed
} }
pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> { pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> {