Compare commits

...

8 Commits

Author SHA1 Message Date
Roman Zeyde
06a1a88747
Experiment with Rayon 2018-11-22 22:53:24 +02:00
Roman Zeyde
50adc89f0e
WIP
logging
no mempool
2018-11-22 21:44:54 +02:00
Roman Zeyde
64ab3560c2
WIP 2018-11-22 13:51:59 +02:00
Roman Zeyde
44cd77524e
Monitor REST API handlers' latency, methods and error codes 2018-11-22 13:23:03 +02:00
Roman Zeyde
cec6c35e28
Move default latency buckets' definition into metrics.rs 2018-11-22 13:22:22 +02:00
Roman Zeyde
91c3519a4a
Index while compaction is enabled (for SSD) 2018-11-21 13:06:10 +02:00
Roman Zeyde
97d8e3fdbc
Add testnet benchmarking script 2018-11-21 13:06:10 +02:00
Roman Zeyde
581a901526
Remove FUNDING_TXN_LIMIT limit for performance tests 2018-11-21 10:01:20 +02:00
9 changed files with 104 additions and 62 deletions

1
Cargo.lock generated
View File

@ -374,6 +374,7 @@ dependencies = [
"num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"page_size 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
"prometheus 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rocksdb 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
"secp256k1 0.11.5 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -41,6 +41,7 @@ tiny_http = "0.6"
hyper = "0.12"
url = "1.0"
lru-cache = "0.1.1"
rayon = "1.0"
[dependencies.bitcoin]
version = "0.15.1"

View File

@ -64,7 +64,7 @@ fn run_server(config: &Config) -> Result<()> {
if server.is_none() {
let info = app.daemon().getblockchaininfo()?;
if info.initialblockdownload == false && info.verificationprogress > 0.9999 {
server = Some(rest::run_server(&config, query.clone()));
server = Some(rest::run_server(&config, query.clone(), &metrics));
} else {
warn!("bitcoind not fully synced waiting");
}

View File

@ -16,6 +16,7 @@ extern crate lru_cache;
extern crate num_cpus;
extern crate page_size;
extern crate prometheus;
extern crate rayon;
extern crate rocksdb;
extern crate secp256k1;
extern crate serde;

View File

@ -66,6 +66,13 @@ impl Metrics {
h
}
pub fn default_latency_buckets() -> Vec<f64> {
vec![
1e-4, 2e-4, 5e-4, 1e-3, 2e-3, 5e-3, 1e-2, 2e-2, 5e-2, 0.1, 0.2, 0.5, 1., 2., 5., 10.,
20., 50., 100.,
]
}
pub fn start(&self) {
let server = tiny_http::Server::http(self.addr).expect(&format!(
"failed to start monitoring HTTP server at {}",

View File

@ -3,6 +3,7 @@ use bitcoin::blockdata::block::Block;
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::consensus::encode::{deserialize, serialize};
use bitcoin::util::hash::Sha256dHash;
use rayon::prelude::*;
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, RwLock};
@ -18,8 +19,6 @@ use util::{
use errors::*;
const FUNDING_TXN_LIMIT: usize = 100;
#[derive(Clone)]
pub struct FundingOutput {
pub txn: Option<TxnHeight>,
@ -167,7 +166,6 @@ fn txids_by_script_hash(store: &ReadStore, script_hash: &[u8]) -> Vec<HashPrefix
store
.scan(&TxOutRow::filter(script_hash))
.iter()
.take(FUNDING_TXN_LIMIT + 1)
.map(|row| TxOutRow::from_row(row).txid_prefix)
.collect()
}
@ -208,16 +206,12 @@ pub struct Query {
impl Query {
pub fn new(app: Arc<App>, metrics: &Metrics) -> Arc<Query> {
let latency_buckets = vec![
1e-4, 2e-4, 5e-4, 1e-3, 2e-3, 5e-3, 1e-2, 2e-2, 5e-2, 0.1, 0.2, 0.5, 1., 2., 5., 10.,
20., 50., 100.,
];
Arc::new(Query {
app,
tracker: RwLock::new(Tracker::new(metrics)),
latency: metrics.histogram_vec(
HistogramOpts::new("query_latency", "Query latency (in seconds)")
.buckets(latency_buckets),
.buckets(Metrics::default_latency_buckets()),
&["type"],
),
})
@ -228,26 +222,39 @@ impl Query {
store: &ReadStore,
prefixes: Vec<HashPrefix>,
) -> Result<Vec<TxnHeight>> {
if prefixes.len() > FUNDING_TXN_LIMIT {
bail!("Too many txs");
}
let mut txns = vec![];
let _timer = self
.latency
.with_label_values(&["load_txns_by_prefix"])
.start_timer();
for txid_prefix in prefixes {
for tx_row in txrows_by_prefix(store, &txid_prefix) {
// lookup full transaction ID (with confirmed height & blockhash) from DB
let tx_rows: Vec<TxRow> = {
let _timer = self
.latency
.with_label_values(&["load_txns_by_prefix-txrows_by_prefix"])
.start_timer();
prefixes
.par_iter()
.flat_map(|txid_prefix| txrows_by_prefix(store, &txid_prefix))
.collect()
};
// lookup actual transaction data from DB / bitcoind
let _timer = self
.latency
.with_label_values(&["load_txns_by_prefix-tx_get"])
.start_timer();
let txns = tx_rows
.par_iter()
.map(|tx_row| {
let txid: Sha256dHash = deserialize(&tx_row.key.txid).unwrap();
let txn = self.tx_get(&txid).chain_err(|| "cannot locate tx")?;
txns.push(TxnHeight {
let txn = self.tx_get(&txid).expect("cannot locate tx");
TxnHeight {
txn,
height: tx_row.height,
blockhash: tx_row.blockhash,
})
}
}
}
}).collect();
Ok(txns)
}
@ -260,10 +267,22 @@ impl Query {
.latency
.with_label_values(&["find_spending_input"])
.start_timer();
let spending_txns: Vec<TxnHeight> = self.load_txns_by_prefix(
store,
txids_by_funding_output(store, &funding.txn_id, funding.output_index),
)?;
let txid_prefixes = {
let _timer = self
.latency
.with_label_values(&["find_spending_input-txids_by_funding_output"])
.start_timer();
txids_by_funding_output(store, &funding.txn_id, funding.output_index)
};
let spending_txns: Vec<TxnHeight> = {
let _timer = self
.latency
.with_label_values(&["find_spending_input-load_txns_by_prefix"])
.start_timer();
self.load_txns_by_prefix(store, txid_prefixes)
}?;
let mut spending_inputs = vec![];
for t in &spending_txns {
for (input_index, input) in t.txn.input.iter().enumerate() {
@ -318,53 +337,32 @@ impl Query {
.latency
.with_label_values(&["confirmed_status"])
.start_timer();
info!("confirmed_status: start");
let mut funding = vec![];
let mut spending = vec![];
let read_store = self.app.read_store();
let txid_prefixes = txids_by_script_hash(read_store, script_hash);
info!("confirmed_status: {} prefixes", txid_prefixes.len());
for t in self.load_txns_by_prefix(read_store, txid_prefixes)? {
funding.extend(self.find_funding_outputs(&t, script_hash));
}
info!("confirmed_status: {} funding outputs", funding.len());
for funding_output in &funding {
if let Some(spent) = self.find_spending_input(read_store, &funding_output)? {
spending.push(spent);
}
}
Ok((funding, spending))
}
fn mempool_status(
&self,
script_hash: &[u8],
confirmed_funding: &[FundingOutput],
) -> Result<(Vec<FundingOutput>, Vec<SpendingInput>)> {
let _timer = self
.latency
.with_label_values(&["mempool_status"])
.start_timer();
let mut funding = vec![];
let mut spending = vec![];
let tracker = self.tracker.read().unwrap();
let txid_prefixes = txids_by_script_hash(tracker.index(), script_hash);
for t in self.load_txns_by_prefix(tracker.index(), txid_prefixes)? {
funding.extend(self.find_funding_outputs(&t, script_hash));
}
// // TODO: dedup outputs (somehow) both confirmed and in mempool (e.g. reorg?)
for funding_output in funding.iter().chain(confirmed_funding.iter()) {
if let Some(spent) = self.find_spending_input(tracker.index(), &funding_output)? {
spending.push(spent);
}
}
info!("confirmed_status: {} spending inputs", spending.len());
Ok((funding, spending))
}
pub fn status(&self, script_hash: &[u8]) -> Result<Status> {
let _timer = self.latency.with_label_values(&["status"]).start_timer();
let confirmed = self.confirmed_status(script_hash)?;
//.chain_err(|| "failed to get confirmed status")?;
let mempool = self.mempool_status(script_hash, &confirmed.0)?;
//.chain_err(|| "failed to get mempool status")?;
Ok(Status { confirmed, mempool })
Ok(Status {
confirmed,
mempool: (vec![], vec![]),
})
}
pub fn find_spending_by_outpoint(&self, outpoint: OutPoint) -> Result<Option<SpendingInput>> {

View File

@ -12,6 +12,7 @@ use hyper::service::service_fn_ok;
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use index::compute_script_hash;
use mempool::MEMPOOL_HEIGHT;
use metrics::{HistogramOpts, MetricOpts, Metrics};
use query::{FundingOutput, Query, SpendingInput, TxnHeight};
use serde::Serialize;
use serde_json;
@ -360,18 +361,34 @@ fn attach_txs_data(txs: &mut Vec<TransactionValue>, config: &Config, query: &Arc
}
}
pub fn run_server(config: &Config, query: Arc<Query>) {
pub fn run_server(config: &Config, query: Arc<Query>, metrics: &Metrics) {
let addr = &config.http_addr;
info!("REST server running on {}", addr);
let config = Arc::new(config.clone());
let latency_histogram = metrics.histogram(
HistogramOpts::new(
"rest_handler_latency",
"REST API handler latency (in seconds)",
).buckets(Metrics::default_latency_buckets()),
);
let response_counter = metrics.counter_vec(
MetricOpts::new(
"rest_handler_response",
"REST API handler latency (in seconds)",
),
&["method", "code"],
);
let new_service = move || {
let query = query.clone();
let config = config.clone();
let latency_histogram = latency_histogram.clone();
let response_counter = response_counter.clone();
service_fn_ok(
move |req: Request<Body>| match handle_request(req, &query, &config) {
service_fn_ok(move |req: Request<Body>| {
let _timer = latency_histogram.start_timer();
let response = match handle_request(&req, &query, &config) {
Ok(response) => response,
Err(e) => {
warn!("{:?}", e);
@ -381,8 +398,12 @@ pub fn run_server(config: &Config, query: Arc<Query>) {
.body(Body::from(e.1))
.unwrap()
}
},
)
};
response_counter
.with_label_values(&[req.method().as_str(), response.status().as_str()])
.inc();
response
})
};
let server = Server::bind(&addr)
@ -395,7 +416,7 @@ pub fn run_server(config: &Config, query: Arc<Query>) {
}
fn handle_request(
req: Request<Body>,
req: &Request<Body>,
query: &Arc<Query>,
config: &Config,
) -> Result<Response<Body>, HttpError> {

View File

@ -49,7 +49,7 @@ impl DBStore {
db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy);
db_opts.set_target_file_size_base(256 << 20);
db_opts.set_write_buffer_size(256 << 20);
db_opts.set_disable_auto_compactions(opts.bulk_import); // for initial bulk load
db_opts.set_disable_auto_compactions(false); // for initial bulk load
db_opts.set_advise_random_on_open(!opts.bulk_import); // bulk load uses sequential I/O
if opts.low_memory == false {
db_opts.set_compaction_readahead_size(1 << 20);
@ -90,7 +90,6 @@ impl DBStore {
let store = DBStore::open_opts(opts);
info!("starting full compaction");
store.db.compact_range(None, None); // would take a while
info!("finished full compaction");
store
}

14
testnet-bench.sh Executable file
View File

@ -0,0 +1,14 @@
#!/bin/bash
set -x
time curl http://localhost:3001/address/n2zbkoYEUQfXeRmdzzExGxFbyFBGcRAkVL # 2
time curl http://localhost:3001/address/mm41aypj76mpTqdRoT5sQ4xrpXYEsGQQ3j # 15
time curl http://localhost:3001/address/mkMTj6YCu8pQTLiM5hs2cfS3S1QbSAmyPG # 59
time curl http://localhost:3001/address/msYN6FJfvA3p2XoVzgjZpZV4AbEcwBQEEJ # 396
time curl http://localhost:3001/address/2NDhzMt2D9ZxXapbuq567WGeWP7NuDN81cg # 4.5K
time curl http://localhost:3001/address/msjBEBAqheDdgqVab1mYAh7RULQFjEB9yS # 8.9K
time curl http://localhost:3001/address/mxosQ4CvQR8ipfWdRktyB3u16tauEdamGc # 13K
time curl http://localhost:3001/address/n2eMqTT929pb1RDNuqEnxdaLau1rxy3efi # 20K
time curl http://localhost:3001/address/mgxVT9fzHwYDsgEGJSZekKgYbAyrBkqdpi # 129K
time curl http://localhost:3001/address/2N66DDrmjDCMM3yMSYtAQyAqRtasSkFhbmX # 282K
time curl http://localhost:3001/address/2N8KqVf2fhoeL6pRWeyKRwJVsf3xL3JqcvF # 625K
time curl http://localhost:3001/address/2N9FY2fckLiZtdUEWSMYPe5Zrs1baHX7foG # 710K