Merge branch 'mempool' into junderw/cap-history-fix

This commit is contained in:
mononaut 2026-05-22 16:23:00 +09:00 committed by GitHub
commit af82c5e2d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 210 additions and 28 deletions

View File

@ -65,6 +65,10 @@ pub struct Config {
pub rest_default_max_address_summary_txs: usize,
pub rest_max_mempool_page_size: usize,
pub rest_max_mempool_txid_page_size: usize,
pub electrum_max_line_size: usize,
pub electrum_max_subscriptions: usize,
pub electrum_max_clients: usize,
pub electrum_idle_timeout: u64,
#[cfg(feature = "liquid")]
pub parent_network: BNetwork,
@ -278,6 +282,26 @@ impl Config {
.long("electrum-banner")
.help("Welcome banner for the Electrum server, shown in the console to clients.")
.takes_value(true)
).arg(
Arg::with_name("electrum_max_line_size")
.long("electrum-max-line-size")
.help("Maximum size of a single Electrum request line in bytes (default: 1 MiB).")
.default_value("1048576")
).arg(
Arg::with_name("electrum_max_subscriptions")
.long("electrum-max-subscriptions")
.help("Maximum number of scripthash subscriptions per client connection.")
.default_value("100")
).arg(
Arg::with_name("electrum_max_clients")
.long("electrum-max-clients")
.help("Maximum number of concurrent Electrum client connections.")
.default_value("10")
).arg(
Arg::with_name("electrum_idle_timeout")
.long("electrum-idle-timeout")
.help("Maximum idle time in seconds since the last client request before disconnecting the Electrum connection.")
.default_value("600")
);
#[cfg(unix)]
@ -547,6 +571,10 @@ impl Config {
"rest_max_mempool_txid_page_size",
usize
),
electrum_max_line_size: value_t_or_exit!(m, "electrum_max_line_size", usize),
electrum_max_subscriptions: value_t_or_exit!(m, "electrum_max_subscriptions", usize),
electrum_max_clients: value_t_or_exit!(m, "electrum_max_clients", usize),
electrum_idle_timeout: value_t_or_exit!(m, "electrum_idle_timeout", u64),
jsonrpc_import: m.is_present("jsonrpc_import"),
light_mode: m.is_present("light_mode"),
main_loop_delay: value_t_or_exit!(m, "main_loop_delay", u64),

View File

@ -12,6 +12,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use bitcoin::hashes::sha256d::Hash as Sha256dHash;
use error_chain::ChainedError;
@ -122,17 +123,25 @@ struct Connection {
chan: SyncChannel<Message>,
stats: Arc<Stats>,
txs_limit: usize,
max_line_size: usize,
max_subscriptions: usize,
idle_timeout: u64,
last_request_at: Instant,
die_please: Option<Receiver<()>>,
#[cfg(feature = "electrum-discovery")]
discovery: Option<Arc<DiscoveryManager>>,
}
impl Connection {
#[allow(clippy::too_many_arguments)]
pub fn new(
query: Arc<Query>,
stream: ConnectionStream,
stats: Arc<Stats>,
txs_limit: usize,
max_line_size: usize,
max_subscriptions: usize,
idle_timeout: u64,
die_please: Receiver<()>,
#[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>,
) -> Connection {
@ -144,6 +153,10 @@ impl Connection {
chan: SyncChannel::new(10),
stats,
txs_limit,
max_line_size,
max_subscriptions,
idle_timeout,
last_request_at: Instant::now(),
die_please: Some(die_please),
#[cfg(feature = "electrum-discovery")]
discovery,
@ -294,6 +307,16 @@ impl Connection {
fn blockchain_scripthash_subscribe(&mut self, params: &[Value]) -> Result<Value> {
let script_hash = hash_from_value(params.first()).chain_err(|| "bad script_hash")?;
// Enforce per-client subscription limit (don't count re-subscriptions to the same hash)
if !self.status_hashes.contains_key(&script_hash)
&& self.status_hashes.len() >= self.max_subscriptions
{
bail!(
"subscription limit reached ({} max per client)",
self.max_subscriptions
);
}
let history_txids = get_history(&self.query, &script_hash[..], self.txs_limit)?;
let status_hash = get_status_hash(history_txids, &self.query)
.map_or(Value::Null, |h| json!(hex::encode(full_hash(&h[..]))));
@ -545,14 +568,34 @@ impl Connection {
Ok(())
}
fn close_idle_connection(&mut self, idle_for: Duration) {
info!(
"[{}] closing idle connection after {} seconds without requests (timeout: {} seconds)",
self.stream.addr_string(),
idle_for.as_secs(),
self.idle_timeout,
);
self.chan.close();
}
fn handle_replies(&mut self, shutdown: crossbeam_channel::Receiver<()>) -> Result<()> {
let idle_timeout = Duration::from_secs(self.idle_timeout);
loop {
let elapsed = self.last_request_at.elapsed();
if elapsed > idle_timeout {
self.close_idle_connection(elapsed);
return Ok(());
}
let remaining = idle_timeout.saturating_sub(elapsed);
let idle_deadline = crossbeam_channel::after(remaining);
crossbeam_channel::select! {
recv(self.chan.receiver()) -> msg => {
let msg = msg.chain_err(|| "channel closed")?;
trace!("RPC {:?}", msg);
match msg {
Message::Request(line) => {
self.last_request_at = Instant::now();
let result = self.handle_line(&line);
self.send_values(&[result])?
}
@ -572,6 +615,11 @@ impl Connection {
self.chan.close();
return Ok(());
}
recv(idle_deadline) -> _ => {
let idle_for = self.last_request_at.elapsed();
self.close_idle_connection(idle_for);
return Ok(());
}
}
}
}
@ -623,15 +671,25 @@ impl Connection {
fn handle_requests(
mut reader: BufReader<ConnectionStream>,
tx: crossbeam_channel::Sender<Message>,
max_line_size: usize,
) -> Result<()> {
loop {
let mut line = Vec::<u8>::new();
reader
// Read up to max_line_size + 1 bytes to detect oversized lines
let mut limited = (&mut reader).take((max_line_size as u64).saturating_add(1));
limited
.read_until(b'\n', &mut line)
.chain_err(|| "failed to read a request")?;
if line.is_empty() {
tx.send(Message::Done).chain_err(|| "channel closed")?;
return Ok(());
} else if line.len() > max_line_size {
let _ = tx.send(Message::Done);
bail!(
"request line too large ({} bytes, max is {})",
line.len(),
max_line_size
)
} else {
if line.starts_with(&[22, 3, 1]) {
// (very) naive SSL handshake detection
@ -671,7 +729,10 @@ impl Connection {
let _ = reply_killer.send(());
});
let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
let max_line_size = self.max_line_size;
let child = spawn_thread("reader", move || {
Connection::handle_requests(reader, tx, max_line_size)
});
if let Err(e) = self.handle_replies(reply_receiver) {
error!(
"[{}] connection handling failed: {}",
@ -855,6 +916,10 @@ impl RPC {
});
let txs_limit = config.electrum_txs_limit;
let max_line_size = config.electrum_max_line_size;
let max_subscriptions = config.electrum_max_subscriptions;
let max_clients = config.electrum_max_clients;
let idle_timeout = config.electrum_idle_timeout;
RPC {
notification: notification.sender(),
@ -872,10 +937,33 @@ impl RPC {
acceptor_shutdown_sender,
);
let mut threads = HashMap::new();
let mut threads: HashMap<thread::ThreadId, (thread::JoinHandle<()>, Sender<()>)> =
HashMap::new();
let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded();
while let Some(stream) = acceptor.receiver().recv().unwrap() {
// Clean up finished threads before checking connection limit
while let Ok(id) = garbage_receiver.try_recv() {
if let Some((thread, killer)) = threads.remove(&id) {
let _ = killer.send(());
if let Err(error) = thread.join() {
error!("failed to join {:?}: {:?}", id, error);
}
}
}
// Enforce maximum connection limit
if threads.len() >= max_clients {
warn!(
"[{}] rejecting connection: max clients reached ({}/{})",
stream.addr_string(),
threads.len(),
max_clients
);
let _ = stream.shutdown(Shutdown::Both);
continue;
}
let addr = stream.addr_string();
// explicitely scope the shadowed variables for the new thread
let query = Arc::clone(&query);
@ -898,6 +986,9 @@ impl RPC {
stream,
stats,
txs_limit,
max_line_size,
max_subscriptions,
idle_timeout,
peace_receiver,
#[cfg(feature = "electrum-discovery")]
discovery,
@ -911,15 +1002,6 @@ impl RPC {
trace!("[{}] spawned {:?}", addr, spawned.thread().id());
threads.insert(spawned.thread().id(), (spawned, killer));
while let Ok(id) = garbage_receiver.try_recv() {
if let Some((thread, killer)) = threads.remove(&id) {
trace!("[{}] joining {:?}", addr, id);
let _ = killer.send(());
if let Err(error) = thread.join() {
error!("failed to join {:?}: {:?}", id, error);
}
}
}
}
// Drop these
drop(acceptor);

View File

@ -583,6 +583,19 @@ fn find_txid(
}
}
#[inline]
fn confirmed_after_txid<'a>(
after_txid_location: &TxidLocation,
after_txid: Option<&'a Txid>,
) -> Option<&'a Txid> {
match after_txid_location {
// A mempool cursor never exists in chain history, so always
// start from the newest confirmed tx when crossing the boundary.
TxidLocation::Mempool | TxidLocation::None => None,
TxidLocation::Chain(_) => after_txid,
}
}
/// Prepare transactions to be serialized in a JSON response
///
/// Any transactions with missing prevouts will be filtered out of the response, rather than returned with incorrect data.
@ -960,13 +973,8 @@ fn handle_request(
};
if txs.len() < max_txs {
let after_txid_ref = if !txs.is_empty() {
// If there are any txs, we know mempool found the
// after_txid IF it exists... so always return None.
None
} else {
after_txid.as_ref()
};
let after_txid_ref =
confirmed_after_txid(&after_txid_location, after_txid.as_ref());
let mut confirmed_txs = query
.chain()
.history(
@ -1067,13 +1075,8 @@ fn handle_request(
};
if txs.len() < max_txs {
let after_txid_ref = if !txs.is_empty() {
// If there are any txs, we know mempool found the
// after_txid IF it exists... so always return None.
None
} else {
after_txid.as_ref()
};
let after_txid_ref =
confirmed_after_txid(&after_txid_location, after_txid.as_ref());
let mut confirmed_txs = query
.chain()
.history_group(
@ -2148,6 +2151,8 @@ impl From<address::AddressError> for HttpError {
#[cfg(test)]
mod tests {
use super::{confirmed_after_txid, TxidLocation};
use crate::chain::Txid;
use crate::rest::HttpError;
use serde_json::Value;
use std::collections::HashMap;
@ -2214,6 +2219,37 @@ mod tests {
assert!(err.is_err());
}
#[test]
fn test_confirmed_after_txid_uses_chain_cursor_only() {
let txid: Txid = "0000000000000000000000000000000000000000000000000000000000000001"
.parse()
.unwrap();
assert_eq!(
confirmed_after_txid(&TxidLocation::Mempool, Some(&txid)),
None
);
assert_eq!(confirmed_after_txid(&TxidLocation::None, Some(&txid)), None);
assert_eq!(
confirmed_after_txid(&TxidLocation::Chain(123), Some(&txid)),
Some(&txid)
);
}
#[test]
fn test_confirmed_after_txid_allows_mempool_chain_boundary_progress() {
let txid: Txid = "0000000000000000000000000000000000000000000000000000000000000002"
.parse()
.unwrap();
// If a mempool cursor returns no newer mempool txs, confirmed history
// must start from the newest confirmed tx instead of seeking this txid.
assert_eq!(
confirmed_after_txid(&TxidLocation::Mempool, Some(&txid)),
None
);
}
#[test]
fn test_difficulty_new() {
use super::difficulty_new;

40
start
View File

@ -42,7 +42,7 @@ esac
# which network?
case "${1}" in
mainnet)
THREADS=$((NPROC / 4))
THREADS=$((NPROC / 8))
CRONJOB_TIMING="20 4 * * *"
;;
testnet)
@ -153,6 +153,9 @@ do
# prepare run-time variables
UTXOS_LIMIT=500
ELECTRUM_TXS_LIMIT=500
ELECTRUM_MAX_LINE_SIZE=1048576 # 1 MiB
ELECTRUM_MAX_SUBSCRIPTIONS=100
ELECTRUM_MAX_CLIENTS=10
MAIN_LOOP_DELAY=500
DAEMON_CONF="${HOME}/${DAEMON}.conf"
HTTP_SOCKET_FILE="${HOME}/socket/esplora-${DAEMON}-${NETWORK}"
@ -167,43 +170,73 @@ do
if [ "${NODENAME}" = "node201" ];then
UTXOS_LIMIT=9000
ELECTRUM_TXS_LIMIT=9000
ELECTRUM_MAX_LINE_SIZE=16777216 # 16 MiB
ELECTRUM_MAX_SUBSCRIPTIONS=100000
ELECTRUM_MAX_CLIENTS=10000
MAIN_LOOP_DELAY=14000
fi
if [ "${NODENAME}" = "node204" ] && [ "${LOCATION}" = "sg1" ];then
UTXOS_LIMIT=9000
ELECTRUM_TXS_LIMIT=9000
ELECTRUM_MAX_LINE_SIZE=16777216 # 16 MiB
ELECTRUM_MAX_SUBSCRIPTIONS=100000
ELECTRUM_MAX_CLIENTS=10000
fi
if [ "${NODENAME}" = "node204" ] && [ "${LOCATION}" = "hnl" ];then
UTXOS_LIMIT=9000
ELECTRUM_TXS_LIMIT=9000
ELECTRUM_MAX_LINE_SIZE=16777216 # 16 MiB
ELECTRUM_MAX_SUBSCRIPTIONS=100000
ELECTRUM_MAX_CLIENTS=10000
fi
if [ "${NODENAME}" = "node206" ] && [ "${LOCATION}" = "tk7" ];then
UTXOS_LIMIT=9000
ELECTRUM_TXS_LIMIT=9000
ELECTRUM_MAX_LINE_SIZE=16777216 # 16 MiB
ELECTRUM_MAX_SUBSCRIPTIONS=100000
ELECTRUM_MAX_CLIENTS=10000
fi
if [ "${NODENAME}" = "node211" ];then
UTXOS_LIMIT=9000
ELECTRUM_TXS_LIMIT=9000
ELECTRUM_MAX_LINE_SIZE=16777216 # 16 MiB
ELECTRUM_MAX_SUBSCRIPTIONS=100000
ELECTRUM_MAX_CLIENTS=10000
fi
if [ "${NODENAME}" = "node212" ];then
UTXOS_LIMIT=9000
ELECTRUM_TXS_LIMIT=9000
ELECTRUM_MAX_LINE_SIZE=16777216 # 16 MiB
ELECTRUM_MAX_SUBSCRIPTIONS=100000
ELECTRUM_MAX_CLIENTS=10000
fi
if [ "${NODENAME}" = "node213" ];then
UTXOS_LIMIT=9000
ELECTRUM_TXS_LIMIT=9000
ELECTRUM_MAX_LINE_SIZE=16777216 # 16 MiB
ELECTRUM_MAX_SUBSCRIPTIONS=100000
ELECTRUM_MAX_CLIENTS=10000
fi
if [ "${NODENAME}" = "node214" ];then
UTXOS_LIMIT=9000
ELECTRUM_TXS_LIMIT=9000
ELECTRUM_MAX_LINE_SIZE=16777216 # 16 MiB
ELECTRUM_MAX_SUBSCRIPTIONS=100000
ELECTRUM_MAX_CLIENTS=10000
fi
if [ "${NETWORK}" = "testnet4" ];then
UTXOS_LIMIT=9000
ELECTRUM_TXS_LIMIT=9000
ELECTRUM_MAX_LINE_SIZE=16777216 # 16 MiB
ELECTRUM_MAX_SUBSCRIPTIONS=100000
ELECTRUM_MAX_CLIENTS=10000
fi
if [ "${LOCATION}" = "fmt" ];then
UTXOS_LIMIT=9000
ELECTRUM_TXS_LIMIT=9000
ELECTRUM_MAX_LINE_SIZE=16777216 # 16 MiB
ELECTRUM_MAX_SUBSCRIPTIONS=100000
ELECTRUM_MAX_CLIENTS=10000
fi
if [ ! -e "${POPULAR_SCRIPTS_FILE}" ];then
@ -211,7 +244,7 @@ do
fi
# Run the electrs process (Note: db-dir is used in both commands)
cargo run \
nice cargo run \
--release \
--bin electrs \
--features "${FEATURES}" \
@ -229,6 +262,9 @@ do
--address-search \
--utxos-limit "${UTXOS_LIMIT}" \
--electrum-txs-limit "${ELECTRUM_TXS_LIMIT}" \
--electrum-max-line-size "${ELECTRUM_MAX_LINE_SIZE}" \
--electrum-max-subscriptions "${ELECTRUM_MAX_SUBSCRIPTIONS}" \
--electrum-max-clients "${ELECTRUM_MAX_CLIENTS}" \
-vv
sleep 1
done