Compare commits
6 Commits
mempool
...
junderw/lo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d12575047 | ||
|
|
fc86775c4b | ||
|
|
071a72331d | ||
|
|
9f6506594d | ||
|
|
db47f5ff4f | ||
|
|
53de9f8c71 |
38
Cargo.lock
generated
38
Cargo.lock
generated
@ -391,6 +391,12 @@ dependencies = [
|
||||
"winapi 0.3.9",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "doc-comment"
|
||||
version = "0.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
|
||||
|
||||
[[package]]
|
||||
name = "either"
|
||||
version = "1.6.1"
|
||||
@ -843,9 +849,11 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"log",
|
||||
"memchr",
|
||||
"num_cpus",
|
||||
"page_size",
|
||||
"prometheus",
|
||||
"proxy-protocol",
|
||||
"rayon",
|
||||
"rocksdb",
|
||||
"serde",
|
||||
@ -1098,6 +1106,15 @@ version = "2.27.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf7e6d18738ecd0902d30d1ad232c9125985a3422929b16c65517b38adc14f96"
|
||||
|
||||
[[package]]
|
||||
name = "proxy-protocol"
|
||||
version = "0.5.0"
|
||||
source = "git+https://github.com/junderw/proxy-protocol?rev=5f5431ecdae75c7e8aba0f7aebcfc2e0102b70dc#5f5431ecdae75c7e8aba0f7aebcfc2e0102b70dc"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"snafu",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.31"
|
||||
@ -1512,6 +1529,27 @@ version = "1.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
|
||||
|
||||
[[package]]
|
||||
name = "snafu"
|
||||
version = "0.6.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eab12d3c261b2308b0d80c26fffb58d17eba81a4be97890101f416b478c79ca7"
|
||||
dependencies = [
|
||||
"doc-comment",
|
||||
"snafu-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "snafu-derive"
|
||||
version = "0.6.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1508efa03c362e23817f96cde18abed596a25219a8b2c66e8db33c03543d315b"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 1.0.91",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.4.4"
|
||||
|
||||
@ -43,8 +43,10 @@ libc = "0.2.81"
|
||||
log = "0.4.11"
|
||||
socket2 = { version = "0.4", features = ["all"] }
|
||||
num_cpus = "1.12.0"
|
||||
memchr = "2.4.1"
|
||||
page_size = "0.4.2"
|
||||
prometheus = "0.13"
|
||||
proxy-protocol = { version = "0.5", features = ["always_exhaustive"] }
|
||||
rayon = "1.5.0"
|
||||
rocksdb = "0.21.0"
|
||||
serde = "1.0.118"
|
||||
@ -71,9 +73,13 @@ tempfile = "3.0"
|
||||
|
||||
[profile.release]
|
||||
lto = true
|
||||
panic = 'abort'
|
||||
panic = 'unwind' # This is default, but required, so explicitly writing it
|
||||
codegen-units = 1
|
||||
|
||||
[patch.crates-io.electrum-client]
|
||||
git = "https://github.com/Blockstream/rust-electrum-client"
|
||||
rev = "d3792352992a539afffbe11501d1aff9fd5b919d" # add-peer branch
|
||||
|
||||
[patch.crates-io.proxy-protocol]
|
||||
git = "https://github.com/junderw/proxy-protocol"
|
||||
rev = "5f5431ecdae75c7e8aba0f7aebcfc2e0102b70dc" # fix/eof-panics branch
|
||||
|
||||
@ -118,6 +118,15 @@ fn run_server(config: Arc<Config>) -> Result<()> {
|
||||
);
|
||||
}
|
||||
|
||||
if std::env::var("ELECTRS_PERIODIC_THREAD_LOGGER").is_ok() {
|
||||
electrs::util::spawn_thread("periodic_thread_logger", || loop {
|
||||
electrs::util::with_spawned_threads(|threads| {
|
||||
debug!("THREADS: {:?}", threads);
|
||||
});
|
||||
std::thread::sleep(std::time::Duration::from_millis(5000));
|
||||
});
|
||||
}
|
||||
|
||||
loop {
|
||||
if let Err(err) = signal.wait(Duration::from_millis(config.main_loop_delay), true) {
|
||||
info!("stopping server: {}", err);
|
||||
|
||||
@ -40,6 +40,8 @@ pub struct Config {
|
||||
pub daemon_rpc_addr: SocketAddr,
|
||||
pub cookie: Option<String>,
|
||||
pub electrum_rpc_addr: SocketAddr,
|
||||
pub electrum_proxy_depth: usize,
|
||||
pub rest_proxy_depth: usize,
|
||||
pub http_addr: SocketAddr,
|
||||
pub http_socket_file: Option<PathBuf>,
|
||||
pub rpc_socket_file: Option<PathBuf>,
|
||||
@ -150,6 +152,23 @@ impl Config {
|
||||
.help("Electrum server JSONRPC 'addr:port' to listen on (default: '127.0.0.1:50001' for mainnet, '127.0.0.1:60001' for testnet and '127.0.0.1:60401' for regtest)")
|
||||
.takes_value(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("electrum_proxy_depth")
|
||||
.long("electrum-proxy-depth")
|
||||
.help("Electrum server's PROXY protocol header depth. \
|
||||
ie. a value of 2 means the 2nd closest hop's PROXY header \
|
||||
will be used to find the source IP. A value of 0 means all \
|
||||
IPs are ignored. (default: 0)")
|
||||
.takes_value(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("rest_proxy_depth")
|
||||
.long("rest-proxy-depth")
|
||||
.help("REST server's X-Forwarded-For IP address depth. \
|
||||
ie. a value of 2 means the 2nd IP address in the header(s) is used. \
|
||||
(default: 0)")
|
||||
.takes_value(true),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("http_addr")
|
||||
.long("http-addr")
|
||||
@ -440,6 +459,16 @@ impl Config {
|
||||
.unwrap_or(&format!("127.0.0.1:{}", default_electrum_port)),
|
||||
"Electrum RPC",
|
||||
);
|
||||
let electrum_proxy_depth = m
|
||||
.value_of("electrum_proxy_depth")
|
||||
.unwrap_or("0")
|
||||
.parse::<usize>()
|
||||
.expect("invalid electrum_proxy_depth");
|
||||
let rest_proxy_depth = m
|
||||
.value_of("rest_proxy_depth")
|
||||
.unwrap_or("0")
|
||||
.parse::<usize>()
|
||||
.expect("invalid rest_proxy_depth");
|
||||
let http_addr: SocketAddr = str_to_socketaddr(
|
||||
m.value_of("http_addr")
|
||||
.unwrap_or(&format!("127.0.0.1:{}", default_http_port)),
|
||||
@ -515,6 +544,8 @@ impl Config {
|
||||
cookie,
|
||||
utxos_limit: value_t_or_exit!(m, "utxos_limit", usize),
|
||||
electrum_rpc_addr,
|
||||
electrum_proxy_depth,
|
||||
rest_proxy_depth,
|
||||
electrum_txs_limit: value_t_or_exit!(m, "electrum_txs_limit", usize),
|
||||
electrum_banner,
|
||||
http_addr,
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryInto;
|
||||
use std::fs;
|
||||
@ -9,13 +10,14 @@ use std::os::unix::fs::FileTypeExt;
|
||||
use std::os::unix::net::{UnixListener, UnixStream};
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::mpsc::{Receiver, Sender};
|
||||
use std::sync::mpsc::Sender;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
|
||||
use bitcoin::hashes::sha256d::Hash as Sha256dHash;
|
||||
use error_chain::ChainedError;
|
||||
use hex;
|
||||
use proxy_protocol::{version1, version2, ProxyHeader};
|
||||
use serde_json::{from_str, Value};
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
@ -100,39 +102,78 @@ fn get_status_hash(txs: Vec<(Txid, Option<BlockId>)>, query: &Query) -> Option<F
|
||||
}
|
||||
|
||||
struct Connection {
|
||||
proxy_proto_addr: Cell<Option<Option<SocketAddr>>>,
|
||||
electrum_proxy_depth: usize,
|
||||
// Chain info related
|
||||
query: Arc<Query>,
|
||||
last_header_entry: Option<HeaderEntry>,
|
||||
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
|
||||
stream: ConnectionStream,
|
||||
chan: SyncChannel<Message>,
|
||||
stats: Arc<Stats>,
|
||||
txs_limit: usize,
|
||||
die_please: Option<Receiver<()>>,
|
||||
// Stream related
|
||||
stream: ConnectionStream,
|
||||
_arc_stream: Arc<ConnectionStream>, // Needs to be kept alive until drop
|
||||
reader: RefCell<Option<BufReader<ConnectionStream>>>,
|
||||
// Channel related
|
||||
message_chan: SyncChannel<Message>,
|
||||
shutdown_replies: crossbeam_channel::Receiver<()>, // For reply select branch
|
||||
shutdown_send: crossbeam_channel::Sender<()>, // For Drop. Kills properly-die thread
|
||||
// Discovery related
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
discovery: Option<Arc<DiscoveryManager>>,
|
||||
}
|
||||
|
||||
impl Drop for Connection {
|
||||
fn drop(&mut self) {
|
||||
let _ = self.shutdown_send.send(());
|
||||
}
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
pub fn new(
|
||||
query: Arc<Query>,
|
||||
stream: ConnectionStream,
|
||||
stats: Arc<Stats>,
|
||||
txs_limit: usize,
|
||||
die_please: Receiver<()>,
|
||||
(txs_limit, electrum_proxy_depth): (usize, usize),
|
||||
shutdown: SyncChannel<()>,
|
||||
#[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>,
|
||||
) -> Connection {
|
||||
Connection {
|
||||
// Channels
|
||||
let (reply_killer, shutdown_replies) = crossbeam_channel::unbounded();
|
||||
let shutdown_send = shutdown.sender();
|
||||
|
||||
// Using this Arc to prevent any thread leaks from keeping the stream alive
|
||||
let _arc_stream = Arc::new(stream.try_clone().expect("failed to clone TcpStream"));
|
||||
let maybe_stream = Arc::downgrade(&_arc_stream);
|
||||
|
||||
spawn_thread("properly-die", move || {
|
||||
let _ = shutdown.receiver().map(|c| c.recv());
|
||||
let _ = maybe_stream.upgrade().map(|s| s.shutdown(Shutdown::Both));
|
||||
let _ = reply_killer.send(());
|
||||
});
|
||||
|
||||
let ret = Connection {
|
||||
proxy_proto_addr: Cell::new(None),
|
||||
electrum_proxy_depth,
|
||||
query,
|
||||
last_header_entry: None, // disable header subscription for now
|
||||
status_hashes: HashMap::new(),
|
||||
reader: RefCell::new(Some(BufReader::new(
|
||||
stream.try_clone().expect("failed to clone TcpStream"),
|
||||
))),
|
||||
stream,
|
||||
chan: SyncChannel::new(10),
|
||||
_arc_stream,
|
||||
message_chan: SyncChannel::new(10),
|
||||
stats,
|
||||
txs_limit,
|
||||
die_please: Some(die_please),
|
||||
shutdown_replies,
|
||||
shutdown_send,
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
discovery,
|
||||
}
|
||||
};
|
||||
// Wait for first request to find
|
||||
ret.get_source_addr();
|
||||
ret
|
||||
}
|
||||
|
||||
fn blockchain_headers_subscribe(&mut self) -> Result<Value> {
|
||||
@ -354,8 +395,12 @@ impl Connection {
|
||||
let tx = params.first().chain_err(|| "missing tx")?;
|
||||
let tx = tx.as_str().chain_err(|| "non-string tx")?.to_string();
|
||||
let txid = self.query.broadcast_raw(&tx)?;
|
||||
if let Err(e) = self.chan.sender().try_send(Message::PeriodicUpdate) {
|
||||
warn!("failed to issue PeriodicUpdate after broadcast: {}", e);
|
||||
if let Err(e) = self.message_chan.sender().try_send(Message::PeriodicUpdate) {
|
||||
warn!(
|
||||
"[{}] failed to issue PeriodicUpdate after broadcast: {}",
|
||||
self.get_source_addr_str(),
|
||||
e
|
||||
);
|
||||
}
|
||||
Ok(json!(txid))
|
||||
}
|
||||
@ -455,7 +500,8 @@ impl Connection {
|
||||
Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"rpc #{} {} {:?} failed: {}",
|
||||
"[{}] rpc #{} {} {:?} failed: {}",
|
||||
self.get_source_addr_str(),
|
||||
id,
|
||||
method,
|
||||
params,
|
||||
@ -512,12 +558,45 @@ impl Connection {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_replies(&mut self, shutdown: crossbeam_channel::Receiver<()>) -> Result<()> {
|
||||
fn get_source_addr_str(&self) -> String {
|
||||
self.get_source_addr()
|
||||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(|| self.stream.addr_string())
|
||||
}
|
||||
|
||||
/// This will only check the PROXY protocol once
|
||||
/// and store the result in the first Option.
|
||||
/// Some(None) means "we checked, but there was no address"
|
||||
/// The inner option is returned as a Copy.
|
||||
fn get_source_addr(&self) -> Option<SocketAddr> {
|
||||
// Option<SocketAddr> is Copy
|
||||
if let Some(v) = self.proxy_proto_addr.get() {
|
||||
v
|
||||
} else {
|
||||
let v = self
|
||||
.reader
|
||||
.borrow_mut()
|
||||
.as_mut()
|
||||
.and_then(|r| r.fill_buf().ok())
|
||||
.and_then(|mut available| {
|
||||
parse_proxy_headers(&mut available, self.electrum_proxy_depth).0
|
||||
})
|
||||
.map(|addr| {
|
||||
trace!("RPC Received PROXY Protocol address: {}", addr);
|
||||
addr
|
||||
});
|
||||
self.proxy_proto_addr.set(Some(v));
|
||||
v
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_replies(&mut self) -> Result<()> {
|
||||
let addr_str = self.get_source_addr_str();
|
||||
loop {
|
||||
crossbeam_channel::select! {
|
||||
recv(self.chan.receiver()) -> msg => {
|
||||
let msg = msg.chain_err(|| "channel closed")?;
|
||||
trace!("RPC {:?}", msg);
|
||||
recv(self.message_chan.receiver().chain_err(|| format!("[{addr_str}] channel closed"))?) -> msg => {
|
||||
let msg = msg.chain_err(|| format!("[{addr_str}] channel closed"))?;
|
||||
trace!("RPC [{addr_str}] {:?}", msg);
|
||||
match msg {
|
||||
Message::Request(line) => {
|
||||
let result = self.handle_line(&line);
|
||||
@ -526,17 +605,17 @@ impl Connection {
|
||||
Message::PeriodicUpdate => {
|
||||
let values = self
|
||||
.update_subscriptions()
|
||||
.chain_err(|| "failed to update subscriptions")?;
|
||||
.chain_err(|| format!("[{addr_str}] failed to update subscriptions"))?;
|
||||
self.send_values(&values)?
|
||||
}
|
||||
Message::Done => {
|
||||
self.chan.close();
|
||||
self.message_chan.close();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
recv(shutdown) -> _ => {
|
||||
self.chan.close();
|
||||
recv(self.shutdown_replies) -> _ => {
|
||||
self.message_chan.close();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
@ -587,29 +666,32 @@ impl Connection {
|
||||
|
||||
fn handle_requests(
|
||||
mut reader: BufReader<ConnectionStream>,
|
||||
tx: crossbeam_channel::Sender<Message>,
|
||||
tx: &crossbeam_channel::Sender<Message>,
|
||||
addr_str: &str,
|
||||
) -> Result<()> {
|
||||
loop {
|
||||
let mut line = Vec::<u8>::new();
|
||||
reader
|
||||
.read_until(b'\n', &mut line)
|
||||
.chain_err(|| "failed to read a request")?;
|
||||
if line.is_empty() {
|
||||
tx.send(Message::Done).chain_err(|| "channel closed")?;
|
||||
let mut recv_data = Vec::<u8>::new();
|
||||
match read_until(&mut reader, b'\n', &mut recv_data) {
|
||||
Ok(bytes) => trace!("[{addr_str}] Read {bytes} bytes from connection"),
|
||||
Err(e) => bail!("[{addr_str}] Failed to read: {}", e),
|
||||
}
|
||||
if recv_data.is_empty() {
|
||||
return Ok(());
|
||||
} else {
|
||||
if line.starts_with(&[22, 3, 1]) {
|
||||
if recv_data.starts_with(&[22, 3, 1]) {
|
||||
// (very) naive SSL handshake detection
|
||||
let _ = tx.send(Message::Done);
|
||||
bail!("invalid request - maybe SSL-encrypted data?: {:?}", line)
|
||||
bail!(
|
||||
"[{addr_str}] invalid request - maybe SSL-encrypted data?: {:?}",
|
||||
recv_data
|
||||
)
|
||||
}
|
||||
match String::from_utf8(line) {
|
||||
match serde_json::from_slice(&recv_data) {
|
||||
Ok(req) => tx
|
||||
.send(Message::Request(req))
|
||||
.chain_err(|| "channel closed")?,
|
||||
.chain_err(|| format!("[{}] channel closed", addr_str))?,
|
||||
Err(err) => {
|
||||
let _ = tx.send(Message::Done);
|
||||
bail!("invalid UTF8: {}", err)
|
||||
bail!("[{}] invalid UTF8: {}", addr_str, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -618,29 +700,36 @@ impl Connection {
|
||||
|
||||
pub fn run(mut self) {
|
||||
self.stats.clients.inc();
|
||||
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
|
||||
let tx = self.chan.sender();
|
||||
let reader = self.reader.take().unwrap();
|
||||
let tx = self.message_chan.sender();
|
||||
|
||||
let die_please = self.die_please.take().unwrap();
|
||||
let (reply_killer, reply_receiver) = crossbeam_channel::unbounded();
|
||||
|
||||
// We create a clone of the stream and put it in an Arc
|
||||
// This will drop at the end of the function.
|
||||
let arc_stream = Arc::new(self.stream.try_clone().expect("failed to clone TcpStream"));
|
||||
// We don't want to keep the stream alive until SIGINT
|
||||
// It should drop (close) no matter what.
|
||||
let maybe_stream = Arc::downgrade(&arc_stream);
|
||||
spawn_thread("properly-die", move || {
|
||||
let _ = die_please.recv();
|
||||
let _ = maybe_stream.upgrade().map(|s| s.shutdown(Shutdown::Both));
|
||||
let _ = reply_killer.send(());
|
||||
let rpc_addr = self.get_source_addr();
|
||||
let addr_str = self.get_source_addr_str();
|
||||
let shutdown_send = self.shutdown_send.clone();
|
||||
let child = spawn_thread("reader", move || {
|
||||
let addr_str = rpc_addr
|
||||
.map(|a| a.to_string())
|
||||
.unwrap_or_else(|| reader.get_ref().addr_string());
|
||||
let result =
|
||||
std::panic::catch_unwind(|| Connection::handle_requests(reader, &tx, &addr_str))
|
||||
.unwrap_or_else(|e| {
|
||||
Err(format!(
|
||||
"[{}] RPC Panic in request handler: {}",
|
||||
addr_str,
|
||||
parse_panic_error(&e)
|
||||
)
|
||||
.into())
|
||||
});
|
||||
// This shuts down the other graceful shutdown thread,
|
||||
// which also shuts down the handle_replies loop
|
||||
// regardless of panic, Err, or Ok
|
||||
let _ = shutdown_send.send(());
|
||||
result
|
||||
});
|
||||
|
||||
let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
|
||||
if let Err(e) = self.handle_replies(reply_receiver) {
|
||||
if let Err(e) = self.handle_replies() {
|
||||
error!(
|
||||
"[{}] connection handling failed: {}",
|
||||
self.stream.addr_string(),
|
||||
addr_str,
|
||||
e.display_chain().to_string()
|
||||
);
|
||||
}
|
||||
@ -649,13 +738,11 @@ impl Connection {
|
||||
.subscriptions
|
||||
.sub(self.status_hashes.len() as i64);
|
||||
|
||||
let addr = self.stream.addr_string();
|
||||
debug!("[{}] shutting down connection", addr);
|
||||
// Drop the Arc so that the stream properly closes.
|
||||
drop(arc_stream);
|
||||
debug!("[{}] shutting down connection", addr_str);
|
||||
let _ = self.stream.shutdown(Shutdown::Both);
|
||||
if let Err(err) = child.join().expect("receiver panicked") {
|
||||
error!("[{}] receiver failed: {}", addr, err);
|
||||
self.message_chan.close();
|
||||
if let Err(err) = child.join().expect("receiver can't panic") {
|
||||
error!("[{}] receiver failed: {}", addr_str, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -813,6 +900,7 @@ impl RPC {
|
||||
});
|
||||
|
||||
let txs_limit = config.electrum_txs_limit;
|
||||
let electrum_proxy_depth = config.electrum_proxy_depth;
|
||||
|
||||
RPC {
|
||||
notification: notification.sender(),
|
||||
@ -834,7 +922,6 @@ impl RPC {
|
||||
let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded();
|
||||
|
||||
while let Some(stream) = acceptor.receiver().recv().unwrap() {
|
||||
let addr = stream.addr_string();
|
||||
// explicitely scope the shadowed variables for the new thread
|
||||
let query = Arc::clone(&query);
|
||||
let senders = Arc::clone(&senders);
|
||||
@ -842,36 +929,38 @@ impl RPC {
|
||||
let garbage_sender = garbage_sender.clone();
|
||||
|
||||
// Kill the peers properly
|
||||
let (killer, peace_receiver) = std::sync::mpsc::channel();
|
||||
let killer_clone = killer.clone();
|
||||
let shutdown_channel = SyncChannel::new(1);
|
||||
let shutdown_sender = shutdown_channel.sender();
|
||||
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
let discovery = discovery.clone();
|
||||
|
||||
let spawned = spawn_thread("peer", move || {
|
||||
let addr = stream.addr_string();
|
||||
info!("[{}] connected peer", addr);
|
||||
let shutdown_sender = shutdown_channel.sender();
|
||||
info!("connected peer. waiting for first request...");
|
||||
let conn = Connection::new(
|
||||
query,
|
||||
stream,
|
||||
stats,
|
||||
txs_limit,
|
||||
peace_receiver,
|
||||
(txs_limit, electrum_proxy_depth),
|
||||
shutdown_channel,
|
||||
#[cfg(feature = "electrum-discovery")]
|
||||
discovery,
|
||||
);
|
||||
senders.lock().unwrap().push(conn.chan.sender());
|
||||
let addr = conn.get_source_addr_str();
|
||||
info!("[{}] connected peer", addr);
|
||||
senders.lock().unwrap().push(conn.message_chan.sender());
|
||||
conn.run();
|
||||
info!("[{}] disconnected peer", addr);
|
||||
let _ = killer_clone.send(());
|
||||
let _ = shutdown_sender.send(());
|
||||
let _ = garbage_sender.send(std::thread::current().id());
|
||||
});
|
||||
|
||||
trace!("[{}] spawned {:?}", addr, spawned.thread().id());
|
||||
threads.insert(spawned.thread().id(), (spawned, killer));
|
||||
trace!("spawned {:?}", spawned.thread().id());
|
||||
threads.insert(spawned.thread().id(), (spawned, shutdown_sender));
|
||||
while let Ok(id) = garbage_receiver.try_recv() {
|
||||
if let Some((thread, killer)) = threads.remove(&id) {
|
||||
trace!("[{}] joining {:?}", addr, id);
|
||||
trace!("joining {:?}", id);
|
||||
let _ = killer.send(());
|
||||
if let Err(error) = thread.join() {
|
||||
error!("failed to join {:?}: {:?}", id, error);
|
||||
@ -1035,7 +1124,7 @@ impl ConnectionStream {
|
||||
fn addr_string(&self) -> String {
|
||||
match self {
|
||||
ConnectionStream::Tcp(_, a) => format!("{a}"),
|
||||
ConnectionStream::Unix(_, a, _) => format!("{a:?}"),
|
||||
ConnectionStream::Unix(_, _, _) => "(Unix socket)".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -1093,3 +1182,281 @@ impl Read for ConnectionStream {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This is a slightly modified version of read_until from standard library BufRead trait.
|
||||
/// After every read we check if there's a PROXY protocol header at the beginning of the read.
|
||||
fn read_until(r: &mut impl BufRead, delim: u8, buf: &mut Vec<u8>) -> std::io::Result<usize> {
|
||||
let mut read = 0;
|
||||
let mut carry_over_arr = [0_u8; 256];
|
||||
let mut carrying_over = 0;
|
||||
loop {
|
||||
let (done, used) = {
|
||||
let mut available = match r.fill_buf() {
|
||||
Ok(n) => n,
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
// If carry over, try to parse PROXY headers.
|
||||
let (carry_skipped_count, exit_error) = if carrying_over > 0 {
|
||||
process_carry_over(&mut carrying_over, &mut carry_over_arr, &mut available)
|
||||
} else {
|
||||
(0, false)
|
||||
};
|
||||
// Rare edge case. If we carry over a proxy parse and it still errors
|
||||
// It is most likely an unknown format
|
||||
if exit_error {
|
||||
return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof));
|
||||
}
|
||||
|
||||
// Try parsing PROXY headers after every read.
|
||||
let (_, skipped_count, exit_error) = parse_proxy_headers(&mut available, 0);
|
||||
let skipped_count = carry_skipped_count + skipped_count;
|
||||
|
||||
match (memchr::memchr(delim, available), exit_error) {
|
||||
(Some(i), false) => {
|
||||
buf.extend_from_slice(&available[..=i]);
|
||||
(true, i + 1 + skipped_count)
|
||||
}
|
||||
(None, _) | (_, true) => {
|
||||
// Added: carry over
|
||||
insert_carry_over(&mut carrying_over, &mut carry_over_arr, available);
|
||||
|
||||
if !exit_error {
|
||||
buf.extend_from_slice(available);
|
||||
}
|
||||
|
||||
(false, available.len() + skipped_count)
|
||||
}
|
||||
}
|
||||
};
|
||||
r.consume(used);
|
||||
read += used;
|
||||
if done || used == 0 {
|
||||
return Ok(read);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn proxy_header_to_source_socket_addr(p_header: ProxyHeader) -> Option<SocketAddr> {
|
||||
match p_header {
|
||||
ProxyHeader::Version1 {
|
||||
addresses: version1::ProxyAddresses::Ipv4 { source, .. },
|
||||
} => Some(SocketAddr::V4(source)),
|
||||
ProxyHeader::Version1 {
|
||||
addresses: version1::ProxyAddresses::Ipv6 { source, .. },
|
||||
} => Some(SocketAddr::V6(source)),
|
||||
ProxyHeader::Version2 {
|
||||
addresses: version2::ProxyAddresses::Ipv4 { source, .. },
|
||||
..
|
||||
} => Some(SocketAddr::V4(source)),
|
||||
ProxyHeader::Version2 {
|
||||
addresses: version2::ProxyAddresses::Ipv6 { source, .. },
|
||||
..
|
||||
} => Some(SocketAddr::V6(source)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_proxy_headers(
|
||||
buf: &mut &[u8],
|
||||
electrum_proxy_depth: usize,
|
||||
) -> (Option<SocketAddr>, usize, bool) {
|
||||
trace!("Starting parse PROXY headers: {:?}", buf.get(..12));
|
||||
let mut addr = None;
|
||||
let mut current_header_index = 0;
|
||||
let before_len = buf.len();
|
||||
// Save the original state of the buf
|
||||
let mut original_buf = *buf;
|
||||
let mut error_exit = false;
|
||||
// The last header is the outer-most proxy
|
||||
// Warning do not early return. ONLY break the loop.
|
||||
loop {
|
||||
let p_header = match proxy_protocol::parse(buf) {
|
||||
Ok(h) => h,
|
||||
// NotProxyHeader definitely does not move the buf pointer.
|
||||
Err(proxy_protocol::ParseError::NotProxyHeader) => break,
|
||||
// This can move the buf cursor forward
|
||||
// and will most likely end in an error higher in the call stack
|
||||
// This means "PROXY protocol was used, but it was in an unknown format"
|
||||
// (Maybe someday if nginx/etc. uses a new version of the protocol
|
||||
// and we don't update the dependency to a version that handles the new
|
||||
// version, it might break.)
|
||||
// OR it could mean a PROXY header fell on the BufReader buffer boundary.
|
||||
Err(_) => {
|
||||
// This is the buf state before this error returned.
|
||||
// This match arm will modify buf past the version bytes
|
||||
// and stop in a state pointing to the middle of a broken header.
|
||||
// We return the buf state to the beginning of the previous version bytes.
|
||||
*buf = original_buf;
|
||||
error_exit = true;
|
||||
break;
|
||||
}
|
||||
};
|
||||
// After each successful header parse, save the new buf state.
|
||||
original_buf = *buf;
|
||||
trace!("Parsed PROXY protocol header: {:?}", p_header);
|
||||
// Increment from 0 to 1 before the first check
|
||||
current_header_index += 1;
|
||||
// 0 should always continue
|
||||
// 1 should only get the 1st header's IP address etc.
|
||||
if current_header_index != electrum_proxy_depth {
|
||||
continue;
|
||||
}
|
||||
// The address is only attempted to be
|
||||
// parsed when the 1 based index is equal
|
||||
addr = proxy_header_to_source_socket_addr(p_header);
|
||||
}
|
||||
(addr, before_len - buf.len(), error_exit)
|
||||
}
|
||||
|
||||
fn parse_panic_error(e: &(dyn std::any::Any + Send)) -> &str {
|
||||
if let Some(s) = e.downcast_ref::<&str>() {
|
||||
s
|
||||
} else if let Some(s) = e.downcast_ref::<String>() {
|
||||
s
|
||||
} else {
|
||||
"Unknown panic"
|
||||
}
|
||||
}
|
||||
|
||||
/// The goal of this function is to take the carried over bytes from the last loop
|
||||
/// and connect them with the first bytes of the next read, then check if it's a header.
|
||||
/// This is to prevent headers from straddling the BufReader's buffer end.
|
||||
/// A simple static array should be quick and easy.
|
||||
fn process_carry_over(
|
||||
carrying_over: &mut usize,
|
||||
carry_over: &mut [u8],
|
||||
available: &mut &[u8],
|
||||
) -> (usize, bool) {
|
||||
// Step 0: Copy as much from available into carry_over to try and parse
|
||||
// How much space do we have left in the array?
|
||||
let empty_space = carry_over.len() - *carrying_over;
|
||||
// How many bytes should we copy over?
|
||||
let copy_bytes = available.len().min(empty_space);
|
||||
// Copy over the bytes to join with the carried over bytes
|
||||
carry_over[*carrying_over..*carrying_over + copy_bytes]
|
||||
.copy_from_slice(&available[..copy_bytes]);
|
||||
|
||||
// Step 1: Figure out if it was a proxy header or not.
|
||||
#[allow(clippy::redundant_slicing)]
|
||||
let mut cursor = &carry_over[..];
|
||||
let (_, skipped_count, exit_error) = parse_proxy_headers(&mut cursor, 0);
|
||||
|
||||
// Step 2: Figure out how much we need to skip available forward
|
||||
let skip_count = skipped_count.saturating_sub(*carrying_over);
|
||||
|
||||
// Step 3: Move the available cursor
|
||||
*available = &available[skip_count..];
|
||||
// Step 4: Reset carrying over (writing 0s to the array is unnecessary)
|
||||
*carrying_over = 0;
|
||||
|
||||
// Return the skip count so we can call consume later
|
||||
(skip_count, exit_error)
|
||||
}
|
||||
|
||||
/// Insert the carry over
|
||||
fn insert_carry_over(carrying_over: &mut usize, carry_over_arr: &mut [u8], available: &[u8]) {
|
||||
*carrying_over = available.len().min(carry_over_arr.len());
|
||||
carry_over_arr[..*carrying_over].copy_from_slice(&available[..*carrying_over]);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_read_until() {
|
||||
// len 48
|
||||
let v1 = "PROXY TCP6 ab:ce:ef:01:23:45:67:89 ::1 0 65535\r\n"
|
||||
.as_bytes()
|
||||
.to_vec();
|
||||
// len 31
|
||||
let v2 =
|
||||
hex::decode("0d0a0d0a000d0a515549540a2111000f7f000001c0a80001ffff0101450000").unwrap();
|
||||
let simple_json = "{}\n".as_bytes().to_vec();
|
||||
// len 26
|
||||
let larger_json = r#"{"id":2,"name":"electrs"}\n"#.as_bytes().to_vec();
|
||||
|
||||
let vectors: Vec<(Vec<u8>, usize, &[u8], std::result::Result<usize, String>)> = vec![
|
||||
// Simple JSON with LF
|
||||
(simple_json.clone(), 3, &simple_json, Ok(3)),
|
||||
// Simple JSON with LF + PROXY v1
|
||||
(
|
||||
[v1.clone(), simple_json.clone()].concat(),
|
||||
51,
|
||||
&simple_json,
|
||||
Ok(51),
|
||||
),
|
||||
// Simple JSON with LF + PROXY v2
|
||||
(
|
||||
[v2.clone(), simple_json.clone()].concat(),
|
||||
34,
|
||||
&simple_json,
|
||||
Ok(34),
|
||||
),
|
||||
// Simple JSON with LF + two layers of proxy
|
||||
(
|
||||
[v1.clone(), v2.clone(), simple_json.clone()].concat(),
|
||||
82,
|
||||
&simple_json,
|
||||
Ok(82),
|
||||
),
|
||||
// Simple JSON that goes over the buffer boundary
|
||||
(
|
||||
larger_json.clone(),
|
||||
2, // capacity
|
||||
&larger_json,
|
||||
Ok(27),
|
||||
),
|
||||
// Simple JSON with LF + two layers of proxy
|
||||
(
|
||||
[v1.clone(), v2.clone(), simple_json.clone()].concat(),
|
||||
45, // 3 bytes before v1 header ends
|
||||
&simple_json,
|
||||
Ok(82),
|
||||
),
|
||||
// Simple JSON with LF + two layers of proxy
|
||||
(
|
||||
[v1.clone(), v2.clone(), simple_json.clone()].concat(),
|
||||
46, // 2 bytes before v1 header ends
|
||||
&simple_json,
|
||||
Ok(82),
|
||||
),
|
||||
// Capacity exactly on the last byte of v1 == parser error (library)
|
||||
(
|
||||
[v1.clone(), v2.clone(), simple_json.clone()].concat(),
|
||||
47, // 1 bytes before v1 header ends (just befor \n)
|
||||
&simple_json,
|
||||
Ok(82),
|
||||
),
|
||||
// TODO: When the BufReader boundary hits in a v2 PROXY header it crashes
|
||||
// (
|
||||
// [v1.clone(), v2.clone(), simple_json.clone()].concat(),
|
||||
// 49, // 1 byte into v2 header
|
||||
// &simple_json,
|
||||
// Ok(82),
|
||||
// ),
|
||||
(
|
||||
[v1.clone(), v2.clone(), simple_json.clone()].concat(),
|
||||
77, // 2 bytes short of v2 ending
|
||||
&simple_json,
|
||||
Ok(82),
|
||||
),
|
||||
(
|
||||
[v1.clone(), v2.clone(), larger_json.clone()].concat(),
|
||||
80, // 1 after v2 ends
|
||||
&larger_json,
|
||||
Ok(106),
|
||||
),
|
||||
];
|
||||
|
||||
for (input, capacity, bytes, count) in vectors {
|
||||
let mut buf = BufReader::with_capacity(capacity, input.as_slice());
|
||||
let mut v = vec![];
|
||||
let result_count = read_until(&mut buf, b'\n', &mut v).map_err(|e| format!("{e}"));
|
||||
assert_eq!(count, result_count);
|
||||
assert_eq!(bytes, &v);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -74,7 +74,7 @@ fn bitcoind_fetcher(
|
||||
let chan = SyncChannel::new(1);
|
||||
let sender = chan.sender();
|
||||
Ok(Fetcher::from(
|
||||
chan.into_receiver(),
|
||||
chan.into_receiver().expect("not closed"),
|
||||
spawn_thread("bitcoind_fetcher", move || {
|
||||
for entries in new_headers.chunks(100) {
|
||||
let blockhashes: Vec<BlockHash> = entries.iter().map(|e| *e.hash()).collect();
|
||||
@ -115,7 +115,7 @@ fn blkfiles_fetcher(
|
||||
|
||||
let parser = blkfiles_parser(blkfiles_reader(blk_files), magic);
|
||||
Ok(Fetcher::from(
|
||||
chan.into_receiver(),
|
||||
chan.into_receiver().expect("not closed"),
|
||||
spawn_thread("blkfiles_fetcher", move || {
|
||||
parser.map(|sizedblocks| {
|
||||
let block_entries: Vec<BlockEntry> = sizedblocks
|
||||
@ -151,7 +151,7 @@ fn blkfiles_reader(blk_files: Vec<PathBuf>) -> Fetcher<Vec<u8>> {
|
||||
let sender = chan.sender();
|
||||
|
||||
Fetcher::from(
|
||||
chan.into_receiver(),
|
||||
chan.into_receiver().expect("not closed"),
|
||||
spawn_thread("blkfiles_reader", move || {
|
||||
for path in blk_files {
|
||||
trace!("reading {:?}", path);
|
||||
@ -170,7 +170,7 @@ fn blkfiles_parser(blobs: Fetcher<Vec<u8>>, magic: u32) -> Fetcher<Vec<SizedBloc
|
||||
let sender = chan.sender();
|
||||
|
||||
Fetcher::from(
|
||||
chan.into_receiver(),
|
||||
chan.into_receiver().expect("not closed"),
|
||||
spawn_thread("blkfiles_parser", move || {
|
||||
blobs.map(|blob| {
|
||||
trace!("parsing {} bytes", blob.len());
|
||||
|
||||
47
src/rest.rs
47
src/rest.rs
@ -4,9 +4,10 @@ use crate::errors;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::new_index::{compute_script_hash, Query, SpendingInput, Utxo};
|
||||
use crate::util::{
|
||||
create_socket, electrum_merkle, extract_tx_prevouts, full_hash, get_innerscripts, get_tx_fee,
|
||||
has_prevout, is_coinbase, transaction_sigop_count, BlockHeaderMeta, BlockId, FullHash,
|
||||
ScriptToAddr, ScriptToAsm, TransactionStatus,
|
||||
create_socket, electrum_merkle, extract_tx_prevouts, full_hash, get_innerscripts,
|
||||
get_rest_addr, get_tx_fee, has_prevout, is_coinbase, set_rest_addr, transaction_sigop_count,
|
||||
BlockHeaderMeta, BlockId, FullHash, ScriptToAddr, ScriptToAsm, TransactionStatus,
|
||||
REST_CLIENT_ADDR,
|
||||
};
|
||||
|
||||
#[cfg(not(feature = "liquid"))]
|
||||
@ -17,11 +18,15 @@ use bitcoin::hashes::hex::{FromHex, ToHex};
|
||||
use bitcoin::hashes::Error as HashError;
|
||||
use hex::{self, FromHexError};
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Response, Server, StatusCode};
|
||||
use hyper::{header::HeaderValue, Body, HeaderMap, Method, Response, Server, StatusCode};
|
||||
use itertools::Itertools;
|
||||
use prometheus::{HistogramOpts, HistogramVec};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use hyperlocal::UnixServerExt;
|
||||
use std::borrow::Cow;
|
||||
use std::cell::Cell;
|
||||
use std::net::IpAddr;
|
||||
use std::{cmp, fs};
|
||||
#[cfg(feature = "liquid")]
|
||||
use {
|
||||
@ -564,6 +569,28 @@ fn prepare_txs(
|
||||
.collect()
|
||||
}
|
||||
|
||||
// Get the first valid IP address in the X-Forwarded-For header
|
||||
// Supports multiple headers.
|
||||
fn get_client_ip(headers: &HeaderMap<HeaderValue>, rest_proxy_depth: usize) -> Option<IpAddr> {
|
||||
if rest_proxy_depth == 0 {
|
||||
return None;
|
||||
}
|
||||
headers
|
||||
.get_all("X-Forwarded-For")
|
||||
.iter()
|
||||
.filter_map(|v| v.to_str().ok())
|
||||
.join(",")
|
||||
.split(',')
|
||||
.nth(rest_proxy_depth - 1)
|
||||
.and_then(|ip| ip.trim().parse::<IpAddr>().ok())
|
||||
}
|
||||
|
||||
fn get_client_ip_str() -> Cow<'static, str> {
|
||||
get_rest_addr()
|
||||
.map(|a| Cow::Owned(a.to_string()))
|
||||
.unwrap_or_else(|| Cow::Borrowed("Unknown IP"))
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn run_server(
|
||||
config: Arc<Config>,
|
||||
@ -588,16 +615,20 @@ async fn run_server(
|
||||
let config = Arc::clone(&config);
|
||||
let timer = metric.with_label_values(&["all_methods"]).start_timer();
|
||||
|
||||
async move {
|
||||
REST_CLIENT_ADDR.scope(Cell::new(None), async move {
|
||||
let method = req.method().clone();
|
||||
let uri = req.uri().clone();
|
||||
|
||||
// Set the task local IP addr from X-Forwarded-For
|
||||
set_rest_addr(get_client_ip(req.headers(), config.rest_proxy_depth));
|
||||
|
||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||
|
||||
let mut resp = tokio::task::block_in_place(|| {
|
||||
handle_request(method, uri, body, &query, &config)
|
||||
})
|
||||
.unwrap_or_else(|err| {
|
||||
warn!("{:?}", err);
|
||||
warn!("[{}] {:?}", get_client_ip_str(), err);
|
||||
Response::builder()
|
||||
.status(err.0)
|
||||
.header("Content-Type", "text/plain")
|
||||
@ -611,7 +642,7 @@ async fn run_server(
|
||||
}
|
||||
timer.observe_duration();
|
||||
Ok::<_, hyper::Error>(resp)
|
||||
}
|
||||
})
|
||||
}))
|
||||
}
|
||||
};
|
||||
@ -699,7 +730,7 @@ fn handle_request(
|
||||
None => HashMap::new(),
|
||||
};
|
||||
|
||||
info!("handle {:?} {:?}", method, uri);
|
||||
info!("[{}] handle {:?} {:?}", get_client_ip_str(), method, uri);
|
||||
match (
|
||||
&method,
|
||||
path.first(),
|
||||
|
||||
@ -14,6 +14,7 @@ pub use self::transaction::{
|
||||
sigops::transaction_sigop_count, TransactionStatus, TxInput,
|
||||
};
|
||||
|
||||
use std::cell::Cell;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
@ -23,7 +24,7 @@ use std::thread::{self, ThreadId};
|
||||
use crate::chain::BlockHeader;
|
||||
use bitcoin::hashes::sha256d::Hash as Sha256dHash;
|
||||
use socket2::{Domain, Protocol, Socket, Type};
|
||||
use std::net::SocketAddr;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
|
||||
pub type Bytes = Vec<u8>;
|
||||
pub type HeaderMap = HashMap<Sha256dHash, BlockHeader>;
|
||||
@ -38,29 +39,26 @@ pub fn full_hash(hash: &[u8]) -> FullHash {
|
||||
}
|
||||
|
||||
pub struct SyncChannel<T> {
|
||||
tx: Option<crossbeam_channel::Sender<T>>,
|
||||
tx: crossbeam_channel::Sender<T>,
|
||||
rx: Option<crossbeam_channel::Receiver<T>>,
|
||||
}
|
||||
|
||||
impl<T> SyncChannel<T> {
|
||||
pub fn new(size: usize) -> SyncChannel<T> {
|
||||
let (tx, rx) = crossbeam_channel::bounded(size);
|
||||
SyncChannel {
|
||||
tx: Some(tx),
|
||||
rx: Some(rx),
|
||||
}
|
||||
SyncChannel { tx, rx: Some(rx) }
|
||||
}
|
||||
|
||||
pub fn sender(&self) -> crossbeam_channel::Sender<T> {
|
||||
self.tx.as_ref().expect("No Sender").clone()
|
||||
self.tx.clone()
|
||||
}
|
||||
|
||||
pub fn receiver(&self) -> &crossbeam_channel::Receiver<T> {
|
||||
self.rx.as_ref().expect("No Receiver")
|
||||
pub fn receiver(&self) -> Option<&crossbeam_channel::Receiver<T>> {
|
||||
self.rx.as_ref()
|
||||
}
|
||||
|
||||
pub fn into_receiver(self) -> crossbeam_channel::Receiver<T> {
|
||||
self.rx.expect("No Receiver")
|
||||
pub fn into_receiver(self) -> Option<crossbeam_channel::Receiver<T>> {
|
||||
self.rx
|
||||
}
|
||||
|
||||
/// This drops the sender and receiver, causing all other methods to panic.
|
||||
@ -68,7 +66,6 @@ impl<T> SyncChannel<T> {
|
||||
/// Use only when you know that the channel will no longer be used.
|
||||
/// ie. shutdown.
|
||||
pub fn close(&mut self) -> Option<crossbeam_channel::Receiver<T>> {
|
||||
self.tx.take();
|
||||
self.rx.take()
|
||||
}
|
||||
}
|
||||
@ -225,3 +222,15 @@ pub mod serde_hex {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_rest_addr() -> Option<IpAddr> {
|
||||
REST_CLIENT_ADDR.with(|addr| addr.get())
|
||||
}
|
||||
pub(crate) fn set_rest_addr(input: Option<IpAddr>) {
|
||||
REST_CLIENT_ADDR.with(|addr| {
|
||||
addr.set(input);
|
||||
});
|
||||
}
|
||||
tokio::task_local! {
|
||||
pub(crate) static REST_CLIENT_ADDR: Cell<Option<IpAddr>>;
|
||||
}
|
||||
|
||||
19
start
19
start
@ -8,6 +8,23 @@ DB_FOLDER=/electrs
|
||||
NODENAME=$(hostname|cut -d . -f1)
|
||||
LOCATION=$(hostname|cut -d . -f2)
|
||||
|
||||
# since we know that our nginx will always be the first
|
||||
# (closest) proxy from electrs, we set it to 1.
|
||||
# If some servers prefer to disable PROXY protocol,
|
||||
# set to 0. If the client themselves is proxying the
|
||||
# electrum RPCs from other clients, this should be set to
|
||||
# the hop that you want to trust to source IP addresses.
|
||||
# If that hop doesn't exist, TcpStream's local_addr or
|
||||
# "Unix socket" is used, and the PROXY headers are discarded.
|
||||
# Image:
|
||||
# [electrs] <> [proxy "1"] <> [proxy "2"] <> [electrum client]
|
||||
RPC_PROXY_DEPTH=1
|
||||
# This chooses the depth of X-Forwarded-For IP to show.
|
||||
# A value of 1 is the first value from left to right.
|
||||
# Image:
|
||||
# "X-Forwarded-For: proxy1, proxy2, proxy3..."
|
||||
REST_PROXY_DEPTH=1
|
||||
|
||||
# load rust if necessary
|
||||
if [ -e "${HOME}/.cargo/env" ];then
|
||||
source "${HOME}/.cargo/env"
|
||||
@ -161,6 +178,8 @@ do
|
||||
--address-search \
|
||||
--utxos-limit "${UTXOS_LIMIT}" \
|
||||
--electrum-txs-limit "${ELECTRUM_TXS_LIMIT}" \
|
||||
--electrum-proxy-depth "${RPC_PROXY_DEPTH}" \
|
||||
--rest-proxy-depth "${REST_PROXY_DEPTH}" \
|
||||
-vv
|
||||
sleep 1
|
||||
done
|
||||
|
||||
Loading…
Reference in New Issue
Block a user