Compare commits
1 Commits
mempool
...
junderw/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b622e9efd |
@ -516,7 +516,7 @@ where
|
|||||||
|
|
||||||
// save updated stats to cache
|
// save updated stats to cache
|
||||||
if let Some(lastblock) = lastblock {
|
if let Some(lastblock) = lastblock {
|
||||||
chain.store().cache_db().write(
|
chain.store().cache_db().write_nocache(
|
||||||
vec![asset_cache_row(asset_id, &newstats, &lastblock)],
|
vec![asset_cache_row(asset_id, &newstats, &lastblock)],
|
||||||
DBFlush::Enable,
|
DBFlush::Enable,
|
||||||
);
|
);
|
||||||
|
|||||||
@ -1,6 +1,10 @@
|
|||||||
|
use bounded_vec_deque::BoundedVecDeque;
|
||||||
use rocksdb;
|
use rocksdb;
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
|
use std::sync::RwLock;
|
||||||
|
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::util::{bincode_util, Bytes};
|
use crate::util::{bincode_util, Bytes};
|
||||||
@ -134,11 +138,23 @@ impl<'a> Iterator for ReverseScanGroupIterator<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type SingleBlockCache = HashSet<Vec<u8>>;
|
||||||
|
type TipsCache = BoundedVecDeque<SingleBlockCache>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct DB {
|
pub struct DB {
|
||||||
db: rocksdb::DB,
|
db: rocksdb::DB,
|
||||||
|
// BoundedVecDeque of most recent blocks
|
||||||
|
// Outer Vec is a list of rocksdb keys to remove when reorged
|
||||||
|
// Inner Vec is the key (a key is Vec<u8>)
|
||||||
|
// It will automatically drop "blocks" that go over the bound.
|
||||||
|
rollback_cache: RwLock<TipsCache>,
|
||||||
|
rollback_active: AtomicBool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 6 blocks should be enough
|
||||||
|
const CACHE_CAPACITY: usize = 6;
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug)]
|
#[derive(Copy, Clone, Debug)]
|
||||||
pub enum DBFlush {
|
pub enum DBFlush {
|
||||||
Disable,
|
Disable,
|
||||||
@ -147,8 +163,13 @@ pub enum DBFlush {
|
|||||||
|
|
||||||
impl DB {
|
impl DB {
|
||||||
pub fn open(path: &Path, config: &Config) -> DB {
|
pub fn open(path: &Path, config: &Config) -> DB {
|
||||||
|
let mut rollback_cache = BoundedVecDeque::with_capacity(CACHE_CAPACITY, CACHE_CAPACITY);
|
||||||
|
rollback_cache.push_back(HashSet::new()); // last HashSet is "current block"
|
||||||
let db = DB {
|
let db = DB {
|
||||||
db: open_raw_db(path),
|
db: open_raw_db(path),
|
||||||
|
// TODO: Make the number of blocks configurable? 6 should be fine for mainnet
|
||||||
|
rollback_cache: RwLock::new(rollback_cache),
|
||||||
|
rollback_active: AtomicBool::new(false),
|
||||||
};
|
};
|
||||||
db.verify_compatibility(config);
|
db.verify_compatibility(config);
|
||||||
db
|
db
|
||||||
@ -220,17 +241,123 @@ impl DB {
|
|||||||
ReverseScanGroupIterator::new(iters, value_offset)
|
ReverseScanGroupIterator::new(iters, value_offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write(&self, mut rows: Vec<DBRow>, flush: DBFlush) {
|
fn fill_cache(&self, key: &[u8]) {
|
||||||
|
// Single letter keys tend to be related to versioning and tips
|
||||||
|
// So do not cache as they don't need to be rolled back
|
||||||
|
if key.len() < 2 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
self.with_cache(|cache| {
|
||||||
|
cache.insert(key.to_owned());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn with_cache<F>(&self, func: F)
|
||||||
|
where
|
||||||
|
F: FnOnce(&mut SingleBlockCache),
|
||||||
|
{
|
||||||
|
func(
|
||||||
|
self.rollback_cache
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.back_mut()
|
||||||
|
.expect("Always one block"),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn tick_next_block(&self) {
|
||||||
|
// Adding a new block's worth of cache
|
||||||
|
// This will automatically drop the oldest block (HashSet)
|
||||||
|
self.rollback_cache
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.push_back(HashSet::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Performs a rollback of `count` blocks, then ticks one block forward
|
||||||
|
pub fn rollback(&self, mut count: usize) -> usize {
|
||||||
|
if count == 0 {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
let mut cache = self.rollback_cache.write().unwrap();
|
||||||
|
while count > 0 {
|
||||||
|
if let Some(block) = cache.pop_back() {
|
||||||
|
debug!(
|
||||||
|
"Rolling back DB cached block with {} entries @ {:?}",
|
||||||
|
block.len(),
|
||||||
|
self.db.path()
|
||||||
|
);
|
||||||
|
for key in block {
|
||||||
|
// Ignore rocksdb errors, but log them
|
||||||
|
let _ = self.db.delete(key).inspect_err(|err| {
|
||||||
|
warn!("Error when deleting rocksdb rollback cache: {err}");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
count -= 1;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cache.push_back(HashSet::new());
|
||||||
|
count
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn rollbacks_enabled(&self) -> bool {
|
||||||
|
self.rollback_active
|
||||||
|
.load(std::sync::atomic::Ordering::Acquire)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn disable_rollbacks(&self) {
|
||||||
|
self.rollback_active
|
||||||
|
.store(false, std::sync::atomic::Ordering::Release);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn enable_rollbacks(&self) {
|
||||||
|
self.rollback_active
|
||||||
|
.store(true, std::sync::atomic::Ordering::Release);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write_nocache(&self, rows: Vec<DBRow>, flush: DBFlush) {
|
||||||
|
self.write_blocks_nocache(vec![rows], flush);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write_blocks(&self, blocks: Vec<Vec<DBRow>>, flush: DBFlush) {
|
||||||
|
self.write_blocks_inner(blocks, flush, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write_blocks_nocache(&self, blocks: Vec<Vec<DBRow>>, flush: DBFlush) {
|
||||||
|
self.write_blocks_inner(blocks, flush, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn write_blocks_inner(&self, blocks: Vec<Vec<DBRow>>, flush: DBFlush, skip_cache: bool) {
|
||||||
debug!(
|
debug!(
|
||||||
"writing {} rows to {:?}, flush={:?}",
|
"writing {} rows to {:?}, flush={:?}",
|
||||||
rows.len(),
|
blocks.iter().map(|b| b.len()).sum::<usize>(),
|
||||||
self.db,
|
self.db,
|
||||||
flush
|
flush
|
||||||
);
|
);
|
||||||
rows.sort_unstable_by(|a, b| a.key.cmp(&b.key));
|
|
||||||
let mut batch = rocksdb::WriteBatch::default();
|
let mut batch = rocksdb::WriteBatch::default();
|
||||||
for row in rows {
|
for mut rows in blocks {
|
||||||
batch.put(&row.key, &row.value);
|
rows.sort_unstable_by(|a, b| a.key.cmp(&b.key));
|
||||||
|
if !skip_cache
|
||||||
|
&& self
|
||||||
|
.rollback_active
|
||||||
|
.load(std::sync::atomic::Ordering::Acquire)
|
||||||
|
{
|
||||||
|
self.with_cache(|cache| {
|
||||||
|
for row in &rows {
|
||||||
|
cache.insert(row.key.clone());
|
||||||
|
batch.put(&row.key, &row.value);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Special case: we should tick forward blocks
|
||||||
|
self.tick_next_block();
|
||||||
|
} else {
|
||||||
|
for row in &rows {
|
||||||
|
batch.put(&row.key, &row.value);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
let do_flush = match flush {
|
let do_flush = match flush {
|
||||||
DBFlush::Enable => true,
|
DBFlush::Enable => true,
|
||||||
@ -247,10 +374,22 @@ impl DB {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn put(&self, key: &[u8], value: &[u8]) {
|
pub fn put(&self, key: &[u8], value: &[u8]) {
|
||||||
|
if self
|
||||||
|
.rollback_active
|
||||||
|
.load(std::sync::atomic::Ordering::Acquire)
|
||||||
|
{
|
||||||
|
self.fill_cache(key);
|
||||||
|
}
|
||||||
self.db.put(key, value).unwrap();
|
self.db.put(key, value).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn put_sync(&self, key: &[u8], value: &[u8]) {
|
pub fn put_sync(&self, key: &[u8], value: &[u8]) {
|
||||||
|
if self
|
||||||
|
.rollback_active
|
||||||
|
.load(std::sync::atomic::Ordering::Acquire)
|
||||||
|
{
|
||||||
|
self.fill_cache(key);
|
||||||
|
}
|
||||||
let mut opts = rocksdb::WriteOptions::new();
|
let mut opts = rocksdb::WriteOptions::new();
|
||||||
opts.set_sync(true);
|
opts.set_sync(true);
|
||||||
self.db.put_opt(key, value, &opts).unwrap();
|
self.db.put_opt(key, value, &opts).unwrap();
|
||||||
|
|||||||
@ -86,6 +86,27 @@ impl Store {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn tick_next_block(&self) {
|
||||||
|
self.txstore_db.tick_next_block();
|
||||||
|
self.history_db.tick_next_block();
|
||||||
|
self.cache_db.tick_next_block();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn enable_rollback_cache(&self) {
|
||||||
|
self.txstore_db.enable_rollbacks();
|
||||||
|
self.history_db.enable_rollbacks();
|
||||||
|
self.cache_db.enable_rollbacks();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn rollback(&self, count: usize) {
|
||||||
|
let mut leftover = 0;
|
||||||
|
leftover += self.txstore_db.rollback(count);
|
||||||
|
leftover += self.history_db.rollback(count);
|
||||||
|
leftover += self.cache_db.rollback(count);
|
||||||
|
if leftover > 0 {
|
||||||
|
warn!("Rolling back all DB caches missed {count} blocks. Re-orged duplicates might still be active in the DB.")
|
||||||
|
}
|
||||||
|
}
|
||||||
pub fn txstore_db(&self) -> &DB {
|
pub fn txstore_db(&self) -> &DB {
|
||||||
&self.txstore_db
|
&self.txstore_db
|
||||||
}
|
}
|
||||||
@ -273,6 +294,18 @@ impl Indexer {
|
|||||||
let tip = daemon.getbestblockhash()?;
|
let tip = daemon.getbestblockhash()?;
|
||||||
let new_headers = self.get_new_headers(&daemon, &tip)?;
|
let new_headers = self.get_new_headers(&daemon, &tip)?;
|
||||||
|
|
||||||
|
// Deal with re-orgs before indexing
|
||||||
|
let headers_len = {
|
||||||
|
let mut headers = self.store.indexed_headers.write().unwrap();
|
||||||
|
let reorged = headers.apply(&new_headers);
|
||||||
|
assert_eq!(tip, *headers.tip());
|
||||||
|
// reorg happened
|
||||||
|
if !reorged.is_empty() {
|
||||||
|
self.store.rollback(reorged.len());
|
||||||
|
}
|
||||||
|
headers.len()
|
||||||
|
};
|
||||||
|
|
||||||
let to_add = self.headers_to_add(&new_headers);
|
let to_add = self.headers_to_add(&new_headers);
|
||||||
debug!(
|
debug!(
|
||||||
"adding transactions from {} blocks using {:?}",
|
"adding transactions from {} blocks using {:?}",
|
||||||
@ -301,16 +334,20 @@ impl Indexer {
|
|||||||
// update the synced tip *after* the new data is flushed to disk
|
// update the synced tip *after* the new data is flushed to disk
|
||||||
debug!("updating synced tip to {:?}", tip);
|
debug!("updating synced tip to {:?}", tip);
|
||||||
self.store.txstore_db.put_sync(b"t", &serialize(&tip));
|
self.store.txstore_db.put_sync(b"t", &serialize(&tip));
|
||||||
|
// Ticking cache DB "block every time we update"
|
||||||
let mut headers = self.store.indexed_headers.write().unwrap();
|
// This means that each "block" essentially contains the
|
||||||
headers.apply(new_headers);
|
// cache updates between each update() call, and we will
|
||||||
assert_eq!(tip, *headers.tip());
|
// rollback more cache_db than other DBs when rolling back
|
||||||
|
// but this is just a cache anyway.
|
||||||
|
// We'd rather not have bad data in the cache.
|
||||||
|
self.store.cache_db.tick_next_block();
|
||||||
|
|
||||||
if let FetchFrom::BlkFiles = self.from {
|
if let FetchFrom::BlkFiles = self.from {
|
||||||
|
self.store.enable_rollback_cache();
|
||||||
self.from = FetchFrom::Bitcoind;
|
self.from = FetchFrom::Bitcoind;
|
||||||
}
|
}
|
||||||
|
|
||||||
self.tip_metric.set(headers.len() as i64 - 1);
|
self.tip_metric.set(headers_len as i64 - 1);
|
||||||
|
|
||||||
Ok(tip)
|
Ok(tip)
|
||||||
}
|
}
|
||||||
@ -324,7 +361,7 @@ impl Indexer {
|
|||||||
};
|
};
|
||||||
{
|
{
|
||||||
let _timer = self.start_timer("add_write");
|
let _timer = self.start_timer("add_write");
|
||||||
self.store.txstore_db.write(rows, self.flush);
|
self.store.txstore_db.write_blocks(rows, self.flush);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.store
|
self.store
|
||||||
@ -360,7 +397,7 @@ impl Indexer {
|
|||||||
}
|
}
|
||||||
index_blocks(blocks, &previous_txos_map, &self.iconfig)
|
index_blocks(blocks, &previous_txos_map, &self.iconfig)
|
||||||
};
|
};
|
||||||
self.store.history_db.write(rows, self.flush);
|
self.store.history_db.write_blocks(rows, self.flush);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -820,7 +857,7 @@ impl ChainQuery {
|
|||||||
// save updated utxo set to cache
|
// save updated utxo set to cache
|
||||||
if let Some(lastblock) = lastblock {
|
if let Some(lastblock) = lastblock {
|
||||||
if had_cache || processed_items > MIN_HISTORY_ITEMS_TO_CACHE {
|
if had_cache || processed_items > MIN_HISTORY_ITEMS_TO_CACHE {
|
||||||
self.store.cache_db.write(
|
self.store.cache_db.write_nocache(
|
||||||
vec![UtxoCacheRow::new(scripthash, &newutxos, &lastblock).into_row()],
|
vec![UtxoCacheRow::new(scripthash, &newutxos, &lastblock).into_row()],
|
||||||
flush,
|
flush,
|
||||||
);
|
);
|
||||||
@ -928,7 +965,7 @@ impl ChainQuery {
|
|||||||
// save updated stats to cache
|
// save updated stats to cache
|
||||||
if let Some(lastblock) = lastblock {
|
if let Some(lastblock) = lastblock {
|
||||||
if newstats.funded_txo_count + newstats.spent_txo_count > MIN_HISTORY_ITEMS_TO_CACHE {
|
if newstats.funded_txo_count + newstats.spent_txo_count > MIN_HISTORY_ITEMS_TO_CACHE {
|
||||||
self.store.cache_db.write(
|
self.store.cache_db.write_nocache(
|
||||||
vec![StatsCacheRow::new(scripthash, &newstats, &lastblock).into_row()],
|
vec![StatsCacheRow::new(scripthash, &newstats, &lastblock).into_row()],
|
||||||
flush,
|
flush,
|
||||||
);
|
);
|
||||||
@ -1256,7 +1293,7 @@ fn load_blockheaders(db: &DB) -> HashMap<BlockHash, BlockHeader> {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<DBRow> {
|
fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<Vec<DBRow>> {
|
||||||
// persist individual transactions:
|
// persist individual transactions:
|
||||||
// T{txid} → {rawtx}
|
// T{txid} → {rawtx}
|
||||||
// C{txid}{blockhash}{height} →
|
// C{txid}{blockhash}{height} →
|
||||||
@ -1284,7 +1321,6 @@ fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<DBRo
|
|||||||
rows.push(BlockRow::new_done(blockhash).into_row()); // mark block as "added"
|
rows.push(BlockRow::new_done(blockhash).into_row()); // mark block as "added"
|
||||||
rows
|
rows
|
||||||
})
|
})
|
||||||
.flatten()
|
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1370,7 +1406,7 @@ fn index_blocks(
|
|||||||
block_entries: &[BlockEntry],
|
block_entries: &[BlockEntry],
|
||||||
previous_txos_map: &HashMap<OutPoint, TxOut>,
|
previous_txos_map: &HashMap<OutPoint, TxOut>,
|
||||||
iconfig: &IndexerConfig,
|
iconfig: &IndexerConfig,
|
||||||
) -> Vec<DBRow> {
|
) -> Vec<Vec<DBRow>> {
|
||||||
block_entries
|
block_entries
|
||||||
.par_iter() // serialization is CPU-intensive
|
.par_iter() // serialization is CPU-intensive
|
||||||
.map(|b| {
|
.map(|b| {
|
||||||
@ -1389,7 +1425,6 @@ fn index_blocks(
|
|||||||
rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed"
|
rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed"
|
||||||
rows
|
rows
|
||||||
})
|
})
|
||||||
.flatten()
|
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -111,7 +111,7 @@ impl HeaderList {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let mut headers = HeaderList::empty();
|
let mut headers = HeaderList::empty();
|
||||||
headers.apply(headers.order(headers_chain));
|
headers.apply(&headers.order(headers_chain));
|
||||||
headers
|
headers
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,7 +155,8 @@ impl HeaderList {
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn apply(&mut self, new_headers: Vec<HeaderEntry>) {
|
/// Returns any re-orged headers
|
||||||
|
pub fn apply(&mut self, new_headers: &[HeaderEntry]) -> Vec<HeaderEntry> {
|
||||||
// new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip)
|
// new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip)
|
||||||
for i in 1..new_headers.len() {
|
for i in 1..new_headers.len() {
|
||||||
assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height());
|
assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height());
|
||||||
@ -175,21 +176,22 @@ impl HeaderList {
|
|||||||
assert_eq!(entry.header().prev_blockhash, expected_prev_blockhash);
|
assert_eq!(entry.header().prev_blockhash, expected_prev_blockhash);
|
||||||
height
|
height
|
||||||
}
|
}
|
||||||
None => return,
|
None => return vec![],
|
||||||
};
|
};
|
||||||
debug!(
|
debug!(
|
||||||
"applying {} new headers from height {}",
|
"applying {} new headers from height {}",
|
||||||
new_headers.len(),
|
new_headers.len(),
|
||||||
new_height
|
new_height
|
||||||
);
|
);
|
||||||
let _removed = self.headers.split_off(new_height); // keep [0..new_height) entries
|
let removed = self.headers.split_off(new_height); // keep [0..new_height) entries
|
||||||
for new_header in new_headers {
|
for new_header in new_headers {
|
||||||
let height = new_header.height();
|
let height = new_header.height();
|
||||||
assert_eq!(height, self.headers.len());
|
assert_eq!(height, self.headers.len());
|
||||||
self.tip = *new_header.hash();
|
self.tip = *new_header.hash();
|
||||||
self.headers.push(new_header);
|
self.headers.push(new_header.clone());
|
||||||
self.heights.insert(self.tip, height);
|
self.heights.insert(self.tip, height);
|
||||||
}
|
}
|
||||||
|
removed
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> {
|
pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user