Compare commits

..

1 Commits

Author SHA1 Message Date
junderw
2962b4e3f8
electrum: force exit if graceful shutdown exceeds 5 seconds
Slow or zombie TCP clients can keep peer connection threads alive,
blocking the RPC drop/join on shutdown indefinitely. Replace the
shutdown thread-logging loop with a watchdog that forcibly exits the
process if graceful shutdown has not completed within 5 seconds.
2026-05-31 11:41:55 +09:00
12 changed files with 69 additions and 357 deletions

View File

@ -1,19 +0,0 @@
# electrs
## Rules
1. You are an expert Rust developer.
2. You are an expert Bitcoin developer.
3. If you are unsure of a change, ask the developer to make a choice proactively.
## Before testing
- Run cargo fmt (from root)
- command: `cargo fmt`
## Testing
- Run the checks script
- `./scripts/checks.sh`
- Run with tests only when a test is added or changed
- `INCLUDE_TESTS=1 ./scripts/checks.sh`

10
Cargo.lock generated
View File

@ -913,7 +913,6 @@ dependencies = [
"log",
"num_cpus",
"page_size",
"ppp",
"prometheus",
"rayon",
"rocksdb",
@ -1100,15 +1099,6 @@ version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
[[package]]
name = "ppp"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a7a2049cd2570bd67bf0228e86bf850f8ceb5190a345c471d03a909da6049e0"
dependencies = [
"thiserror",
]
[[package]]
name = "ppv-lite86"
version = "0.2.21"

View File

@ -45,7 +45,6 @@ socket2 = { version = "0.4", features = ["all"] }
num_cpus = "1.12.0"
page_size = "0.4.2"
prometheus = "0.13"
ppp = "2.3.0"
rayon = "1.5.0"
rocksdb = "0.24.0"
serde = "1.0.118"

View File

@ -57,8 +57,6 @@ TESTNAME="Running cargo clippy check electrum-discovery + liquid"
echo "$TESTNAME"
cargo clippy $@ -q -F electrum-discovery,liquid
if [ $INCLUDE_TESTS ]; then
TESTNAME="Running cargo test with all features"
echo "$TESTNAME"
cargo test $@ -q --lib --all-features
fi
TESTNAME="Running cargo test with all features"
echo "$TESTNAME"
cargo test $@ -q --lib --all-features

View File

@ -122,17 +122,25 @@ fn run_server(config: Arc<Config>) -> Result<()> {
if let Err(err) = signal.wait(Duration::from_millis(config.main_loop_delay), true) {
info!("stopping server: {}", err);
electrs::util::spawn_thread("shutdown-thread-checker", || {
let mut counter = 40;
let interval_ms = 500;
// Watchdog: give the graceful shutdown ~5 seconds to close all TCP
// connections. If it hasn't completed by then (e.g. clients holding
// sockets open with slow/zombie connections), force the process to
// exit so we don't hang indefinitely.
electrs::util::spawn_thread("shutdown-watchdog", || {
let timeout = Duration::from_secs(5);
let interval = Duration::from_millis(500);
let mut elapsed = Duration::ZERO;
while counter > 0 {
while elapsed < timeout {
electrs::util::with_spawned_threads(|threads| {
debug!("Threads during shutdown: {:?}", threads);
});
std::thread::sleep(std::time::Duration::from_millis(interval_ms));
counter -= 1;
std::thread::sleep(interval);
elapsed += interval;
}
error!("graceful shutdown timed out after 5 seconds, forcing exit");
process::exit(0);
});
rest_server.stop();

View File

@ -69,15 +69,14 @@ pub struct Config {
pub electrum_max_subscriptions: usize,
pub electrum_max_clients: usize,
pub electrum_idle_timeout: u64,
pub electrum_haproxy_depth: usize,
pub electrum_connections_per_client: usize,
pub electrum_public_hosts: Option<crate::electrum::ServerHosts>,
#[cfg(feature = "liquid")]
pub parent_network: BNetwork,
#[cfg(feature = "liquid")]
pub asset_db_path: Option<PathBuf>,
#[cfg(feature = "electrum-discovery")]
pub electrum_public_hosts: Option<crate::electrum::ServerHosts>,
#[cfg(feature = "electrum-discovery")]
pub electrum_announce: bool,
#[cfg(feature = "electrum-discovery")]
@ -303,16 +302,6 @@ impl Config {
.long("electrum-idle-timeout")
.help("Maximum idle time in seconds since the last client request before disconnecting the Electrum connection.")
.default_value("600")
).arg(
Arg::with_name("electrum_haproxy_depth")
.long("electrum-haproxy-depth")
.help("Which HAProxy PROXY-protocol header layer identifies the real client IP. 0 disables PROXY-protocol detection; 1 uses the first (outermost) address, 2 the second, and so on. If the requested layer or any PROXY header is absent, no client IP is associated with the connection.")
.default_value("0")
).arg(
Arg::with_name("electrum_connections_per_client")
.long("electrum-connections-per-client")
.help("Maximum number of concurrent Electrum connections allowed per client (keyed by the HAProxy-reported address when available, otherwise the peer IP). 0 disables the per-client limit.")
.default_value("10")
);
#[cfg(unix)]
@ -532,8 +521,6 @@ impl Config {
let electrum_public_hosts = m
.value_of("electrum_public_hosts")
.map(|s| serde_json::from_str(s).expect("invalid --electrum-public-hosts"));
#[cfg(not(feature = "electrum-discovery"))]
let electrum_public_hosts: Option<crate::electrum::ServerHosts> = None;
let mut log = stderrlog::new();
log.verbosity(m.occurrences_of("verbosity") as usize);
@ -588,12 +575,6 @@ impl Config {
electrum_max_subscriptions: value_t_or_exit!(m, "electrum_max_subscriptions", usize),
electrum_max_clients: value_t_or_exit!(m, "electrum_max_clients", usize),
electrum_idle_timeout: value_t_or_exit!(m, "electrum_idle_timeout", u64),
electrum_haproxy_depth: value_t_or_exit!(m, "electrum_haproxy_depth", usize),
electrum_connections_per_client: value_t_or_exit!(
m,
"electrum_connections_per_client",
usize
),
jsonrpc_import: m.is_present("jsonrpc_import"),
light_mode: m.is_present("light_mode"),
main_loop_delay: value_t_or_exit!(m, "main_loop_delay", u64),
@ -623,6 +604,7 @@ impl Config {
#[cfg(feature = "liquid")]
asset_db_path,
#[cfg(feature = "electrum-discovery")]
electrum_public_hosts,
#[cfg(feature = "electrum-discovery")]
electrum_announce: m.is_present("electrum_announce"),

View File

@ -1,7 +1,8 @@
use std::collections::HashMap;
use std::convert::TryInto;
use std::fs;
use std::io::{BufRead, BufReader, Cursor, Read, Write};
use std::io::{BufRead, BufReader, Read, Write};
#[cfg(feature = "electrum-discovery")]
use std::net::IpAddr;
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::os::unix::fs::FileTypeExt;
@ -16,7 +17,6 @@ use std::time::{Duration, Instant};
use bitcoin::hashes::sha256d::Hash as Sha256dHash;
use error_chain::ChainedError;
use hex;
use ppp::PartialResult;
use serde_json::{from_str, Value};
use sha2::{Digest, Sha256};
@ -27,7 +27,7 @@ use elements::encode::serialize;
use crate::chain::Txid;
use crate::config::{Config, VERSION_STRING};
use crate::electrum::{get_electrum_height, ProtocolVersion, ServerFeatures};
use crate::electrum::{get_electrum_height, ProtocolVersion};
use crate::errors::*;
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use crate::new_index::{Query, Utxo};
@ -41,7 +41,7 @@ const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::new(1, 4);
const MAX_HEADERS: usize = 2016;
#[cfg(feature = "electrum-discovery")]
use crate::electrum::DiscoveryManager;
use crate::electrum::{DiscoveryManager, ServerFeatures};
// TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
@ -77,36 +77,6 @@ fn bool_from_value_or(val: Option<&Value>, name: &str, default: bool) -> Result<
bool_from_value(val, name)
}
/// Extracts the source socket address from a parsed PROXY protocol v1 header.
fn proxy_v1_source(addresses: &ppp::v1::Addresses) -> Option<SocketAddr> {
match addresses {
ppp::v1::Addresses::Tcp4(ip) => Some(SocketAddr::new(
IpAddr::V4(ip.source_address),
ip.source_port,
)),
ppp::v1::Addresses::Tcp6(ip) => Some(SocketAddr::new(
IpAddr::V6(ip.source_address),
ip.source_port,
)),
ppp::v1::Addresses::Unknown => None,
}
}
/// Extracts the source socket address from a parsed PROXY protocol v2 header.
fn proxy_v2_source(addresses: &ppp::v2::Addresses) -> Option<SocketAddr> {
match addresses {
ppp::v2::Addresses::IPv4(ip) => Some(SocketAddr::new(
IpAddr::V4(ip.source_address),
ip.source_port,
)),
ppp::v2::Addresses::IPv6(ip) => Some(SocketAddr::new(
IpAddr::V6(ip.source_address),
ip.source_port,
)),
ppp::v2::Addresses::Unspecified | ppp::v2::Addresses::Unix(_) => None,
}
}
// TODO: implement caching and delta updates
fn get_status_hash(txs: Vec<(Txid, Option<BlockId>)>, query: &Query) -> Option<FullHash> {
if txs.is_empty() {
@ -158,12 +128,6 @@ struct Connection {
idle_timeout: u64,
last_request_at: Instant,
die_please: Option<Receiver<()>>,
server_features: Arc<ServerFeatures>,
haproxy_depth: usize,
proxy_client: Option<SocketAddr>,
connections_per_client: usize,
client_counts: Arc<Mutex<HashMap<IpAddr, usize>>>,
registered_ip: Option<IpAddr>,
#[cfg(feature = "electrum-discovery")]
discovery: Option<Arc<DiscoveryManager>>,
}
@ -179,10 +143,6 @@ impl Connection {
max_subscriptions: usize,
idle_timeout: u64,
die_please: Receiver<()>,
server_features: Arc<ServerFeatures>,
haproxy_depth: usize,
connections_per_client: usize,
client_counts: Arc<Mutex<HashMap<IpAddr, usize>>>,
#[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>,
) -> Connection {
Connection {
@ -198,12 +158,6 @@ impl Connection {
idle_timeout,
last_request_at: Instant::now(),
die_please: Some(die_please),
server_features,
haproxy_depth,
proxy_client: None,
connections_per_client,
client_counts,
registered_ip: None,
#[cfg(feature = "electrum-discovery")]
discovery,
}
@ -225,8 +179,13 @@ impl Connection {
Ok(json!(self.query.config().electrum_banner.clone()))
}
#[cfg(feature = "electrum-discovery")]
fn server_features(&self) -> Result<Value> {
Ok(json!(self.server_features.as_ref()))
let discovery = self
.discovery
.as_ref()
.chain_err(|| "discovery is disabled")?;
Ok(json!(discovery.our_features()))
}
fn server_donation_address(&self) -> Result<Value> {
@ -531,8 +490,9 @@ impl Connection {
"server.peers.subscribe" => self.server_peers_subscribe(),
"server.ping" => Ok(Value::Null),
"server.version" => self.server_version(),
"server.features" => self.server_features(),
#[cfg(feature = "electrum-discovery")]
"server.features" => self.server_features(),
#[cfg(feature = "electrum-discovery")]
"server.add_peer" => self.server_add_peer(params),
@ -611,78 +571,13 @@ impl Connection {
fn close_idle_connection(&mut self, idle_for: Duration) {
info!(
"[{}] closing idle connection after {} seconds without requests (timeout: {} seconds)",
self.client_string(),
self.stream.addr_string(),
idle_for.as_secs(),
self.idle_timeout,
);
self.chan.close();
}
/// A human-readable identifier for the connected client, preferring the
/// HAProxy-reported address (when present) over the direct peer address.
fn client_string(&self) -> String {
match self.proxy_client {
Some(addr) => format!("{} via {}", addr, self.stream.addr_string()),
None => self.stream.addr_string(),
}
}
/// Resolves the PROXY-protocol parse result into the client address at the
/// configured `electrum-haproxy-depth` layer. A depth of 0, a missing PROXY
/// header, or a non-existent layer all leave the client unidentified.
fn set_proxy_client(&mut self, addresses: Option<Vec<SocketAddr>>) {
self.proxy_client = match (self.haproxy_depth, addresses) {
(0, _) | (_, None) => None,
(depth, Some(addrs)) => addrs.get(depth - 1).copied(),
};
}
/// Registers this connection against its client key (the HAProxy-reported IP
/// when available, otherwise the direct peer IP) and enforces the
/// `electrum-connections-per-client` limit. Returns an error if the limit has
/// already been reached, in which case the connection must be closed.
fn register_client(&mut self) -> Result<()> {
if self.connections_per_client == 0 {
// Per-client limit disabled.
return Ok(());
}
let key = match self
.proxy_client
.map(|addr| addr.ip())
.or_else(|| self.stream.direct_ip())
{
Some(key) => key,
// No usable client key (e.g. a unix socket with no PROXY header).
None => return Ok(()),
};
let mut counts = self.client_counts.lock().unwrap();
let count = counts.entry(key).or_insert(0);
if *count >= self.connections_per_client {
bail!(
"too many connections from client {} ({} max per client)",
key,
self.connections_per_client
);
}
*count += 1;
self.registered_ip = Some(key);
Ok(())
}
/// Releases this connection's slot in the per-client connection counter.
fn unregister_client(&mut self) {
if let Some(key) = self.registered_ip.take() {
let mut counts = self.client_counts.lock().unwrap();
if let Some(count) = counts.get_mut(&key) {
*count -= 1;
if *count == 0 {
counts.remove(&key);
}
}
}
}
fn handle_replies(&mut self, shutdown: crossbeam_channel::Receiver<()>) -> Result<()> {
let idle_timeout = Duration::from_secs(self.idle_timeout);
loop {
@ -714,14 +609,6 @@ impl Connection {
self.chan.close();
return Ok(());
}
Message::Proxy(addresses) => {
self.set_proxy_client(addresses);
if let Err(e) = self.register_client() {
info!("[{}] {}", self.client_string(), e);
self.chan.close();
return Ok(());
}
}
}
}
recv(shutdown) -> _ => {
@ -781,117 +668,11 @@ impl Connection {
}
}
/// Reads and parses any PROXY-protocol (HAProxy) headers found at the very
/// start of the connection. Returns the source address reported by each
/// proxy layer (outermost first), or `None` if no PROXY header was present,
/// together with any bytes that were read past the header(s) and belong to
/// the Electrum request stream.
fn read_proxy_headers(
stream: &mut ConnectionStream,
) -> Result<(Option<Vec<SocketAddr>>, Vec<u8>)> {
// Upper bound on how much we are willing to buffer while looking for
// PROXY headers, to avoid unbounded memory use from a slow/malicious peer.
const MAX_PROXY_HEADER_SIZE: usize = 4096;
enum Step {
Parsed(usize, Option<SocketAddr>),
NeedMore,
Done,
}
let mut buf: Vec<u8> = Vec::with_capacity(256);
let mut addrs: Vec<SocketAddr> = Vec::new();
let mut saw_proxy = false;
let mut chunk = [0u8; 256];
loop {
// Parse as many complete, stacked PROXY headers as the buffer allows.
let need_more = loop {
if buf.is_empty() {
break true;
}
let step = match ppp::HeaderResult::parse(&buf) {
ppp::HeaderResult::V2(Ok(header)) => {
Step::Parsed(header.len(), proxy_v2_source(&header.addresses))
}
ppp::HeaderResult::V1(Ok(header)) => {
Step::Parsed(header.header.len(), proxy_v1_source(&header.addresses))
}
other => {
if other.is_incomplete() {
Step::NeedMore
} else {
Step::Done
}
}
};
match step {
Step::Parsed(consumed, src) => {
saw_proxy = true;
if let Some(src) = src {
addrs.push(src);
}
if consumed == 0 || consumed > buf.len() {
// Defensive: never spin forever on a degenerate parse.
break false;
}
buf.drain(..consumed);
}
Step::NeedMore => break true,
Step::Done => break false,
}
};
if !need_more {
break;
}
if buf.len() > MAX_PROXY_HEADER_SIZE {
bail!(
"PROXY protocol header too large (exceeds {} bytes)",
MAX_PROXY_HEADER_SIZE
);
}
let n = stream
.read(&mut chunk)
.chain_err(|| "failed to read PROXY protocol header")?;
if n == 0 {
// EOF before another complete header; stop with what we have.
break;
}
buf.extend_from_slice(&chunk[..n]);
}
let result = if saw_proxy { Some(addrs) } else { None };
Ok((result, buf))
}
fn handle_requests(
stream: ConnectionStream,
mut reader: BufReader<ConnectionStream>,
tx: crossbeam_channel::Sender<Message>,
max_line_size: usize,
) -> Result<()> {
let mut stream = stream;
// Consume any PROXY-protocol (HAProxy) headers at the very start of the
// connection before treating the stream as Electrum requests. We always
// consume them — even when HAProxy support is disabled
// (`electrum-haproxy-depth = 0`) — so that PROXY headers sent by an
// accidentally-misconfigured upstream are stripped instead of corrupting
// the Electrum request parser.
//
// Crucially, `read_proxy_headers` only ever buffers bytes it has already
// read from the socket: when no PROXY header is present it returns those
// bytes as `leftover` so the start of the first Electrum request is
// preserved rather than discarded.
//
// The parsed addresses are forwarded over the channel; whether they are
// actually used to identify the client is decided later based on the
// configured `electrum-haproxy-depth` (a depth of 0 ignores them).
let (proxy_addrs, leftover) = Connection::read_proxy_headers(&mut stream)?;
tx.send(Message::Proxy(proxy_addrs))
.chain_err(|| "channel closed")?;
let mut reader = BufReader::new(Cursor::new(leftover).chain(stream));
loop {
let mut line = Vec::<u8>::new();
// Read up to max_line_size + 1 bytes to detect oversized lines
@ -930,7 +711,7 @@ impl Connection {
pub fn run(mut self) {
self.stats.clients.inc();
let stream = self.stream.try_clone().expect("failed to clone TcpStream");
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let tx = self.chan.sender();
let die_please = self.die_please.take().unwrap();
@ -950,12 +731,12 @@ impl Connection {
let max_line_size = self.max_line_size;
let child = spawn_thread("reader", move || {
Connection::handle_requests(stream, tx, max_line_size)
Connection::handle_requests(reader, tx, max_line_size)
});
if let Err(e) = self.handle_replies(reply_receiver) {
error!(
"[{}] connection handling failed: {}",
self.client_string(),
self.stream.addr_string(),
e.display_chain().to_string()
);
}
@ -963,9 +744,8 @@ impl Connection {
self.stats
.subscriptions
.sub(self.status_hashes.len() as i64);
self.unregister_client();
let addr = self.client_string();
let addr = self.stream.addr_string();
debug!("[{}] shutting down connection", addr);
// Drop the Arc so that the stream properly closes.
drop(arc_stream);
@ -1023,11 +803,6 @@ pub enum Message {
Request(String),
PeriodicUpdate,
Done,
/// The result of parsing zero or more PROXY-protocol (HAProxy) headers at
/// the start of the connection. `None` means no PROXY header was present;
/// `Some(addrs)` holds the source address reported by each proxy layer,
/// outermost first.
Proxy(Option<Vec<SocketAddr>>),
}
pub enum Notification {
@ -1116,10 +891,11 @@ impl RPC {
let notification = Channel::unbounded();
let server_features = {
// Discovery is enabled when electrum-public-hosts is set
#[cfg(feature = "electrum-discovery")]
let discovery = config.electrum_public_hosts.clone().map(|hosts| {
use crate::chain::genesis_hash;
let hosts = config.electrum_public_hosts.clone().unwrap_or_default();
Arc::new(ServerFeatures {
let features = ServerFeatures {
hosts,
server_version: VERSION_STRING.clone(),
genesis_hash: genesis_hash(config.network_type),
@ -1127,15 +903,10 @@ impl RPC {
protocol_max: PROTOCOL_VERSION,
hash_function: "sha256".into(),
pruning: None,
})
};
// Discovery is enabled when electrum-public-hosts is set
#[cfg(feature = "electrum-discovery")]
let discovery = config.electrum_public_hosts.as_ref().map(|_hosts| {
};
let discovery = Arc::new(DiscoveryManager::new(
config.network_type,
server_features.as_ref().clone(),
features,
PROTOCOL_VERSION,
config.electrum_announce,
config.tor_proxy,
@ -1149,18 +920,12 @@ impl RPC {
let max_subscriptions = config.electrum_max_subscriptions;
let max_clients = config.electrum_max_clients;
let idle_timeout = config.electrum_idle_timeout;
let haproxy_depth = config.electrum_haproxy_depth;
let connections_per_client = config.electrum_connections_per_client;
RPC {
notification: notification.sender(),
server: Some(spawn_thread("rpc", move || {
let senders =
Arc::new(Mutex::new(Vec::<crossbeam_channel::Sender<Message>>::new()));
// Tracks the number of live connections per client (keyed by the
// HAProxy-reported address when available, otherwise the peer IP).
let client_counts: Arc<Mutex<HashMap<IpAddr, usize>>> =
Arc::new(Mutex::new(HashMap::new()));
let acceptor_shutdown = Channel::unbounded();
let acceptor_shutdown_sender = acceptor_shutdown.sender();
@ -1204,7 +969,6 @@ impl RPC {
let query = Arc::clone(&query);
let senders = Arc::clone(&senders);
let stats = Arc::clone(&stats);
let client_counts = Arc::clone(&client_counts);
let garbage_sender = garbage_sender.clone();
// Kill the peers properly
@ -1213,7 +977,6 @@ impl RPC {
#[cfg(feature = "electrum-discovery")]
let discovery = discovery.clone();
let server_features = Arc::clone(&server_features);
let spawned = spawn_thread("peer", move || {
let addr = stream.addr_string();
@ -1227,10 +990,6 @@ impl RPC {
max_subscriptions,
idle_timeout,
peace_receiver,
server_features,
haproxy_depth,
connections_per_client,
client_counts,
#[cfg(feature = "electrum-discovery")]
discovery,
);
@ -1404,15 +1163,6 @@ impl ConnectionStream {
}
}
/// The direct peer IP address, if this is a TCP connection. Unix-socket
/// connections have no IP and return `None`.
fn direct_ip(&self) -> Option<IpAddr> {
match self {
ConnectionStream::Tcp(_, a) => Some(a.ip()),
ConnectionStream::Unix(..) => None,
}
}
fn try_clone(&self) -> std::io::Result<Self> {
Ok(match self {
ConnectionStream::Tcp(s, a) => ConnectionStream::Tcp(s.try_clone()?, *a),

View File

@ -290,7 +290,12 @@ fn parse_blocks(blob: Vec<u8>, magic: u32) -> Result<Vec<SizedBlock>> {
cursor.set_position(end);
}
Ok(super::THREAD_POOL.install(|| {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(0) // CPU-bound
.thread_name(|i| format!("parse-blocks-{}", i))
.build()
.unwrap();
Ok(pool.install(|| {
slices
.into_par_iter()
.map(|(slice, size)| (deserialize(slice).expect("failed to parse Block"), size))

View File

@ -5,16 +5,6 @@ pub mod precache;
mod query;
pub mod schema;
use std::sync::LazyLock;
pub(crate) static THREAD_POOL: LazyLock<rayon::ThreadPool> = LazyLock::new(|| {
rayon::ThreadPoolBuilder::new()
.num_threads(0) // 0 = use number of logical CPUs
.thread_name(|i| format!("electrs-worker-{}", i))
.build()
.expect("failed to create global rayon thread pool")
});
pub use self::db::{DBRow, DB};
pub use self::fetch::{BlockEntry, FetchFrom};
pub use self::mempool::Mempool;

View File

@ -1447,7 +1447,24 @@ fn lookup_txos(
outpoints: &BTreeSet<OutPoint>,
allow_missing: bool,
) -> HashMap<OutPoint, TxOut> {
super::THREAD_POOL.install(|| {
let mut loop_count = 10;
let pool = loop {
match rayon::ThreadPoolBuilder::new()
.num_threads(16) // we need to saturate SSD IOPS
.thread_name(|i| format!("lookup-txo-{}", i))
.build()
{
Ok(pool) => break pool,
Err(e) => {
if loop_count == 0 {
panic!("schema::lookup_txos failed to create a ThreadPool: {}", e);
}
std::thread::sleep(std::time::Duration::from_millis(50));
loop_count -= 1;
}
}
};
pool.install(|| {
// Should match lookup_txos_sequential
outpoints
.par_iter()

View File

@ -1806,10 +1806,6 @@ fn handle_request(
json_response(recent, TTL_MEMPOOL_RECENT)
}
(&Method::GET, Some(&"fee-estimates"), None, None, None, None) => {
json_response(query.estimate_fee_map(), TTL_SHORT)
}
#[cfg(feature = "liquid")]
(&Method::GET, Some(&"assets"), Some(&"registry"), Some(&"search"), None, None) => {
let search = query_params.get("q").map(|q| q.trim()).unwrap_or("");

6
start
View File

@ -5,7 +5,6 @@ DAEMON=bitcoin
NETWORK=mainnet
FEATURES=default
DB_FOLDER=/electrs
ASSET_DB_ARGS=()
NODENAME=$(hostname|cut -d . -f1)
LOCATION=$(hostname|cut -d . -f2)
USAGE="Usage: $0 (mainnet|testnet|signet|liquid|liquidtestnet) [popular-scripts]"
@ -65,7 +64,6 @@ case "${1}" in
DAEMON=elements
NETWORK=liquid
FEATURES=liquid
ASSET_DB_ARGS=(--asset-db-path /elements/asset_registry_db)
THREADS=$((NPROC / 8))
CRONJOB_TIMING="12 4 * * *"
;;
@ -73,7 +71,6 @@ case "${1}" in
DAEMON=elements
NETWORK=liquidtestnet
FEATURES=liquid
ASSET_DB_ARGS=(--asset-db-path /elements/asset_registry_testnet_db)
THREADS=$((NPROC / 8))
CRONJOB_TIMING="17 4 * * *"
;;
@ -158,7 +155,7 @@ do
ELECTRUM_TXS_LIMIT=500
ELECTRUM_MAX_LINE_SIZE=1048576 # 1 MiB
ELECTRUM_MAX_SUBSCRIPTIONS=100
ELECTRUM_MAX_CLIENTS=1000
ELECTRUM_MAX_CLIENTS=10
MAIN_LOOP_DELAY=500
DAEMON_CONF="${HOME}/${DAEMON}.conf"
HTTP_SOCKET_FILE="${HOME}/socket/esplora-${DAEMON}-${NETWORK}"
@ -255,7 +252,6 @@ do
--network "${NETWORK}" \
--daemon-dir "${HOME}" \
--db-dir "${DB_FOLDER}" \
"${ASSET_DB_ARGS[@]}" \
--main-loop-delay "${MAIN_LOOP_DELAY}" \
--rpc-socket-file "${RPC_SOCKET_FILE}" \
--http-socket-file "${HTTP_SOCKET_FILE}" \