Compare commits
3 Commits
mempool
...
junderw/ca
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
af82c5e2d7 | ||
|
|
3bd068ae51 | ||
|
|
1e09101c5e |
19
AGENTS.md
19
AGENTS.md
@ -1,19 +0,0 @@
|
||||
# electrs
|
||||
|
||||
## Rules
|
||||
|
||||
1. You are an expert Rust developer.
|
||||
2. You are an expert Bitcoin developer.
|
||||
3. If you are unsure of a change, ask the developer to make a choice proactively.
|
||||
|
||||
## Before testing
|
||||
|
||||
- Run cargo fmt (from root)
|
||||
- command: `cargo fmt`
|
||||
|
||||
## Testing
|
||||
|
||||
- Run the checks script
|
||||
- `./scripts/checks.sh`
|
||||
- Run with tests only when a test is added or changed
|
||||
- `INCLUDE_TESTS=1 ./scripts/checks.sh`
|
||||
10
Cargo.lock
generated
10
Cargo.lock
generated
@ -913,7 +913,6 @@ dependencies = [
|
||||
"log",
|
||||
"num_cpus",
|
||||
"page_size",
|
||||
"ppp",
|
||||
"prometheus",
|
||||
"rayon",
|
||||
"rocksdb",
|
||||
@ -1100,15 +1099,6 @@ version = "0.3.27"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
|
||||
|
||||
[[package]]
|
||||
name = "ppp"
|
||||
version = "2.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a7a2049cd2570bd67bf0228e86bf850f8ceb5190a345c471d03a909da6049e0"
|
||||
dependencies = [
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.21"
|
||||
|
||||
@ -45,7 +45,6 @@ socket2 = { version = "0.4", features = ["all"] }
|
||||
num_cpus = "1.12.0"
|
||||
page_size = "0.4.2"
|
||||
prometheus = "0.13"
|
||||
ppp = "2.3.0"
|
||||
rayon = "1.5.0"
|
||||
rocksdb = "0.24.0"
|
||||
serde = "1.0.118"
|
||||
|
||||
@ -57,8 +57,6 @@ TESTNAME="Running cargo clippy check electrum-discovery + liquid"
|
||||
echo "$TESTNAME"
|
||||
cargo clippy $@ -q -F electrum-discovery,liquid
|
||||
|
||||
if [ $INCLUDE_TESTS ]; then
|
||||
TESTNAME="Running cargo test with all features"
|
||||
echo "$TESTNAME"
|
||||
cargo test $@ -q --lib --all-features
|
||||
fi
|
||||
TESTNAME="Running cargo test with all features"
|
||||
echo "$TESTNAME"
|
||||
cargo test $@ -q --lib --all-features
|
||||
|
||||
@ -69,15 +69,14 @@ pub struct Config {
|
||||
pub electrum_max_subscriptions: usize,
|
||||
pub electrum_max_clients: usize,
|
||||
pub electrum_idle_timeout: u64,
|
||||
pub electrum_haproxy_depth: usize,
|
||||
pub electrum_connections_per_client: usize,
|
||||
pub electrum_public_hosts: Option<crate::electrum::ServerHosts>,
|
||||
|
||||
#[cfg(feature = "liquid")]
|
||||
pub parent_network: BNetwork,
|
||||
#[cfg(feature = "liquid")]
|
||||
pub asset_db_path: Option<PathBuf>,
|
||||
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
pub electrum_public_hosts: Option<crate::electrum::ServerHosts>,
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
pub electrum_announce: bool,
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
@ -303,16 +302,6 @@ impl Config {
|
||||
.long("electrum-idle-timeout")
|
||||
.help("Maximum idle time in seconds since the last client request before disconnecting the Electrum connection.")
|
||||
.default_value("600")
|
||||
).arg(
|
||||
Arg::with_name("electrum_haproxy_depth")
|
||||
.long("electrum-haproxy-depth")
|
||||
.help("Which HAProxy PROXY-protocol header layer identifies the real client IP. 0 disables PROXY-protocol detection; 1 uses the first (outermost) address, 2 the second, and so on. If the requested layer or any PROXY header is absent, no client IP is associated with the connection.")
|
||||
.default_value("0")
|
||||
).arg(
|
||||
Arg::with_name("electrum_connections_per_client")
|
||||
.long("electrum-connections-per-client")
|
||||
.help("Maximum number of concurrent Electrum connections allowed per client (keyed by the HAProxy-reported address when available, otherwise the peer IP). 0 disables the per-client limit.")
|
||||
.default_value("10")
|
||||
);
|
||||
|
||||
#[cfg(unix)]
|
||||
@ -532,8 +521,6 @@ impl Config {
|
||||
let electrum_public_hosts = m
|
||||
.value_of("electrum_public_hosts")
|
||||
.map(|s| serde_json::from_str(s).expect("invalid --electrum-public-hosts"));
|
||||
#[cfg(not(feature = "electrum-discovery"))]
|
||||
let electrum_public_hosts: Option<crate::electrum::ServerHosts> = None;
|
||||
|
||||
let mut log = stderrlog::new();
|
||||
log.verbosity(m.occurrences_of("verbosity") as usize);
|
||||
@ -588,12 +575,6 @@ impl Config {
|
||||
electrum_max_subscriptions: value_t_or_exit!(m, "electrum_max_subscriptions", usize),
|
||||
electrum_max_clients: value_t_or_exit!(m, "electrum_max_clients", usize),
|
||||
electrum_idle_timeout: value_t_or_exit!(m, "electrum_idle_timeout", u64),
|
||||
electrum_haproxy_depth: value_t_or_exit!(m, "electrum_haproxy_depth", usize),
|
||||
electrum_connections_per_client: value_t_or_exit!(
|
||||
m,
|
||||
"electrum_connections_per_client",
|
||||
usize
|
||||
),
|
||||
jsonrpc_import: m.is_present("jsonrpc_import"),
|
||||
light_mode: m.is_present("light_mode"),
|
||||
main_loop_delay: value_t_or_exit!(m, "main_loop_delay", u64),
|
||||
@ -623,6 +604,7 @@ impl Config {
|
||||
#[cfg(feature = "liquid")]
|
||||
asset_db_path,
|
||||
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
electrum_public_hosts,
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
electrum_announce: m.is_present("electrum_announce"),
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryInto;
|
||||
use std::fs;
|
||||
use std::io::{BufRead, BufReader, Cursor, Read, Write};
|
||||
use std::io::{BufRead, BufReader, Read, Write};
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
use std::net::IpAddr;
|
||||
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
|
||||
use std::os::unix::fs::FileTypeExt;
|
||||
@ -16,7 +17,6 @@ use std::time::{Duration, Instant};
|
||||
use bitcoin::hashes::sha256d::Hash as Sha256dHash;
|
||||
use error_chain::ChainedError;
|
||||
use hex;
|
||||
use ppp::PartialResult;
|
||||
use serde_json::{from_str, Value};
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
@ -27,7 +27,7 @@ use elements::encode::serialize;
|
||||
|
||||
use crate::chain::Txid;
|
||||
use crate::config::{Config, VERSION_STRING};
|
||||
use crate::electrum::{get_electrum_height, ProtocolVersion, ServerFeatures};
|
||||
use crate::electrum::{get_electrum_height, ProtocolVersion};
|
||||
use crate::errors::*;
|
||||
use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
|
||||
use crate::new_index::{Query, Utxo};
|
||||
@ -41,7 +41,7 @@ const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::new(1, 4);
|
||||
const MAX_HEADERS: usize = 2016;
|
||||
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
use crate::electrum::DiscoveryManager;
|
||||
use crate::electrum::{DiscoveryManager, ServerFeatures};
|
||||
|
||||
// TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
|
||||
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
|
||||
@ -77,36 +77,6 @@ fn bool_from_value_or(val: Option<&Value>, name: &str, default: bool) -> Result<
|
||||
bool_from_value(val, name)
|
||||
}
|
||||
|
||||
/// Extracts the source socket address from a parsed PROXY protocol v1 header.
|
||||
fn proxy_v1_source(addresses: &ppp::v1::Addresses) -> Option<SocketAddr> {
|
||||
match addresses {
|
||||
ppp::v1::Addresses::Tcp4(ip) => Some(SocketAddr::new(
|
||||
IpAddr::V4(ip.source_address),
|
||||
ip.source_port,
|
||||
)),
|
||||
ppp::v1::Addresses::Tcp6(ip) => Some(SocketAddr::new(
|
||||
IpAddr::V6(ip.source_address),
|
||||
ip.source_port,
|
||||
)),
|
||||
ppp::v1::Addresses::Unknown => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Extracts the source socket address from a parsed PROXY protocol v2 header.
|
||||
fn proxy_v2_source(addresses: &ppp::v2::Addresses) -> Option<SocketAddr> {
|
||||
match addresses {
|
||||
ppp::v2::Addresses::IPv4(ip) => Some(SocketAddr::new(
|
||||
IpAddr::V4(ip.source_address),
|
||||
ip.source_port,
|
||||
)),
|
||||
ppp::v2::Addresses::IPv6(ip) => Some(SocketAddr::new(
|
||||
IpAddr::V6(ip.source_address),
|
||||
ip.source_port,
|
||||
)),
|
||||
ppp::v2::Addresses::Unspecified | ppp::v2::Addresses::Unix(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: implement caching and delta updates
|
||||
fn get_status_hash(txs: Vec<(Txid, Option<BlockId>)>, query: &Query) -> Option<FullHash> {
|
||||
if txs.is_empty() {
|
||||
@ -158,12 +128,6 @@ struct Connection {
|
||||
idle_timeout: u64,
|
||||
last_request_at: Instant,
|
||||
die_please: Option<Receiver<()>>,
|
||||
server_features: Arc<ServerFeatures>,
|
||||
haproxy_depth: usize,
|
||||
proxy_client: Option<SocketAddr>,
|
||||
connections_per_client: usize,
|
||||
client_counts: Arc<Mutex<HashMap<IpAddr, usize>>>,
|
||||
registered_ip: Option<IpAddr>,
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
discovery: Option<Arc<DiscoveryManager>>,
|
||||
}
|
||||
@ -179,10 +143,6 @@ impl Connection {
|
||||
max_subscriptions: usize,
|
||||
idle_timeout: u64,
|
||||
die_please: Receiver<()>,
|
||||
server_features: Arc<ServerFeatures>,
|
||||
haproxy_depth: usize,
|
||||
connections_per_client: usize,
|
||||
client_counts: Arc<Mutex<HashMap<IpAddr, usize>>>,
|
||||
#[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>,
|
||||
) -> Connection {
|
||||
Connection {
|
||||
@ -198,12 +158,6 @@ impl Connection {
|
||||
idle_timeout,
|
||||
last_request_at: Instant::now(),
|
||||
die_please: Some(die_please),
|
||||
server_features,
|
||||
haproxy_depth,
|
||||
proxy_client: None,
|
||||
connections_per_client,
|
||||
client_counts,
|
||||
registered_ip: None,
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
discovery,
|
||||
}
|
||||
@ -225,8 +179,13 @@ impl Connection {
|
||||
Ok(json!(self.query.config().electrum_banner.clone()))
|
||||
}
|
||||
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
fn server_features(&self) -> Result<Value> {
|
||||
Ok(json!(self.server_features.as_ref()))
|
||||
let discovery = self
|
||||
.discovery
|
||||
.as_ref()
|
||||
.chain_err(|| "discovery is disabled")?;
|
||||
Ok(json!(discovery.our_features()))
|
||||
}
|
||||
|
||||
fn server_donation_address(&self) -> Result<Value> {
|
||||
@ -531,8 +490,9 @@ impl Connection {
|
||||
"server.peers.subscribe" => self.server_peers_subscribe(),
|
||||
"server.ping" => Ok(Value::Null),
|
||||
"server.version" => self.server_version(),
|
||||
"server.features" => self.server_features(),
|
||||
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
"server.features" => self.server_features(),
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
"server.add_peer" => self.server_add_peer(params),
|
||||
|
||||
@ -611,78 +571,13 @@ impl Connection {
|
||||
fn close_idle_connection(&mut self, idle_for: Duration) {
|
||||
info!(
|
||||
"[{}] closing idle connection after {} seconds without requests (timeout: {} seconds)",
|
||||
self.client_string(),
|
||||
self.stream.addr_string(),
|
||||
idle_for.as_secs(),
|
||||
self.idle_timeout,
|
||||
);
|
||||
self.chan.close();
|
||||
}
|
||||
|
||||
/// A human-readable identifier for the connected client, preferring the
|
||||
/// HAProxy-reported address (when present) over the direct peer address.
|
||||
fn client_string(&self) -> String {
|
||||
match self.proxy_client {
|
||||
Some(addr) => format!("{} via {}", addr, self.stream.addr_string()),
|
||||
None => self.stream.addr_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolves the PROXY-protocol parse result into the client address at the
|
||||
/// configured `electrum-haproxy-depth` layer. A depth of 0, a missing PROXY
|
||||
/// header, or a non-existent layer all leave the client unidentified.
|
||||
fn set_proxy_client(&mut self, addresses: Option<Vec<SocketAddr>>) {
|
||||
self.proxy_client = match (self.haproxy_depth, addresses) {
|
||||
(0, _) | (_, None) => None,
|
||||
(depth, Some(addrs)) => addrs.get(depth - 1).copied(),
|
||||
};
|
||||
}
|
||||
|
||||
/// Registers this connection against its client key (the HAProxy-reported IP
|
||||
/// when available, otherwise the direct peer IP) and enforces the
|
||||
/// `electrum-connections-per-client` limit. Returns an error if the limit has
|
||||
/// already been reached, in which case the connection must be closed.
|
||||
fn register_client(&mut self) -> Result<()> {
|
||||
if self.connections_per_client == 0 {
|
||||
// Per-client limit disabled.
|
||||
return Ok(());
|
||||
}
|
||||
let key = match self
|
||||
.proxy_client
|
||||
.map(|addr| addr.ip())
|
||||
.or_else(|| self.stream.direct_ip())
|
||||
{
|
||||
Some(key) => key,
|
||||
// No usable client key (e.g. a unix socket with no PROXY header).
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
let mut counts = self.client_counts.lock().unwrap();
|
||||
let count = counts.entry(key).or_insert(0);
|
||||
if *count >= self.connections_per_client {
|
||||
bail!(
|
||||
"too many connections from client {} ({} max per client)",
|
||||
key,
|
||||
self.connections_per_client
|
||||
);
|
||||
}
|
||||
*count += 1;
|
||||
self.registered_ip = Some(key);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Releases this connection's slot in the per-client connection counter.
|
||||
fn unregister_client(&mut self) {
|
||||
if let Some(key) = self.registered_ip.take() {
|
||||
let mut counts = self.client_counts.lock().unwrap();
|
||||
if let Some(count) = counts.get_mut(&key) {
|
||||
*count -= 1;
|
||||
if *count == 0 {
|
||||
counts.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_replies(&mut self, shutdown: crossbeam_channel::Receiver<()>) -> Result<()> {
|
||||
let idle_timeout = Duration::from_secs(self.idle_timeout);
|
||||
loop {
|
||||
@ -714,14 +609,6 @@ impl Connection {
|
||||
self.chan.close();
|
||||
return Ok(());
|
||||
}
|
||||
Message::Proxy(addresses) => {
|
||||
self.set_proxy_client(addresses);
|
||||
if let Err(e) = self.register_client() {
|
||||
info!("[{}] {}", self.client_string(), e);
|
||||
self.chan.close();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
recv(shutdown) -> _ => {
|
||||
@ -781,117 +668,11 @@ impl Connection {
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads and parses any PROXY-protocol (HAProxy) headers found at the very
|
||||
/// start of the connection. Returns the source address reported by each
|
||||
/// proxy layer (outermost first), or `None` if no PROXY header was present,
|
||||
/// together with any bytes that were read past the header(s) and belong to
|
||||
/// the Electrum request stream.
|
||||
fn read_proxy_headers(
|
||||
stream: &mut ConnectionStream,
|
||||
) -> Result<(Option<Vec<SocketAddr>>, Vec<u8>)> {
|
||||
// Upper bound on how much we are willing to buffer while looking for
|
||||
// PROXY headers, to avoid unbounded memory use from a slow/malicious peer.
|
||||
const MAX_PROXY_HEADER_SIZE: usize = 4096;
|
||||
|
||||
enum Step {
|
||||
Parsed(usize, Option<SocketAddr>),
|
||||
NeedMore,
|
||||
Done,
|
||||
}
|
||||
|
||||
let mut buf: Vec<u8> = Vec::with_capacity(256);
|
||||
let mut addrs: Vec<SocketAddr> = Vec::new();
|
||||
let mut saw_proxy = false;
|
||||
let mut chunk = [0u8; 256];
|
||||
|
||||
loop {
|
||||
// Parse as many complete, stacked PROXY headers as the buffer allows.
|
||||
let need_more = loop {
|
||||
if buf.is_empty() {
|
||||
break true;
|
||||
}
|
||||
let step = match ppp::HeaderResult::parse(&buf) {
|
||||
ppp::HeaderResult::V2(Ok(header)) => {
|
||||
Step::Parsed(header.len(), proxy_v2_source(&header.addresses))
|
||||
}
|
||||
ppp::HeaderResult::V1(Ok(header)) => {
|
||||
Step::Parsed(header.header.len(), proxy_v1_source(&header.addresses))
|
||||
}
|
||||
other => {
|
||||
if other.is_incomplete() {
|
||||
Step::NeedMore
|
||||
} else {
|
||||
Step::Done
|
||||
}
|
||||
}
|
||||
};
|
||||
match step {
|
||||
Step::Parsed(consumed, src) => {
|
||||
saw_proxy = true;
|
||||
if let Some(src) = src {
|
||||
addrs.push(src);
|
||||
}
|
||||
if consumed == 0 || consumed > buf.len() {
|
||||
// Defensive: never spin forever on a degenerate parse.
|
||||
break false;
|
||||
}
|
||||
buf.drain(..consumed);
|
||||
}
|
||||
Step::NeedMore => break true,
|
||||
Step::Done => break false,
|
||||
}
|
||||
};
|
||||
|
||||
if !need_more {
|
||||
break;
|
||||
}
|
||||
if buf.len() > MAX_PROXY_HEADER_SIZE {
|
||||
bail!(
|
||||
"PROXY protocol header too large (exceeds {} bytes)",
|
||||
MAX_PROXY_HEADER_SIZE
|
||||
);
|
||||
}
|
||||
let n = stream
|
||||
.read(&mut chunk)
|
||||
.chain_err(|| "failed to read PROXY protocol header")?;
|
||||
if n == 0 {
|
||||
// EOF before another complete header; stop with what we have.
|
||||
break;
|
||||
}
|
||||
buf.extend_from_slice(&chunk[..n]);
|
||||
}
|
||||
|
||||
let result = if saw_proxy { Some(addrs) } else { None };
|
||||
Ok((result, buf))
|
||||
}
|
||||
|
||||
fn handle_requests(
|
||||
stream: ConnectionStream,
|
||||
mut reader: BufReader<ConnectionStream>,
|
||||
tx: crossbeam_channel::Sender<Message>,
|
||||
max_line_size: usize,
|
||||
) -> Result<()> {
|
||||
let mut stream = stream;
|
||||
|
||||
// Consume any PROXY-protocol (HAProxy) headers at the very start of the
|
||||
// connection before treating the stream as Electrum requests. We always
|
||||
// consume them — even when HAProxy support is disabled
|
||||
// (`electrum-haproxy-depth = 0`) — so that PROXY headers sent by an
|
||||
// accidentally-misconfigured upstream are stripped instead of corrupting
|
||||
// the Electrum request parser.
|
||||
//
|
||||
// Crucially, `read_proxy_headers` only ever buffers bytes it has already
|
||||
// read from the socket: when no PROXY header is present it returns those
|
||||
// bytes as `leftover` so the start of the first Electrum request is
|
||||
// preserved rather than discarded.
|
||||
//
|
||||
// The parsed addresses are forwarded over the channel; whether they are
|
||||
// actually used to identify the client is decided later based on the
|
||||
// configured `electrum-haproxy-depth` (a depth of 0 ignores them).
|
||||
let (proxy_addrs, leftover) = Connection::read_proxy_headers(&mut stream)?;
|
||||
tx.send(Message::Proxy(proxy_addrs))
|
||||
.chain_err(|| "channel closed")?;
|
||||
|
||||
let mut reader = BufReader::new(Cursor::new(leftover).chain(stream));
|
||||
loop {
|
||||
let mut line = Vec::<u8>::new();
|
||||
// Read up to max_line_size + 1 bytes to detect oversized lines
|
||||
@ -930,7 +711,7 @@ impl Connection {
|
||||
|
||||
pub fn run(mut self) {
|
||||
self.stats.clients.inc();
|
||||
let stream = self.stream.try_clone().expect("failed to clone TcpStream");
|
||||
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
|
||||
let tx = self.chan.sender();
|
||||
|
||||
let die_please = self.die_please.take().unwrap();
|
||||
@ -950,12 +731,12 @@ impl Connection {
|
||||
|
||||
let max_line_size = self.max_line_size;
|
||||
let child = spawn_thread("reader", move || {
|
||||
Connection::handle_requests(stream, tx, max_line_size)
|
||||
Connection::handle_requests(reader, tx, max_line_size)
|
||||
});
|
||||
if let Err(e) = self.handle_replies(reply_receiver) {
|
||||
error!(
|
||||
"[{}] connection handling failed: {}",
|
||||
self.client_string(),
|
||||
self.stream.addr_string(),
|
||||
e.display_chain().to_string()
|
||||
);
|
||||
}
|
||||
@ -963,9 +744,8 @@ impl Connection {
|
||||
self.stats
|
||||
.subscriptions
|
||||
.sub(self.status_hashes.len() as i64);
|
||||
self.unregister_client();
|
||||
|
||||
let addr = self.client_string();
|
||||
let addr = self.stream.addr_string();
|
||||
debug!("[{}] shutting down connection", addr);
|
||||
// Drop the Arc so that the stream properly closes.
|
||||
drop(arc_stream);
|
||||
@ -1023,11 +803,6 @@ pub enum Message {
|
||||
Request(String),
|
||||
PeriodicUpdate,
|
||||
Done,
|
||||
/// The result of parsing zero or more PROXY-protocol (HAProxy) headers at
|
||||
/// the start of the connection. `None` means no PROXY header was present;
|
||||
/// `Some(addrs)` holds the source address reported by each proxy layer,
|
||||
/// outermost first.
|
||||
Proxy(Option<Vec<SocketAddr>>),
|
||||
}
|
||||
|
||||
pub enum Notification {
|
||||
@ -1116,10 +891,11 @@ impl RPC {
|
||||
|
||||
let notification = Channel::unbounded();
|
||||
|
||||
let server_features = {
|
||||
// Discovery is enabled when electrum-public-hosts is set
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
let discovery = config.electrum_public_hosts.clone().map(|hosts| {
|
||||
use crate::chain::genesis_hash;
|
||||
let hosts = config.electrum_public_hosts.clone().unwrap_or_default();
|
||||
Arc::new(ServerFeatures {
|
||||
let features = ServerFeatures {
|
||||
hosts,
|
||||
server_version: VERSION_STRING.clone(),
|
||||
genesis_hash: genesis_hash(config.network_type),
|
||||
@ -1127,15 +903,10 @@ impl RPC {
|
||||
protocol_max: PROTOCOL_VERSION,
|
||||
hash_function: "sha256".into(),
|
||||
pruning: None,
|
||||
})
|
||||
};
|
||||
|
||||
// Discovery is enabled when electrum-public-hosts is set
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
let discovery = config.electrum_public_hosts.as_ref().map(|_hosts| {
|
||||
};
|
||||
let discovery = Arc::new(DiscoveryManager::new(
|
||||
config.network_type,
|
||||
server_features.as_ref().clone(),
|
||||
features,
|
||||
PROTOCOL_VERSION,
|
||||
config.electrum_announce,
|
||||
config.tor_proxy,
|
||||
@ -1149,18 +920,12 @@ impl RPC {
|
||||
let max_subscriptions = config.electrum_max_subscriptions;
|
||||
let max_clients = config.electrum_max_clients;
|
||||
let idle_timeout = config.electrum_idle_timeout;
|
||||
let haproxy_depth = config.electrum_haproxy_depth;
|
||||
let connections_per_client = config.electrum_connections_per_client;
|
||||
|
||||
RPC {
|
||||
notification: notification.sender(),
|
||||
server: Some(spawn_thread("rpc", move || {
|
||||
let senders =
|
||||
Arc::new(Mutex::new(Vec::<crossbeam_channel::Sender<Message>>::new()));
|
||||
// Tracks the number of live connections per client (keyed by the
|
||||
// HAProxy-reported address when available, otherwise the peer IP).
|
||||
let client_counts: Arc<Mutex<HashMap<IpAddr, usize>>> =
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
let acceptor_shutdown = Channel::unbounded();
|
||||
let acceptor_shutdown_sender = acceptor_shutdown.sender();
|
||||
@ -1204,7 +969,6 @@ impl RPC {
|
||||
let query = Arc::clone(&query);
|
||||
let senders = Arc::clone(&senders);
|
||||
let stats = Arc::clone(&stats);
|
||||
let client_counts = Arc::clone(&client_counts);
|
||||
let garbage_sender = garbage_sender.clone();
|
||||
|
||||
// Kill the peers properly
|
||||
@ -1213,7 +977,6 @@ impl RPC {
|
||||
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
let discovery = discovery.clone();
|
||||
let server_features = Arc::clone(&server_features);
|
||||
|
||||
let spawned = spawn_thread("peer", move || {
|
||||
let addr = stream.addr_string();
|
||||
@ -1227,10 +990,6 @@ impl RPC {
|
||||
max_subscriptions,
|
||||
idle_timeout,
|
||||
peace_receiver,
|
||||
server_features,
|
||||
haproxy_depth,
|
||||
connections_per_client,
|
||||
client_counts,
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
discovery,
|
||||
);
|
||||
@ -1404,15 +1163,6 @@ impl ConnectionStream {
|
||||
}
|
||||
}
|
||||
|
||||
/// The direct peer IP address, if this is a TCP connection. Unix-socket
|
||||
/// connections have no IP and return `None`.
|
||||
fn direct_ip(&self) -> Option<IpAddr> {
|
||||
match self {
|
||||
ConnectionStream::Tcp(_, a) => Some(a.ip()),
|
||||
ConnectionStream::Unix(..) => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_clone(&self) -> std::io::Result<Self> {
|
||||
Ok(match self {
|
||||
ConnectionStream::Tcp(s, a) => ConnectionStream::Tcp(s.try_clone()?, *a),
|
||||
|
||||
@ -9,7 +9,7 @@ mod registry;
|
||||
|
||||
use asset::get_issuance_entropy;
|
||||
pub use asset::{lookup_asset, LiquidAsset};
|
||||
pub use registry::{AssetMeta, AssetRegistry, AssetSorting};
|
||||
pub use registry::{AssetRegistry, AssetSorting};
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
pub struct IssuanceValue {
|
||||
|
||||
@ -14,8 +14,6 @@ use crate::errors::*;
|
||||
// (in number of hex characters, not bytes)
|
||||
|
||||
const DIR_PARTITION_LEN: usize = 2;
|
||||
const SEARCH_SORT_CANDIDATE_LIMIT: usize = 2000;
|
||||
|
||||
pub struct AssetRegistry {
|
||||
directory: path::PathBuf,
|
||||
assets_cache: HashMap<AssetId, (SystemTime, AssetMeta)>,
|
||||
@ -55,39 +53,6 @@ impl AssetRegistry {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn search(&self, query: &str, limit: usize) -> Vec<AssetEntry<'_>> {
|
||||
let query = query.trim();
|
||||
if query.is_empty() || limit == 0 {
|
||||
return vec![];
|
||||
}
|
||||
|
||||
let (mut results, candidates) = search_by(
|
||||
self.assets_cache
|
||||
.iter()
|
||||
.map(|(asset_id, (_, metadata))| (asset_id, metadata)),
|
||||
query,
|
||||
limit,
|
||||
|metadata| metadata.ticker.as_deref(),
|
||||
);
|
||||
|
||||
if results.len() < limit {
|
||||
let (name_matches, candidates) =
|
||||
search_by(candidates, query, limit - results.len(), |metadata| {
|
||||
Some(&metadata.name)
|
||||
});
|
||||
results.extend(name_matches);
|
||||
|
||||
if results.len() < limit {
|
||||
let (domain_matches, _) =
|
||||
search_by(candidates, query, limit - results.len(), AssetMeta::domain);
|
||||
results.extend(domain_matches);
|
||||
}
|
||||
}
|
||||
|
||||
results.truncate(limit);
|
||||
results
|
||||
}
|
||||
|
||||
pub fn fs_sync(&mut self) -> Result<()> {
|
||||
for entry in fs::read_dir(&self.directory).chain_err(|| "failed reading asset dir")? {
|
||||
let entry = entry.chain_err(|| "invalid fh")?;
|
||||
@ -161,7 +126,7 @@ pub struct AssetMeta {
|
||||
}
|
||||
|
||||
impl AssetMeta {
|
||||
pub(crate) fn domain(&self) -> Option<&str> {
|
||||
fn domain(&self) -> Option<&str> {
|
||||
self.entity["domain"].as_str()
|
||||
}
|
||||
}
|
||||
@ -227,72 +192,3 @@ fn lc_cmp_opt(a: &Option<String>, b: &Option<String>) -> cmp::Ordering {
|
||||
.map(|a| a.to_lowercase())
|
||||
.cmp(&b.as_ref().map(|b| b.to_lowercase()))
|
||||
}
|
||||
|
||||
fn search_by<'a, I, F>(
|
||||
candidates: I,
|
||||
query: &str,
|
||||
limit: usize,
|
||||
field: F,
|
||||
) -> (Vec<AssetEntry<'a>>, Vec<AssetEntry<'a>>)
|
||||
where
|
||||
I: IntoIterator<Item = AssetEntry<'a>>,
|
||||
F: Fn(&AssetMeta) -> Option<&str>,
|
||||
{
|
||||
let mut matches = vec![];
|
||||
let mut remaining = vec![];
|
||||
|
||||
for (asset_id, metadata) in candidates {
|
||||
let position = field(metadata).and_then(|field| {
|
||||
// registry fields are ascii, so we don't need full unicode case-folding
|
||||
ascii_ci_find(field, query).map(|position| (position, field))
|
||||
});
|
||||
|
||||
if let Some((position, field)) = position {
|
||||
if matches.len() >= SEARCH_SORT_CANDIDATE_LIMIT {
|
||||
continue;
|
||||
}
|
||||
matches.push((position, field, asset_id, metadata));
|
||||
} else {
|
||||
remaining.push((asset_id, metadata));
|
||||
}
|
||||
}
|
||||
|
||||
matches.sort_unstable_by(|a, b| {
|
||||
a.0.cmp(&b.0)
|
||||
.then_with(|| ascii_ci_cmp(a.1, b.1))
|
||||
.then_with(|| a.2.cmp(b.2))
|
||||
});
|
||||
|
||||
(
|
||||
matches
|
||||
.into_iter()
|
||||
.take(limit)
|
||||
.map(|(_, _, asset_id, metadata)| (asset_id, metadata))
|
||||
.collect(),
|
||||
remaining,
|
||||
)
|
||||
}
|
||||
|
||||
// zero-allocation case-insensitive ASCII substring search
|
||||
// returns the byte offset of the first match
|
||||
fn ascii_ci_find(haystack: &str, needle: &str) -> Option<usize> {
|
||||
let (haystack, needle) = (haystack.as_bytes(), needle.as_bytes());
|
||||
if needle.is_empty() {
|
||||
return Some(0);
|
||||
}
|
||||
haystack
|
||||
.windows(needle.len())
|
||||
.position(|window| window.eq_ignore_ascii_case(needle))
|
||||
}
|
||||
|
||||
// zero-allocation case-insensitive ASCII string comparison
|
||||
fn ascii_ci_cmp(a: &str, b: &str) -> cmp::Ordering {
|
||||
let (a, b) = (a.as_bytes(), b.as_bytes());
|
||||
for i in 0..a.len().min(b.len()) {
|
||||
match a[i].to_ascii_lowercase().cmp(&b[i].to_ascii_lowercase()) {
|
||||
cmp::Ordering::Equal => continue,
|
||||
ord => return ord,
|
||||
}
|
||||
}
|
||||
a.len().cmp(&b.len())
|
||||
}
|
||||
|
||||
@ -14,7 +14,7 @@ use crate::util::{is_spendable, BlockId, Bytes, TransactionStatus};
|
||||
#[cfg(feature = "liquid")]
|
||||
use crate::{
|
||||
chain::{asset::AssetRegistryLock, AssetId},
|
||||
elements::{lookup_asset, AssetMeta, AssetRegistry, AssetSorting, LiquidAsset},
|
||||
elements::{lookup_asset, AssetRegistry, AssetSorting, LiquidAsset},
|
||||
};
|
||||
|
||||
const FEE_ESTIMATES_TTL: u64 = 60; // seconds
|
||||
@ -271,15 +271,6 @@ impl Query {
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(feature = "liquid")]
|
||||
pub fn lookup_registry_asset(&self, asset_id: &AssetId) -> Result<Option<AssetMeta>> {
|
||||
let asset_db = self
|
||||
.asset_db
|
||||
.as_ref()
|
||||
.chain_err(|| "asset registry unavailable")?;
|
||||
Ok(asset_db.read().unwrap().get(asset_id).cloned())
|
||||
}
|
||||
|
||||
#[cfg(feature = "liquid")]
|
||||
pub fn list_registry_assets(
|
||||
&self,
|
||||
@ -307,27 +298,4 @@ impl Query {
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
Ok((total_num, results))
|
||||
}
|
||||
|
||||
#[cfg(feature = "liquid")]
|
||||
pub fn search_registry_assets<T, F>(
|
||||
&self,
|
||||
search: &str,
|
||||
limit: usize,
|
||||
mut map: F,
|
||||
) -> Result<Vec<T>>
|
||||
where
|
||||
F: FnMut(&AssetId, &AssetMeta) -> T,
|
||||
{
|
||||
let asset_db = self
|
||||
.asset_db
|
||||
.as_ref()
|
||||
.chain_err(|| "asset registry unavailable")?;
|
||||
Ok(asset_db
|
||||
.read()
|
||||
.unwrap()
|
||||
.search(search, limit)
|
||||
.into_iter()
|
||||
.map(|(asset_id, metadata)| map(asset_id, metadata))
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
@ -997,6 +997,12 @@ impl ChainQuery {
|
||||
let mut processed_items = 0;
|
||||
let mut lastblock = None;
|
||||
|
||||
// If we need to iterate over 500 history entries to
|
||||
// get one utxo then your address is too active and should be
|
||||
// throttled.
|
||||
// TODO: Think of better way to throttle.
|
||||
let tx_history_limit = limit * 500;
|
||||
|
||||
for (history, blockid) in history_iter {
|
||||
processed_items += 1;
|
||||
lastblock = Some(blockid.hash);
|
||||
@ -1017,6 +1023,9 @@ impl ChainQuery {
|
||||
if utxos.len() > limit {
|
||||
bail!(ErrorKind::TooManyUtxos(limit))
|
||||
}
|
||||
if processed_items > tx_history_limit {
|
||||
bail!(ErrorKind::TooManyTxs(tx_history_limit))
|
||||
}
|
||||
}
|
||||
|
||||
Ok((utxos, lastblock, processed_items))
|
||||
|
||||
85
src/rest.rs
85
src/rest.rs
@ -35,7 +35,7 @@ use hyperlocal::UnixServerExt;
|
||||
use std::{cmp, fs};
|
||||
#[cfg(feature = "liquid")]
|
||||
use {
|
||||
crate::elements::{peg::PegoutValue, AssetMeta, AssetSorting, IssuanceValue},
|
||||
crate::elements::{peg::PegoutValue, AssetSorting, IssuanceValue},
|
||||
elements::{
|
||||
confidential::{Asset, Nonce, Value},
|
||||
encode, AssetId,
|
||||
@ -59,12 +59,6 @@ const MULTI_ADDRESS_LIMIT: usize = 300;
|
||||
const ASSETS_PER_PAGE: usize = 25;
|
||||
#[cfg(feature = "liquid")]
|
||||
const ASSETS_MAX_PER_PAGE: usize = 100;
|
||||
#[cfg(feature = "liquid")]
|
||||
const ASSETS_SEARCH_DEFAULT_LIMIT: usize = 15;
|
||||
#[cfg(feature = "liquid")]
|
||||
const ASSETS_SEARCH_MAX_LIMIT: usize = 100;
|
||||
#[cfg(feature = "liquid")]
|
||||
const ASSETS_SEARCH_MAX_QUERY_LEN: usize = 64;
|
||||
|
||||
const TTL_LONG: u32 = 157_784_630; // ttl for static resources (5 years)
|
||||
const TTL_SHORT: u32 = 10; // ttl for volatie resources
|
||||
@ -138,32 +132,6 @@ impl BlockValue {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "liquid")]
|
||||
#[derive(Serialize)]
|
||||
struct AssetRegistrySearchResult {
|
||||
asset_id: AssetId,
|
||||
name: String,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
ticker: Option<String>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
domain: Option<String>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "liquid")]
|
||||
impl AssetRegistrySearchResult {
|
||||
fn new(asset_id: &AssetId, meta: &AssetMeta) -> Self {
|
||||
let domain = meta.domain().map(String::from);
|
||||
Self {
|
||||
asset_id: *asset_id,
|
||||
name: meta.name.clone(),
|
||||
ticker: meta.ticker.clone(),
|
||||
domain,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculate the difficulty of a BlockHeader
|
||||
/// using Bitcoin Core code ported to Rust.
|
||||
///
|
||||
@ -1806,42 +1774,6 @@ fn handle_request(
|
||||
json_response(recent, TTL_MEMPOOL_RECENT)
|
||||
}
|
||||
|
||||
(&Method::GET, Some(&"fee-estimates"), None, None, None, None) => {
|
||||
json_response(query.estimate_fee_map(), TTL_SHORT)
|
||||
}
|
||||
|
||||
#[cfg(feature = "liquid")]
|
||||
(&Method::GET, Some(&"assets"), Some(&"registry"), Some(&"search"), None, None) => {
|
||||
let search = query_params.get("q").map(|q| q.trim()).unwrap_or("");
|
||||
let assets = if search.is_empty() {
|
||||
vec![]
|
||||
} else if search.chars().count() > ASSETS_SEARCH_MAX_QUERY_LEN {
|
||||
return Err(HttpError(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"search query too long".to_string(),
|
||||
));
|
||||
} else {
|
||||
let limit = query_params
|
||||
.get("limit")
|
||||
.and_then(|n| n.parse::<usize>().ok())
|
||||
.unwrap_or(ASSETS_SEARCH_DEFAULT_LIMIT)
|
||||
.min(ASSETS_SEARCH_MAX_LIMIT);
|
||||
|
||||
query
|
||||
.search_registry_assets(search, limit, AssetRegistrySearchResult::new)
|
||||
.map_err(|e| {
|
||||
HttpError(StatusCode::SERVICE_UNAVAILABLE, e.description().to_string())
|
||||
})?
|
||||
};
|
||||
|
||||
Ok(Response::builder()
|
||||
// Disable caching because we don't currently support caching with query string params
|
||||
.header("Cache-Control", "no-store")
|
||||
.header("Content-Type", "application/json")
|
||||
.body(Body::from(serde_json::to_string(&assets)?))
|
||||
.unwrap())
|
||||
}
|
||||
|
||||
#[cfg(feature = "liquid")]
|
||||
(&Method::GET, Some(&"assets"), Some(&"registry"), None, None, None) => {
|
||||
let start_index: usize = query_params
|
||||
@ -1868,21 +1800,6 @@ fn handle_request(
|
||||
.unwrap())
|
||||
}
|
||||
|
||||
#[cfg(feature = "liquid")]
|
||||
(&Method::GET, Some(&"assets"), Some(&"registry"), Some(asset_str), None, None) => {
|
||||
let asset_id = AssetId::from_str(asset_str)?;
|
||||
let registry_entry = query
|
||||
.lookup_registry_asset(&asset_id)
|
||||
.map_err(|e| {
|
||||
HttpError(StatusCode::SERVICE_UNAVAILABLE, e.description().to_string())
|
||||
})?
|
||||
.ok_or_else(|| {
|
||||
HttpError::not_found("Asset id not found in registry".to_string())
|
||||
})?;
|
||||
|
||||
json_response(registry_entry, TTL_SHORT)
|
||||
}
|
||||
|
||||
#[cfg(feature = "liquid")]
|
||||
(&Method::GET, Some(&"asset"), Some(asset_str), None, None, None) => {
|
||||
let asset_id = AssetId::from_str(asset_str)?;
|
||||
|
||||
6
start
6
start
@ -5,7 +5,6 @@ DAEMON=bitcoin
|
||||
NETWORK=mainnet
|
||||
FEATURES=default
|
||||
DB_FOLDER=/electrs
|
||||
ASSET_DB_ARGS=()
|
||||
NODENAME=$(hostname|cut -d . -f1)
|
||||
LOCATION=$(hostname|cut -d . -f2)
|
||||
USAGE="Usage: $0 (mainnet|testnet|signet|liquid|liquidtestnet) [popular-scripts]"
|
||||
@ -65,7 +64,6 @@ case "${1}" in
|
||||
DAEMON=elements
|
||||
NETWORK=liquid
|
||||
FEATURES=liquid
|
||||
ASSET_DB_ARGS=(--asset-db-path /elements/asset_registry_db)
|
||||
THREADS=$((NPROC / 8))
|
||||
CRONJOB_TIMING="12 4 * * *"
|
||||
;;
|
||||
@ -73,7 +71,6 @@ case "${1}" in
|
||||
DAEMON=elements
|
||||
NETWORK=liquidtestnet
|
||||
FEATURES=liquid
|
||||
ASSET_DB_ARGS=(--asset-db-path /elements/asset_registry_testnet_db)
|
||||
THREADS=$((NPROC / 8))
|
||||
CRONJOB_TIMING="17 4 * * *"
|
||||
;;
|
||||
@ -158,7 +155,7 @@ do
|
||||
ELECTRUM_TXS_LIMIT=500
|
||||
ELECTRUM_MAX_LINE_SIZE=1048576 # 1 MiB
|
||||
ELECTRUM_MAX_SUBSCRIPTIONS=100
|
||||
ELECTRUM_MAX_CLIENTS=1000
|
||||
ELECTRUM_MAX_CLIENTS=10
|
||||
MAIN_LOOP_DELAY=500
|
||||
DAEMON_CONF="${HOME}/${DAEMON}.conf"
|
||||
HTTP_SOCKET_FILE="${HOME}/socket/esplora-${DAEMON}-${NETWORK}"
|
||||
@ -255,7 +252,6 @@ do
|
||||
--network "${NETWORK}" \
|
||||
--daemon-dir "${HOME}" \
|
||||
--db-dir "${DB_FOLDER}" \
|
||||
"${ASSET_DB_ARGS[@]}" \
|
||||
--main-loop-delay "${MAIN_LOOP_DELAY}" \
|
||||
--rpc-socket-file "${RPC_SOCKET_FILE}" \
|
||||
--http-socket-file "${HTTP_SOCKET_FILE}" \
|
||||
|
||||
Loading…
Reference in New Issue
Block a user