Compare commits

...

6 Commits

Author SHA1 Message Date
junderw
0d12575047
Fix: Upstream issues with rare panics 2024-09-04 22:49:20 +09:00
junderw
fc86775c4b
Merge remote-tracking branch 'mempool/mempool' into junderw/log-client-ip 2024-09-04 21:40:37 +09:00
junderw
071a72331d
Fix cargo fmt 2024-07-27 15:32:36 +09:00
junderw
9f6506594d
Unwind panics 2024-07-27 15:18:39 +09:00
junderw
db47f5ff4f
Fix: Indexing panic + read errors when headers hit buffer boundary 2024-07-27 15:18:39 +09:00
junderw
53de9f8c71
Feat: Log client IP during REST and Electrum requests 2024-07-27 15:18:36 +09:00
9 changed files with 607 additions and 97 deletions

38
Cargo.lock generated
View File

@ -391,6 +391,12 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "doc-comment"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "either"
version = "1.6.1"
@ -843,9 +849,11 @@ dependencies = [
"lazy_static",
"libc",
"log",
"memchr",
"num_cpus",
"page_size",
"prometheus",
"proxy-protocol",
"rayon",
"rocksdb",
"serde",
@ -1098,6 +1106,15 @@ version = "2.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf7e6d18738ecd0902d30d1ad232c9125985a3422929b16c65517b38adc14f96"
[[package]]
name = "proxy-protocol"
version = "0.5.0"
source = "git+https://github.com/junderw/proxy-protocol?rev=5f5431ecdae75c7e8aba0f7aebcfc2e0102b70dc#5f5431ecdae75c7e8aba0f7aebcfc2e0102b70dc"
dependencies = [
"bytes",
"snafu",
]
[[package]]
name = "quote"
version = "1.0.31"
@ -1512,6 +1529,27 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]]
name = "snafu"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eab12d3c261b2308b0d80c26fffb58d17eba81a4be97890101f416b478c79ca7"
dependencies = [
"doc-comment",
"snafu-derive",
]
[[package]]
name = "snafu-derive"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1508efa03c362e23817f96cde18abed596a25219a8b2c66e8db33c03543d315b"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.91",
]
[[package]]
name = "socket2"
version = "0.4.4"

View File

@ -43,8 +43,10 @@ libc = "0.2.81"
log = "0.4.11"
socket2 = { version = "0.4", features = ["all"] }
num_cpus = "1.12.0"
memchr = "2.4.1"
page_size = "0.4.2"
prometheus = "0.13"
proxy-protocol = { version = "0.5", features = ["always_exhaustive"] }
rayon = "1.5.0"
rocksdb = "0.21.0"
serde = "1.0.118"
@ -71,9 +73,13 @@ tempfile = "3.0"
[profile.release]
lto = true
panic = 'abort'
panic = 'unwind' # This is default, but required, so explicitly writing it
codegen-units = 1
[patch.crates-io.electrum-client]
git = "https://github.com/Blockstream/rust-electrum-client"
rev = "d3792352992a539afffbe11501d1aff9fd5b919d" # add-peer branch
[patch.crates-io.proxy-protocol]
git = "https://github.com/junderw/proxy-protocol"
rev = "5f5431ecdae75c7e8aba0f7aebcfc2e0102b70dc" # fix/eof-panics branch

View File

@ -118,6 +118,15 @@ fn run_server(config: Arc<Config>) -> Result<()> {
);
}
if std::env::var("ELECTRS_PERIODIC_THREAD_LOGGER").is_ok() {
electrs::util::spawn_thread("periodic_thread_logger", || loop {
electrs::util::with_spawned_threads(|threads| {
debug!("THREADS: {:?}", threads);
});
std::thread::sleep(std::time::Duration::from_millis(5000));
});
}
loop {
if let Err(err) = signal.wait(Duration::from_millis(config.main_loop_delay), true) {
info!("stopping server: {}", err);

View File

@ -40,6 +40,8 @@ pub struct Config {
pub daemon_rpc_addr: SocketAddr,
pub cookie: Option<String>,
pub electrum_rpc_addr: SocketAddr,
pub electrum_proxy_depth: usize,
pub rest_proxy_depth: usize,
pub http_addr: SocketAddr,
pub http_socket_file: Option<PathBuf>,
pub rpc_socket_file: Option<PathBuf>,
@ -150,6 +152,23 @@ impl Config {
.help("Electrum server JSONRPC 'addr:port' to listen on (default: '127.0.0.1:50001' for mainnet, '127.0.0.1:60001' for testnet and '127.0.0.1:60401' for regtest)")
.takes_value(true),
)
.arg(
Arg::with_name("electrum_proxy_depth")
.long("electrum-proxy-depth")
.help("Electrum server's PROXY protocol header depth. \
ie. a value of 2 means the 2nd closest hop's PROXY header \
will be used to find the source IP. A value of 0 means all \
IPs are ignored. (default: 0)")
.takes_value(true),
)
.arg(
Arg::with_name("rest_proxy_depth")
.long("rest-proxy-depth")
.help("REST server's X-Forwarded-For IP address depth. \
ie. a value of 2 means the 2nd IP address in the header(s) is used. \
(default: 0)")
.takes_value(true),
)
.arg(
Arg::with_name("http_addr")
.long("http-addr")
@ -440,6 +459,16 @@ impl Config {
.unwrap_or(&format!("127.0.0.1:{}", default_electrum_port)),
"Electrum RPC",
);
let electrum_proxy_depth = m
.value_of("electrum_proxy_depth")
.unwrap_or("0")
.parse::<usize>()
.expect("invalid electrum_proxy_depth");
let rest_proxy_depth = m
.value_of("rest_proxy_depth")
.unwrap_or("0")
.parse::<usize>()
.expect("invalid rest_proxy_depth");
let http_addr: SocketAddr = str_to_socketaddr(
m.value_of("http_addr")
.unwrap_or(&format!("127.0.0.1:{}", default_http_port)),
@ -515,6 +544,8 @@ impl Config {
cookie,
utxos_limit: value_t_or_exit!(m, "utxos_limit", usize),
electrum_rpc_addr,
electrum_proxy_depth,
rest_proxy_depth,
electrum_txs_limit: value_t_or_exit!(m, "electrum_txs_limit", usize),
electrum_banner,
http_addr,

View File

@ -1,3 +1,4 @@
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::convert::TryInto;
use std::fs;
@ -9,13 +10,14 @@ use std::os::unix::fs::FileTypeExt;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::thread;
use bitcoin::hashes::sha256d::Hash as Sha256dHash;
use error_chain::ChainedError;
use hex;
use proxy_protocol::{version1, version2, ProxyHeader};
use serde_json::{from_str, Value};
use sha2::{Digest, Sha256};
@ -100,39 +102,78 @@ fn get_status_hash(txs: Vec<(Txid, Option<BlockId>)>, query: &Query) -> Option<F
}
struct Connection {
proxy_proto_addr: Cell<Option<Option<SocketAddr>>>,
electrum_proxy_depth: usize,
// Chain info related
query: Arc<Query>,
last_header_entry: Option<HeaderEntry>,
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
stream: ConnectionStream,
chan: SyncChannel<Message>,
stats: Arc<Stats>,
txs_limit: usize,
die_please: Option<Receiver<()>>,
// Stream related
stream: ConnectionStream,
_arc_stream: Arc<ConnectionStream>, // Needs to be kept alive until drop
reader: RefCell<Option<BufReader<ConnectionStream>>>,
// Channel related
message_chan: SyncChannel<Message>,
shutdown_replies: crossbeam_channel::Receiver<()>, // For reply select branch
shutdown_send: crossbeam_channel::Sender<()>, // For Drop. Kills properly-die thread
// Discovery related
#[cfg(feature = "electrum-discovery")]
discovery: Option<Arc<DiscoveryManager>>,
}
impl Drop for Connection {
fn drop(&mut self) {
let _ = self.shutdown_send.send(());
}
}
impl Connection {
pub fn new(
query: Arc<Query>,
stream: ConnectionStream,
stats: Arc<Stats>,
txs_limit: usize,
die_please: Receiver<()>,
(txs_limit, electrum_proxy_depth): (usize, usize),
shutdown: SyncChannel<()>,
#[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>,
) -> Connection {
Connection {
// Channels
let (reply_killer, shutdown_replies) = crossbeam_channel::unbounded();
let shutdown_send = shutdown.sender();
// Using this Arc to prevent any thread leaks from keeping the stream alive
let _arc_stream = Arc::new(stream.try_clone().expect("failed to clone TcpStream"));
let maybe_stream = Arc::downgrade(&_arc_stream);
spawn_thread("properly-die", move || {
let _ = shutdown.receiver().map(|c| c.recv());
let _ = maybe_stream.upgrade().map(|s| s.shutdown(Shutdown::Both));
let _ = reply_killer.send(());
});
let ret = Connection {
proxy_proto_addr: Cell::new(None),
electrum_proxy_depth,
query,
last_header_entry: None, // disable header subscription for now
status_hashes: HashMap::new(),
reader: RefCell::new(Some(BufReader::new(
stream.try_clone().expect("failed to clone TcpStream"),
))),
stream,
chan: SyncChannel::new(10),
_arc_stream,
message_chan: SyncChannel::new(10),
stats,
txs_limit,
die_please: Some(die_please),
shutdown_replies,
shutdown_send,
#[cfg(feature = "electrum-discovery")]
discovery,
}
};
// Wait for first request to find
ret.get_source_addr();
ret
}
fn blockchain_headers_subscribe(&mut self) -> Result<Value> {
@ -354,8 +395,12 @@ impl Connection {
let tx = params.first().chain_err(|| "missing tx")?;
let tx = tx.as_str().chain_err(|| "non-string tx")?.to_string();
let txid = self.query.broadcast_raw(&tx)?;
if let Err(e) = self.chan.sender().try_send(Message::PeriodicUpdate) {
warn!("failed to issue PeriodicUpdate after broadcast: {}", e);
if let Err(e) = self.message_chan.sender().try_send(Message::PeriodicUpdate) {
warn!(
"[{}] failed to issue PeriodicUpdate after broadcast: {}",
self.get_source_addr_str(),
e
);
}
Ok(json!(txid))
}
@ -455,7 +500,8 @@ impl Connection {
Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}),
Err(e) => {
warn!(
"rpc #{} {} {:?} failed: {}",
"[{}] rpc #{} {} {:?} failed: {}",
self.get_source_addr_str(),
id,
method,
params,
@ -512,12 +558,45 @@ impl Connection {
Ok(())
}
fn handle_replies(&mut self, shutdown: crossbeam_channel::Receiver<()>) -> Result<()> {
fn get_source_addr_str(&self) -> String {
self.get_source_addr()
.map(|s| s.to_string())
.unwrap_or_else(|| self.stream.addr_string())
}
/// This will only check the PROXY protocol once
/// and store the result in the first Option.
/// Some(None) means "we checked, but there was no address"
/// The inner option is returned as a Copy.
fn get_source_addr(&self) -> Option<SocketAddr> {
// Option<SocketAddr> is Copy
if let Some(v) = self.proxy_proto_addr.get() {
v
} else {
let v = self
.reader
.borrow_mut()
.as_mut()
.and_then(|r| r.fill_buf().ok())
.and_then(|mut available| {
parse_proxy_headers(&mut available, self.electrum_proxy_depth).0
})
.map(|addr| {
trace!("RPC Received PROXY Protocol address: {}", addr);
addr
});
self.proxy_proto_addr.set(Some(v));
v
}
}
fn handle_replies(&mut self) -> Result<()> {
let addr_str = self.get_source_addr_str();
loop {
crossbeam_channel::select! {
recv(self.chan.receiver()) -> msg => {
let msg = msg.chain_err(|| "channel closed")?;
trace!("RPC {:?}", msg);
recv(self.message_chan.receiver().chain_err(|| format!("[{addr_str}] channel closed"))?) -> msg => {
let msg = msg.chain_err(|| format!("[{addr_str}] channel closed"))?;
trace!("RPC [{addr_str}] {:?}", msg);
match msg {
Message::Request(line) => {
let result = self.handle_line(&line);
@ -526,17 +605,17 @@ impl Connection {
Message::PeriodicUpdate => {
let values = self
.update_subscriptions()
.chain_err(|| "failed to update subscriptions")?;
.chain_err(|| format!("[{addr_str}] failed to update subscriptions"))?;
self.send_values(&values)?
}
Message::Done => {
self.chan.close();
self.message_chan.close();
return Ok(());
}
}
}
recv(shutdown) -> _ => {
self.chan.close();
recv(self.shutdown_replies) -> _ => {
self.message_chan.close();
return Ok(());
}
}
@ -587,29 +666,32 @@ impl Connection {
fn handle_requests(
mut reader: BufReader<ConnectionStream>,
tx: crossbeam_channel::Sender<Message>,
tx: &crossbeam_channel::Sender<Message>,
addr_str: &str,
) -> Result<()> {
loop {
let mut line = Vec::<u8>::new();
reader
.read_until(b'\n', &mut line)
.chain_err(|| "failed to read a request")?;
if line.is_empty() {
tx.send(Message::Done).chain_err(|| "channel closed")?;
let mut recv_data = Vec::<u8>::new();
match read_until(&mut reader, b'\n', &mut recv_data) {
Ok(bytes) => trace!("[{addr_str}] Read {bytes} bytes from connection"),
Err(e) => bail!("[{addr_str}] Failed to read: {}", e),
}
if recv_data.is_empty() {
return Ok(());
} else {
if line.starts_with(&[22, 3, 1]) {
if recv_data.starts_with(&[22, 3, 1]) {
// (very) naive SSL handshake detection
let _ = tx.send(Message::Done);
bail!("invalid request - maybe SSL-encrypted data?: {:?}", line)
bail!(
"[{addr_str}] invalid request - maybe SSL-encrypted data?: {:?}",
recv_data
)
}
match String::from_utf8(line) {
match serde_json::from_slice(&recv_data) {
Ok(req) => tx
.send(Message::Request(req))
.chain_err(|| "channel closed")?,
.chain_err(|| format!("[{}] channel closed", addr_str))?,
Err(err) => {
let _ = tx.send(Message::Done);
bail!("invalid UTF8: {}", err)
bail!("[{}] invalid UTF8: {}", addr_str, err)
}
}
}
@ -618,29 +700,36 @@ impl Connection {
pub fn run(mut self) {
self.stats.clients.inc();
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let tx = self.chan.sender();
let reader = self.reader.take().unwrap();
let tx = self.message_chan.sender();
let die_please = self.die_please.take().unwrap();
let (reply_killer, reply_receiver) = crossbeam_channel::unbounded();
// We create a clone of the stream and put it in an Arc
// This will drop at the end of the function.
let arc_stream = Arc::new(self.stream.try_clone().expect("failed to clone TcpStream"));
// We don't want to keep the stream alive until SIGINT
// It should drop (close) no matter what.
let maybe_stream = Arc::downgrade(&arc_stream);
spawn_thread("properly-die", move || {
let _ = die_please.recv();
let _ = maybe_stream.upgrade().map(|s| s.shutdown(Shutdown::Both));
let _ = reply_killer.send(());
let rpc_addr = self.get_source_addr();
let addr_str = self.get_source_addr_str();
let shutdown_send = self.shutdown_send.clone();
let child = spawn_thread("reader", move || {
let addr_str = rpc_addr
.map(|a| a.to_string())
.unwrap_or_else(|| reader.get_ref().addr_string());
let result =
std::panic::catch_unwind(|| Connection::handle_requests(reader, &tx, &addr_str))
.unwrap_or_else(|e| {
Err(format!(
"[{}] RPC Panic in request handler: {}",
addr_str,
parse_panic_error(&e)
)
.into())
});
// This shuts down the other graceful shutdown thread,
// which also shuts down the handle_replies loop
// regardless of panic, Err, or Ok
let _ = shutdown_send.send(());
result
});
let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
if let Err(e) = self.handle_replies(reply_receiver) {
if let Err(e) = self.handle_replies() {
error!(
"[{}] connection handling failed: {}",
self.stream.addr_string(),
addr_str,
e.display_chain().to_string()
);
}
@ -649,13 +738,11 @@ impl Connection {
.subscriptions
.sub(self.status_hashes.len() as i64);
let addr = self.stream.addr_string();
debug!("[{}] shutting down connection", addr);
// Drop the Arc so that the stream properly closes.
drop(arc_stream);
debug!("[{}] shutting down connection", addr_str);
let _ = self.stream.shutdown(Shutdown::Both);
if let Err(err) = child.join().expect("receiver panicked") {
error!("[{}] receiver failed: {}", addr, err);
self.message_chan.close();
if let Err(err) = child.join().expect("receiver can't panic") {
error!("[{}] receiver failed: {}", addr_str, err);
}
}
}
@ -813,6 +900,7 @@ impl RPC {
});
let txs_limit = config.electrum_txs_limit;
let electrum_proxy_depth = config.electrum_proxy_depth;
RPC {
notification: notification.sender(),
@ -834,7 +922,6 @@ impl RPC {
let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded();
while let Some(stream) = acceptor.receiver().recv().unwrap() {
let addr = stream.addr_string();
// explicitely scope the shadowed variables for the new thread
let query = Arc::clone(&query);
let senders = Arc::clone(&senders);
@ -842,36 +929,38 @@ impl RPC {
let garbage_sender = garbage_sender.clone();
// Kill the peers properly
let (killer, peace_receiver) = std::sync::mpsc::channel();
let killer_clone = killer.clone();
let shutdown_channel = SyncChannel::new(1);
let shutdown_sender = shutdown_channel.sender();
#[cfg(feature = "electrum-discovery")]
let discovery = discovery.clone();
let spawned = spawn_thread("peer", move || {
let addr = stream.addr_string();
info!("[{}] connected peer", addr);
let shutdown_sender = shutdown_channel.sender();
info!("connected peer. waiting for first request...");
let conn = Connection::new(
query,
stream,
stats,
txs_limit,
peace_receiver,
(txs_limit, electrum_proxy_depth),
shutdown_channel,
#[cfg(feature = "electrum-discovery")]
discovery,
);
senders.lock().unwrap().push(conn.chan.sender());
let addr = conn.get_source_addr_str();
info!("[{}] connected peer", addr);
senders.lock().unwrap().push(conn.message_chan.sender());
conn.run();
info!("[{}] disconnected peer", addr);
let _ = killer_clone.send(());
let _ = shutdown_sender.send(());
let _ = garbage_sender.send(std::thread::current().id());
});
trace!("[{}] spawned {:?}", addr, spawned.thread().id());
threads.insert(spawned.thread().id(), (spawned, killer));
trace!("spawned {:?}", spawned.thread().id());
threads.insert(spawned.thread().id(), (spawned, shutdown_sender));
while let Ok(id) = garbage_receiver.try_recv() {
if let Some((thread, killer)) = threads.remove(&id) {
trace!("[{}] joining {:?}", addr, id);
trace!("joining {:?}", id);
let _ = killer.send(());
if let Err(error) = thread.join() {
error!("failed to join {:?}: {:?}", id, error);
@ -1035,7 +1124,7 @@ impl ConnectionStream {
fn addr_string(&self) -> String {
match self {
ConnectionStream::Tcp(_, a) => format!("{a}"),
ConnectionStream::Unix(_, a, _) => format!("{a:?}"),
ConnectionStream::Unix(_, _, _) => "(Unix socket)".to_string(),
}
}
@ -1093,3 +1182,281 @@ impl Read for ConnectionStream {
}
}
}
/// This is a slightly modified version of read_until from standard library BufRead trait.
/// After every read we check if there's a PROXY protocol header at the beginning of the read.
fn read_until(r: &mut impl BufRead, delim: u8, buf: &mut Vec<u8>) -> std::io::Result<usize> {
let mut read = 0;
let mut carry_over_arr = [0_u8; 256];
let mut carrying_over = 0;
loop {
let (done, used) = {
let mut available = match r.fill_buf() {
Ok(n) => n,
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
};
// If carry over, try to parse PROXY headers.
let (carry_skipped_count, exit_error) = if carrying_over > 0 {
process_carry_over(&mut carrying_over, &mut carry_over_arr, &mut available)
} else {
(0, false)
};
// Rare edge case. If we carry over a proxy parse and it still errors
// It is most likely an unknown format
if exit_error {
return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
}
// Try parsing PROXY headers after every read.
let (_, skipped_count, exit_error) = parse_proxy_headers(&mut available, 0);
let skipped_count = carry_skipped_count + skipped_count;
match (memchr::memchr(delim, available), exit_error) {
(Some(i), false) => {
buf.extend_from_slice(&available[..=i]);
(true, i + 1 + skipped_count)
}
(None, _) | (_, true) => {
// Added: carry over
insert_carry_over(&mut carrying_over, &mut carry_over_arr, available);
if !exit_error {
buf.extend_from_slice(available);
}
(false, available.len() + skipped_count)
}
}
};
r.consume(used);
read += used;
if done || used == 0 {
return Ok(read);
}
}
}
fn proxy_header_to_source_socket_addr(p_header: ProxyHeader) -> Option<SocketAddr> {
match p_header {
ProxyHeader::Version1 {
addresses: version1::ProxyAddresses::Ipv4 { source, .. },
} => Some(SocketAddr::V4(source)),
ProxyHeader::Version1 {
addresses: version1::ProxyAddresses::Ipv6 { source, .. },
} => Some(SocketAddr::V6(source)),
ProxyHeader::Version2 {
addresses: version2::ProxyAddresses::Ipv4 { source, .. },
..
} => Some(SocketAddr::V4(source)),
ProxyHeader::Version2 {
addresses: version2::ProxyAddresses::Ipv6 { source, .. },
..
} => Some(SocketAddr::V6(source)),
_ => None,
}
}
fn parse_proxy_headers(
buf: &mut &[u8],
electrum_proxy_depth: usize,
) -> (Option<SocketAddr>, usize, bool) {
trace!("Starting parse PROXY headers: {:?}", buf.get(..12));
let mut addr = None;
let mut current_header_index = 0;
let before_len = buf.len();
// Save the original state of the buf
let mut original_buf = *buf;
let mut error_exit = false;
// The last header is the outer-most proxy
// Warning do not early return. ONLY break the loop.
loop {
let p_header = match proxy_protocol::parse(buf) {
Ok(h) => h,
// NotProxyHeader definitely does not move the buf pointer.
Err(proxy_protocol::ParseError::NotProxyHeader) => break,
// This can move the buf cursor forward
// and will most likely end in an error higher in the call stack
// This means "PROXY protocol was used, but it was in an unknown format"
// (Maybe someday if nginx/etc. uses a new version of the protocol
// and we don't update the dependency to a version that handles the new
// version, it might break.)
// OR it could mean a PROXY header fell on the BufReader buffer boundary.
Err(_) => {
// This is the buf state before this error returned.
// This match arm will modify buf past the version bytes
// and stop in a state pointing to the middle of a broken header.
// We return the buf state to the beginning of the previous version bytes.
*buf = original_buf;
error_exit = true;
break;
}
};
// After each successful header parse, save the new buf state.
original_buf = *buf;
trace!("Parsed PROXY protocol header: {:?}", p_header);
// Increment from 0 to 1 before the first check
current_header_index += 1;
// 0 should always continue
// 1 should only get the 1st header's IP address etc.
if current_header_index != electrum_proxy_depth {
continue;
}
// The address is only attempted to be
// parsed when the 1 based index is equal
addr = proxy_header_to_source_socket_addr(p_header);
}
(addr, before_len - buf.len(), error_exit)
}
fn parse_panic_error(e: &(dyn std::any::Any + Send)) -> &str {
if let Some(s) = e.downcast_ref::<&str>() {
s
} else if let Some(s) = e.downcast_ref::<String>() {
s
} else {
"Unknown panic"
}
}
/// The goal of this function is to take the carried over bytes from the last loop
/// and connect them with the first bytes of the next read, then check if it's a header.
/// This is to prevent headers from straddling the BufReader's buffer end.
/// A simple static array should be quick and easy.
fn process_carry_over(
carrying_over: &mut usize,
carry_over: &mut [u8],
available: &mut &[u8],
) -> (usize, bool) {
// Step 0: Copy as much from available into carry_over to try and parse
// How much space do we have left in the array?
let empty_space = carry_over.len() - *carrying_over;
// How many bytes should we copy over?
let copy_bytes = available.len().min(empty_space);
// Copy over the bytes to join with the carried over bytes
carry_over[*carrying_over..*carrying_over + copy_bytes]
.copy_from_slice(&available[..copy_bytes]);
// Step 1: Figure out if it was a proxy header or not.
#[allow(clippy::redundant_slicing)]
let mut cursor = &carry_over[..];
let (_, skipped_count, exit_error) = parse_proxy_headers(&mut cursor, 0);
// Step 2: Figure out how much we need to skip available forward
let skip_count = skipped_count.saturating_sub(*carrying_over);
// Step 3: Move the available cursor
*available = &available[skip_count..];
// Step 4: Reset carrying over (writing 0s to the array is unnecessary)
*carrying_over = 0;
// Return the skip count so we can call consume later
(skip_count, exit_error)
}
/// Insert the carry over
fn insert_carry_over(carrying_over: &mut usize, carry_over_arr: &mut [u8], available: &[u8]) {
*carrying_over = available.len().min(carry_over_arr.len());
carry_over_arr[..*carrying_over].copy_from_slice(&available[..*carrying_over]);
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_read_until() {
// len 48
let v1 = "PROXY TCP6 ab:ce:ef:01:23:45:67:89 ::1 0 65535\r\n"
.as_bytes()
.to_vec();
// len 31
let v2 =
hex::decode("0d0a0d0a000d0a515549540a2111000f7f000001c0a80001ffff0101450000").unwrap();
let simple_json = "{}\n".as_bytes().to_vec();
// len 26
let larger_json = r#"{"id":2,"name":"electrs"}\n"#.as_bytes().to_vec();
let vectors: Vec<(Vec<u8>, usize, &[u8], std::result::Result<usize, String>)> = vec![
// Simple JSON with LF
(simple_json.clone(), 3, &simple_json, Ok(3)),
// Simple JSON with LF + PROXY v1
(
[v1.clone(), simple_json.clone()].concat(),
51,
&simple_json,
Ok(51),
),
// Simple JSON with LF + PROXY v2
(
[v2.clone(), simple_json.clone()].concat(),
34,
&simple_json,
Ok(34),
),
// Simple JSON with LF + two layers of proxy
(
[v1.clone(), v2.clone(), simple_json.clone()].concat(),
82,
&simple_json,
Ok(82),
),
// Simple JSON that goes over the buffer boundary
(
larger_json.clone(),
2, // capacity
&larger_json,
Ok(27),
),
// Simple JSON with LF + two layers of proxy
(
[v1.clone(), v2.clone(), simple_json.clone()].concat(),
45, // 3 bytes before v1 header ends
&simple_json,
Ok(82),
),
// Simple JSON with LF + two layers of proxy
(
[v1.clone(), v2.clone(), simple_json.clone()].concat(),
46, // 2 bytes before v1 header ends
&simple_json,
Ok(82),
),
// Capacity exactly on the last byte of v1 == parser error (library)
(
[v1.clone(), v2.clone(), simple_json.clone()].concat(),
47, // 1 bytes before v1 header ends (just befor \n)
&simple_json,
Ok(82),
),
// TODO: When the BufReader boundary hits in a v2 PROXY header it crashes
// (
// [v1.clone(), v2.clone(), simple_json.clone()].concat(),
// 49, // 1 byte into v2 header
// &simple_json,
// Ok(82),
// ),
(
[v1.clone(), v2.clone(), simple_json.clone()].concat(),
77, // 2 bytes short of v2 ending
&simple_json,
Ok(82),
),
(
[v1.clone(), v2.clone(), larger_json.clone()].concat(),
80, // 1 after v2 ends
&larger_json,
Ok(106),
),
];
for (input, capacity, bytes, count) in vectors {
let mut buf = BufReader::with_capacity(capacity, input.as_slice());
let mut v = vec![];
let result_count = read_until(&mut buf, b'\n', &mut v).map_err(|e| format!("{e}"));
assert_eq!(count, result_count);
assert_eq!(bytes, &v);
}
}
}

View File

@ -74,7 +74,7 @@ fn bitcoind_fetcher(
let chan = SyncChannel::new(1);
let sender = chan.sender();
Ok(Fetcher::from(
chan.into_receiver(),
chan.into_receiver().expect("not closed"),
spawn_thread("bitcoind_fetcher", move || {
for entries in new_headers.chunks(100) {
let blockhashes: Vec<BlockHash> = entries.iter().map(|e| *e.hash()).collect();
@ -115,7 +115,7 @@ fn blkfiles_fetcher(
let parser = blkfiles_parser(blkfiles_reader(blk_files), magic);
Ok(Fetcher::from(
chan.into_receiver(),
chan.into_receiver().expect("not closed"),
spawn_thread("blkfiles_fetcher", move || {
parser.map(|sizedblocks| {
let block_entries: Vec<BlockEntry> = sizedblocks
@ -151,7 +151,7 @@ fn blkfiles_reader(blk_files: Vec<PathBuf>) -> Fetcher<Vec<u8>> {
let sender = chan.sender();
Fetcher::from(
chan.into_receiver(),
chan.into_receiver().expect("not closed"),
spawn_thread("blkfiles_reader", move || {
for path in blk_files {
trace!("reading {:?}", path);
@ -170,7 +170,7 @@ fn blkfiles_parser(blobs: Fetcher<Vec<u8>>, magic: u32) -> Fetcher<Vec<SizedBloc
let sender = chan.sender();
Fetcher::from(
chan.into_receiver(),
chan.into_receiver().expect("not closed"),
spawn_thread("blkfiles_parser", move || {
blobs.map(|blob| {
trace!("parsing {} bytes", blob.len());

View File

@ -4,9 +4,10 @@ use crate::errors;
use crate::metrics::Metrics;
use crate::new_index::{compute_script_hash, Query, SpendingInput, Utxo};
use crate::util::{
create_socket, electrum_merkle, extract_tx_prevouts, full_hash, get_innerscripts, get_tx_fee,
has_prevout, is_coinbase, transaction_sigop_count, BlockHeaderMeta, BlockId, FullHash,
ScriptToAddr, ScriptToAsm, TransactionStatus,
create_socket, electrum_merkle, extract_tx_prevouts, full_hash, get_innerscripts,
get_rest_addr, get_tx_fee, has_prevout, is_coinbase, set_rest_addr, transaction_sigop_count,
BlockHeaderMeta, BlockId, FullHash, ScriptToAddr, ScriptToAsm, TransactionStatus,
REST_CLIENT_ADDR,
};
#[cfg(not(feature = "liquid"))]
@ -17,11 +18,15 @@ use bitcoin::hashes::hex::{FromHex, ToHex};
use bitcoin::hashes::Error as HashError;
use hex::{self, FromHexError};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Response, Server, StatusCode};
use hyper::{header::HeaderValue, Body, HeaderMap, Method, Response, Server, StatusCode};
use itertools::Itertools;
use prometheus::{HistogramOpts, HistogramVec};
use tokio::sync::oneshot;
use hyperlocal::UnixServerExt;
use std::borrow::Cow;
use std::cell::Cell;
use std::net::IpAddr;
use std::{cmp, fs};
#[cfg(feature = "liquid")]
use {
@ -564,6 +569,28 @@ fn prepare_txs(
.collect()
}
// Get the first valid IP address in the X-Forwarded-For header
// Supports multiple headers.
fn get_client_ip(headers: &HeaderMap<HeaderValue>, rest_proxy_depth: usize) -> Option<IpAddr> {
if rest_proxy_depth == 0 {
return None;
}
headers
.get_all("X-Forwarded-For")
.iter()
.filter_map(|v| v.to_str().ok())
.join(",")
.split(',')
.nth(rest_proxy_depth - 1)
.and_then(|ip| ip.trim().parse::<IpAddr>().ok())
}
fn get_client_ip_str() -> Cow<'static, str> {
get_rest_addr()
.map(|a| Cow::Owned(a.to_string()))
.unwrap_or_else(|| Cow::Borrowed("Unknown IP"))
}
#[tokio::main]
async fn run_server(
config: Arc<Config>,
@ -588,16 +615,20 @@ async fn run_server(
let config = Arc::clone(&config);
let timer = metric.with_label_values(&["all_methods"]).start_timer();
async move {
REST_CLIENT_ADDR.scope(Cell::new(None), async move {
let method = req.method().clone();
let uri = req.uri().clone();
// Set the task local IP addr from X-Forwarded-For
set_rest_addr(get_client_ip(req.headers(), config.rest_proxy_depth));
let body = hyper::body::to_bytes(req.into_body()).await?;
let mut resp = tokio::task::block_in_place(|| {
handle_request(method, uri, body, &query, &config)
})
.unwrap_or_else(|err| {
warn!("{:?}", err);
warn!("[{}] {:?}", get_client_ip_str(), err);
Response::builder()
.status(err.0)
.header("Content-Type", "text/plain")
@ -611,7 +642,7 @@ async fn run_server(
}
timer.observe_duration();
Ok::<_, hyper::Error>(resp)
}
})
}))
}
};
@ -699,7 +730,7 @@ fn handle_request(
None => HashMap::new(),
};
info!("handle {:?} {:?}", method, uri);
info!("[{}] handle {:?} {:?}", get_client_ip_str(), method, uri);
match (
&method,
path.first(),

View File

@ -14,6 +14,7 @@ pub use self::transaction::{
sigops::transaction_sigop_count, TransactionStatus, TxInput,
};
use std::cell::Cell;
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, Sender};
@ -23,7 +24,7 @@ use std::thread::{self, ThreadId};
use crate::chain::BlockHeader;
use bitcoin::hashes::sha256d::Hash as Sha256dHash;
use socket2::{Domain, Protocol, Socket, Type};
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};
pub type Bytes = Vec<u8>;
pub type HeaderMap = HashMap<Sha256dHash, BlockHeader>;
@ -38,29 +39,26 @@ pub fn full_hash(hash: &[u8]) -> FullHash {
}
pub struct SyncChannel<T> {
tx: Option<crossbeam_channel::Sender<T>>,
tx: crossbeam_channel::Sender<T>,
rx: Option<crossbeam_channel::Receiver<T>>,
}
impl<T> SyncChannel<T> {
pub fn new(size: usize) -> SyncChannel<T> {
let (tx, rx) = crossbeam_channel::bounded(size);
SyncChannel {
tx: Some(tx),
rx: Some(rx),
}
SyncChannel { tx, rx: Some(rx) }
}
pub fn sender(&self) -> crossbeam_channel::Sender<T> {
self.tx.as_ref().expect("No Sender").clone()
self.tx.clone()
}
pub fn receiver(&self) -> &crossbeam_channel::Receiver<T> {
self.rx.as_ref().expect("No Receiver")
pub fn receiver(&self) -> Option<&crossbeam_channel::Receiver<T>> {
self.rx.as_ref()
}
pub fn into_receiver(self) -> crossbeam_channel::Receiver<T> {
self.rx.expect("No Receiver")
pub fn into_receiver(self) -> Option<crossbeam_channel::Receiver<T>> {
self.rx
}
/// This drops the sender and receiver, causing all other methods to panic.
@ -68,7 +66,6 @@ impl<T> SyncChannel<T> {
/// Use only when you know that the channel will no longer be used.
/// ie. shutdown.
pub fn close(&mut self) -> Option<crossbeam_channel::Receiver<T>> {
self.tx.take();
self.rx.take()
}
}
@ -225,3 +222,15 @@ pub mod serde_hex {
}
}
}
pub(crate) fn get_rest_addr() -> Option<IpAddr> {
REST_CLIENT_ADDR.with(|addr| addr.get())
}
pub(crate) fn set_rest_addr(input: Option<IpAddr>) {
REST_CLIENT_ADDR.with(|addr| {
addr.set(input);
});
}
tokio::task_local! {
pub(crate) static REST_CLIENT_ADDR: Cell<Option<IpAddr>>;
}

19
start
View File

@ -8,6 +8,23 @@ DB_FOLDER=/electrs
NODENAME=$(hostname|cut -d . -f1)
LOCATION=$(hostname|cut -d . -f2)
# since we know that our nginx will always be the first
# (closest) proxy from electrs, we set it to 1.
# If some servers prefer to disable PROXY protocol,
# set to 0. If the client themselves is proxying the
# electrum RPCs from other clients, this should be set to
# the hop that you want to trust to source IP addresses.
# If that hop doesn't exist, TcpStream's local_addr or
# "Unix socket" is used, and the PROXY headers are discarded.
# Image:
# [electrs] <> [proxy "1"] <> [proxy "2"] <> [electrum client]
RPC_PROXY_DEPTH=1
# This chooses the depth of X-Forwarded-For IP to show.
# A value of 1 is the first value from left to right.
# Image:
# "X-Forwarded-For: proxy1, proxy2, proxy3..."
REST_PROXY_DEPTH=1
# load rust if necessary
if [ -e "${HOME}/.cargo/env" ];then
source "${HOME}/.cargo/env"
@ -161,6 +178,8 @@ do
--address-search \
--utxos-limit "${UTXOS_LIMIT}" \
--electrum-txs-limit "${ELECTRUM_TXS_LIMIT}" \
--electrum-proxy-depth "${RPC_PROXY_DEPTH}" \
--rest-proxy-depth "${REST_PROXY_DEPTH}" \
-vv
sleep 1
done