Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
06a1a88747 | ||
|
|
50adc89f0e | ||
|
|
64ab3560c2 | ||
|
|
44cd77524e | ||
|
|
cec6c35e28 | ||
|
|
91c3519a4a | ||
|
|
97d8e3fdbc | ||
|
|
581a901526 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -374,6 +374,7 @@ dependencies = [
|
||||
"num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"page_size 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"prometheus 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rocksdb 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"secp256k1 0.11.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
|
||||
@ -41,6 +41,7 @@ tiny_http = "0.6"
|
||||
hyper = "0.12"
|
||||
url = "1.0"
|
||||
lru-cache = "0.1.1"
|
||||
rayon = "1.0"
|
||||
|
||||
[dependencies.bitcoin]
|
||||
version = "0.15.1"
|
||||
|
||||
@ -64,7 +64,7 @@ fn run_server(config: &Config) -> Result<()> {
|
||||
if server.is_none() {
|
||||
let info = app.daemon().getblockchaininfo()?;
|
||||
if info.initialblockdownload == false && info.verificationprogress > 0.9999 {
|
||||
server = Some(rest::run_server(&config, query.clone()));
|
||||
server = Some(rest::run_server(&config, query.clone(), &metrics));
|
||||
} else {
|
||||
warn!("bitcoind not fully synced waiting");
|
||||
}
|
||||
|
||||
@ -16,6 +16,7 @@ extern crate lru_cache;
|
||||
extern crate num_cpus;
|
||||
extern crate page_size;
|
||||
extern crate prometheus;
|
||||
extern crate rayon;
|
||||
extern crate rocksdb;
|
||||
extern crate secp256k1;
|
||||
extern crate serde;
|
||||
|
||||
@ -66,6 +66,13 @@ impl Metrics {
|
||||
h
|
||||
}
|
||||
|
||||
pub fn default_latency_buckets() -> Vec<f64> {
|
||||
vec![
|
||||
1e-4, 2e-4, 5e-4, 1e-3, 2e-3, 5e-3, 1e-2, 2e-2, 5e-2, 0.1, 0.2, 0.5, 1., 2., 5., 10.,
|
||||
20., 50., 100.,
|
||||
]
|
||||
}
|
||||
|
||||
pub fn start(&self) {
|
||||
let server = tiny_http::Server::http(self.addr).expect(&format!(
|
||||
"failed to start monitoring HTTP server at {}",
|
||||
|
||||
104
src/query.rs
104
src/query.rs
@ -3,6 +3,7 @@ use bitcoin::blockdata::block::Block;
|
||||
use bitcoin::blockdata::transaction::Transaction;
|
||||
use bitcoin::consensus::encode::{deserialize, serialize};
|
||||
use bitcoin::util::hash::Sha256dHash;
|
||||
use rayon::prelude::*;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
@ -18,8 +19,6 @@ use util::{
|
||||
|
||||
use errors::*;
|
||||
|
||||
const FUNDING_TXN_LIMIT: usize = 100;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct FundingOutput {
|
||||
pub txn: Option<TxnHeight>,
|
||||
@ -167,7 +166,6 @@ fn txids_by_script_hash(store: &ReadStore, script_hash: &[u8]) -> Vec<HashPrefix
|
||||
store
|
||||
.scan(&TxOutRow::filter(script_hash))
|
||||
.iter()
|
||||
.take(FUNDING_TXN_LIMIT + 1)
|
||||
.map(|row| TxOutRow::from_row(row).txid_prefix)
|
||||
.collect()
|
||||
}
|
||||
@ -208,16 +206,12 @@ pub struct Query {
|
||||
|
||||
impl Query {
|
||||
pub fn new(app: Arc<App>, metrics: &Metrics) -> Arc<Query> {
|
||||
let latency_buckets = vec![
|
||||
1e-4, 2e-4, 5e-4, 1e-3, 2e-3, 5e-3, 1e-2, 2e-2, 5e-2, 0.1, 0.2, 0.5, 1., 2., 5., 10.,
|
||||
20., 50., 100.,
|
||||
];
|
||||
Arc::new(Query {
|
||||
app,
|
||||
tracker: RwLock::new(Tracker::new(metrics)),
|
||||
latency: metrics.histogram_vec(
|
||||
HistogramOpts::new("query_latency", "Query latency (in seconds)")
|
||||
.buckets(latency_buckets),
|
||||
.buckets(Metrics::default_latency_buckets()),
|
||||
&["type"],
|
||||
),
|
||||
})
|
||||
@ -228,26 +222,39 @@ impl Query {
|
||||
store: &ReadStore,
|
||||
prefixes: Vec<HashPrefix>,
|
||||
) -> Result<Vec<TxnHeight>> {
|
||||
if prefixes.len() > FUNDING_TXN_LIMIT {
|
||||
bail!("Too many txs");
|
||||
}
|
||||
|
||||
let mut txns = vec![];
|
||||
let _timer = self
|
||||
.latency
|
||||
.with_label_values(&["load_txns_by_prefix"])
|
||||
.start_timer();
|
||||
for txid_prefix in prefixes {
|
||||
for tx_row in txrows_by_prefix(store, &txid_prefix) {
|
||||
|
||||
// lookup full transaction ID (with confirmed height & blockhash) from DB
|
||||
let tx_rows: Vec<TxRow> = {
|
||||
let _timer = self
|
||||
.latency
|
||||
.with_label_values(&["load_txns_by_prefix-txrows_by_prefix"])
|
||||
.start_timer();
|
||||
prefixes
|
||||
.par_iter()
|
||||
.flat_map(|txid_prefix| txrows_by_prefix(store, &txid_prefix))
|
||||
.collect()
|
||||
};
|
||||
|
||||
// lookup actual transaction data from DB / bitcoind
|
||||
let _timer = self
|
||||
.latency
|
||||
.with_label_values(&["load_txns_by_prefix-tx_get"])
|
||||
.start_timer();
|
||||
let txns = tx_rows
|
||||
.par_iter()
|
||||
.map(|tx_row| {
|
||||
let txid: Sha256dHash = deserialize(&tx_row.key.txid).unwrap();
|
||||
let txn = self.tx_get(&txid).chain_err(|| "cannot locate tx")?;
|
||||
txns.push(TxnHeight {
|
||||
let txn = self.tx_get(&txid).expect("cannot locate tx");
|
||||
TxnHeight {
|
||||
txn,
|
||||
height: tx_row.height,
|
||||
blockhash: tx_row.blockhash,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}).collect();
|
||||
Ok(txns)
|
||||
}
|
||||
|
||||
@ -260,10 +267,22 @@ impl Query {
|
||||
.latency
|
||||
.with_label_values(&["find_spending_input"])
|
||||
.start_timer();
|
||||
let spending_txns: Vec<TxnHeight> = self.load_txns_by_prefix(
|
||||
store,
|
||||
txids_by_funding_output(store, &funding.txn_id, funding.output_index),
|
||||
)?;
|
||||
|
||||
let txid_prefixes = {
|
||||
let _timer = self
|
||||
.latency
|
||||
.with_label_values(&["find_spending_input-txids_by_funding_output"])
|
||||
.start_timer();
|
||||
txids_by_funding_output(store, &funding.txn_id, funding.output_index)
|
||||
};
|
||||
|
||||
let spending_txns: Vec<TxnHeight> = {
|
||||
let _timer = self
|
||||
.latency
|
||||
.with_label_values(&["find_spending_input-load_txns_by_prefix"])
|
||||
.start_timer();
|
||||
self.load_txns_by_prefix(store, txid_prefixes)
|
||||
}?;
|
||||
let mut spending_inputs = vec![];
|
||||
for t in &spending_txns {
|
||||
for (input_index, input) in t.txn.input.iter().enumerate() {
|
||||
@ -318,53 +337,32 @@ impl Query {
|
||||
.latency
|
||||
.with_label_values(&["confirmed_status"])
|
||||
.start_timer();
|
||||
info!("confirmed_status: start");
|
||||
let mut funding = vec![];
|
||||
let mut spending = vec![];
|
||||
let read_store = self.app.read_store();
|
||||
let txid_prefixes = txids_by_script_hash(read_store, script_hash);
|
||||
info!("confirmed_status: {} prefixes", txid_prefixes.len());
|
||||
for t in self.load_txns_by_prefix(read_store, txid_prefixes)? {
|
||||
funding.extend(self.find_funding_outputs(&t, script_hash));
|
||||
}
|
||||
info!("confirmed_status: {} funding outputs", funding.len());
|
||||
for funding_output in &funding {
|
||||
if let Some(spent) = self.find_spending_input(read_store, &funding_output)? {
|
||||
spending.push(spent);
|
||||
}
|
||||
}
|
||||
Ok((funding, spending))
|
||||
}
|
||||
|
||||
fn mempool_status(
|
||||
&self,
|
||||
script_hash: &[u8],
|
||||
confirmed_funding: &[FundingOutput],
|
||||
) -> Result<(Vec<FundingOutput>, Vec<SpendingInput>)> {
|
||||
let _timer = self
|
||||
.latency
|
||||
.with_label_values(&["mempool_status"])
|
||||
.start_timer();
|
||||
let mut funding = vec![];
|
||||
let mut spending = vec![];
|
||||
let tracker = self.tracker.read().unwrap();
|
||||
let txid_prefixes = txids_by_script_hash(tracker.index(), script_hash);
|
||||
for t in self.load_txns_by_prefix(tracker.index(), txid_prefixes)? {
|
||||
funding.extend(self.find_funding_outputs(&t, script_hash));
|
||||
}
|
||||
// // TODO: dedup outputs (somehow) both confirmed and in mempool (e.g. reorg?)
|
||||
for funding_output in funding.iter().chain(confirmed_funding.iter()) {
|
||||
if let Some(spent) = self.find_spending_input(tracker.index(), &funding_output)? {
|
||||
spending.push(spent);
|
||||
}
|
||||
}
|
||||
info!("confirmed_status: {} spending inputs", spending.len());
|
||||
Ok((funding, spending))
|
||||
}
|
||||
|
||||
pub fn status(&self, script_hash: &[u8]) -> Result<Status> {
|
||||
let _timer = self.latency.with_label_values(&["status"]).start_timer();
|
||||
let confirmed = self.confirmed_status(script_hash)?;
|
||||
//.chain_err(|| "failed to get confirmed status")?;
|
||||
let mempool = self.mempool_status(script_hash, &confirmed.0)?;
|
||||
//.chain_err(|| "failed to get mempool status")?;
|
||||
Ok(Status { confirmed, mempool })
|
||||
Ok(Status {
|
||||
confirmed,
|
||||
mempool: (vec![], vec![]),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn find_spending_by_outpoint(&self, outpoint: OutPoint) -> Result<Option<SpendingInput>> {
|
||||
|
||||
33
src/rest.rs
33
src/rest.rs
@ -12,6 +12,7 @@ use hyper::service::service_fn_ok;
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use index::compute_script_hash;
|
||||
use mempool::MEMPOOL_HEIGHT;
|
||||
use metrics::{HistogramOpts, MetricOpts, Metrics};
|
||||
use query::{FundingOutput, Query, SpendingInput, TxnHeight};
|
||||
use serde::Serialize;
|
||||
use serde_json;
|
||||
@ -360,18 +361,34 @@ fn attach_txs_data(txs: &mut Vec<TransactionValue>, config: &Config, query: &Arc
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run_server(config: &Config, query: Arc<Query>) {
|
||||
pub fn run_server(config: &Config, query: Arc<Query>, metrics: &Metrics) {
|
||||
let addr = &config.http_addr;
|
||||
info!("REST server running on {}", addr);
|
||||
|
||||
let config = Arc::new(config.clone());
|
||||
let latency_histogram = metrics.histogram(
|
||||
HistogramOpts::new(
|
||||
"rest_handler_latency",
|
||||
"REST API handler latency (in seconds)",
|
||||
).buckets(Metrics::default_latency_buckets()),
|
||||
);
|
||||
let response_counter = metrics.counter_vec(
|
||||
MetricOpts::new(
|
||||
"rest_handler_response",
|
||||
"REST API handler latency (in seconds)",
|
||||
),
|
||||
&["method", "code"],
|
||||
);
|
||||
|
||||
let new_service = move || {
|
||||
let query = query.clone();
|
||||
let config = config.clone();
|
||||
let latency_histogram = latency_histogram.clone();
|
||||
let response_counter = response_counter.clone();
|
||||
|
||||
service_fn_ok(
|
||||
move |req: Request<Body>| match handle_request(req, &query, &config) {
|
||||
service_fn_ok(move |req: Request<Body>| {
|
||||
let _timer = latency_histogram.start_timer();
|
||||
let response = match handle_request(&req, &query, &config) {
|
||||
Ok(response) => response,
|
||||
Err(e) => {
|
||||
warn!("{:?}", e);
|
||||
@ -381,8 +398,12 @@ pub fn run_server(config: &Config, query: Arc<Query>) {
|
||||
.body(Body::from(e.1))
|
||||
.unwrap()
|
||||
}
|
||||
},
|
||||
)
|
||||
};
|
||||
response_counter
|
||||
.with_label_values(&[req.method().as_str(), response.status().as_str()])
|
||||
.inc();
|
||||
response
|
||||
})
|
||||
};
|
||||
|
||||
let server = Server::bind(&addr)
|
||||
@ -395,7 +416,7 @@ pub fn run_server(config: &Config, query: Arc<Query>) {
|
||||
}
|
||||
|
||||
fn handle_request(
|
||||
req: Request<Body>,
|
||||
req: &Request<Body>,
|
||||
query: &Arc<Query>,
|
||||
config: &Config,
|
||||
) -> Result<Response<Body>, HttpError> {
|
||||
|
||||
@ -49,7 +49,7 @@ impl DBStore {
|
||||
db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy);
|
||||
db_opts.set_target_file_size_base(256 << 20);
|
||||
db_opts.set_write_buffer_size(256 << 20);
|
||||
db_opts.set_disable_auto_compactions(opts.bulk_import); // for initial bulk load
|
||||
db_opts.set_disable_auto_compactions(false); // for initial bulk load
|
||||
db_opts.set_advise_random_on_open(!opts.bulk_import); // bulk load uses sequential I/O
|
||||
if opts.low_memory == false {
|
||||
db_opts.set_compaction_readahead_size(1 << 20);
|
||||
@ -90,7 +90,6 @@ impl DBStore {
|
||||
|
||||
let store = DBStore::open_opts(opts);
|
||||
info!("starting full compaction");
|
||||
store.db.compact_range(None, None); // would take a while
|
||||
info!("finished full compaction");
|
||||
store
|
||||
}
|
||||
|
||||
14
testnet-bench.sh
Executable file
14
testnet-bench.sh
Executable file
@ -0,0 +1,14 @@
|
||||
#!/bin/bash
|
||||
set -x
|
||||
time curl http://localhost:3001/address/n2zbkoYEUQfXeRmdzzExGxFbyFBGcRAkVL # 2
|
||||
time curl http://localhost:3001/address/mm41aypj76mpTqdRoT5sQ4xrpXYEsGQQ3j # 15
|
||||
time curl http://localhost:3001/address/mkMTj6YCu8pQTLiM5hs2cfS3S1QbSAmyPG # 59
|
||||
time curl http://localhost:3001/address/msYN6FJfvA3p2XoVzgjZpZV4AbEcwBQEEJ # 396
|
||||
time curl http://localhost:3001/address/2NDhzMt2D9ZxXapbuq567WGeWP7NuDN81cg # 4.5K
|
||||
time curl http://localhost:3001/address/msjBEBAqheDdgqVab1mYAh7RULQFjEB9yS # 8.9K
|
||||
time curl http://localhost:3001/address/mxosQ4CvQR8ipfWdRktyB3u16tauEdamGc # 13K
|
||||
time curl http://localhost:3001/address/n2eMqTT929pb1RDNuqEnxdaLau1rxy3efi # 20K
|
||||
time curl http://localhost:3001/address/mgxVT9fzHwYDsgEGJSZekKgYbAyrBkqdpi # 129K
|
||||
time curl http://localhost:3001/address/2N66DDrmjDCMM3yMSYtAQyAqRtasSkFhbmX # 282K
|
||||
time curl http://localhost:3001/address/2N8KqVf2fhoeL6pRWeyKRwJVsf3xL3JqcvF # 625K
|
||||
time curl http://localhost:3001/address/2N9FY2fckLiZtdUEWSMYPe5Zrs1baHX7foG # 710K
|
||||
Loading…
Reference in New Issue
Block a user