Implement timeouts when there are multiple socket addrs

This commit is contained in:
Alekos Filini 2021-02-22 12:08:31 +01:00
parent 25dc3fa5e5
commit fa9359cf73
No known key found for this signature in database
GPG Key ID: 431401E4A4530061
2 changed files with 38 additions and 19 deletions

View File

@ -5,7 +5,7 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::io::{BufRead, BufReader, Read, Write};
use std::mem::drop;
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex, TryLockError};
@ -161,8 +161,7 @@ impl RawClient<ElectrumPlaintextStream> {
) -> Result<Self, Error> {
let stream = match timeout {
Some(timeout) => {
let socket_addr = get_one_socket_addr(socket_addrs)?;
let stream = TcpStream::connect_timeout(&socket_addr, timeout)?;
let stream = connect_with_total_timeout(socket_addrs, timeout)?;
stream.set_read_timeout(Some(timeout))?;
stream.set_write_timeout(Some(timeout))?;
stream
@ -174,16 +173,41 @@ impl RawClient<ElectrumPlaintextStream> {
}
}
fn get_one_socket_addr<A: ToSocketAddrs>(socket_addrs: A) -> Result<SocketAddr, Error> {
let mut socket_iter = socket_addrs.to_socket_addrs()?;
let socket_addr = socket_iter
.next()
.ok_or(Error::WrongAddrsNumberWithTimeout)?;
// Unlike `connect`, `connect_timeout` takes a single [`SocketAddr`]
match socket_iter.next() {
None => Ok(socket_addr),
Some(_) => Err(Error::WrongAddrsNumberWithTimeout),
fn connect_with_total_timeout<A: ToSocketAddrs>(
socket_addrs: A,
mut timeout: Duration,
) -> Result<TcpStream, Error> {
// Use the same algorithm as curl: 1/2 on the first host, 1/4 on the second one, etc.
// https://curl.se/mail/lib-2014-11/0164.html
let mut errors = Vec::new();
let addrs = socket_addrs
.to_socket_addrs()?
.enumerate()
.collect::<Vec<_>>();
for (index, addr) in &addrs {
if *index < addrs.len() - 1 {
timeout = timeout.div_f32(2.0);
}
info!(
"Trying to connect to {} (attempt {}/{}) with timeout {:?}",
addr,
index + 1,
addrs.len(),
timeout
);
match TcpStream::connect_timeout(addr, timeout) {
Ok(socket) => return Ok(socket),
Err(e) => {
warn!("Connection error: {:?}", e);
errors.push(e.into());
}
}
}
Err(Error::AllAttemptsErrored(errors))
}
#[cfg(feature = "use-openssl")]
@ -209,8 +233,7 @@ impl RawClient<ElectrumSslStream> {
}
match timeout {
Some(timeout) => {
let socket_addr = get_one_socket_addr(socket_addrs.clone())?;
let stream = TcpStream::connect_timeout(&socket_addr, timeout)?;
let stream = connect_with_total_timeout(socket_addrs.clone(), timeout)?;
stream.set_read_timeout(Some(timeout))?;
stream.set_write_timeout(Some(timeout))?;
Self::new_ssl_from_stream(socket_addrs, validate_domain, stream)
@ -300,8 +323,7 @@ impl RawClient<ElectrumSslStream> {
}
match timeout {
Some(timeout) => {
let socket_addr = get_one_socket_addr(socket_addrs.clone())?;
let stream = TcpStream::connect_timeout(&socket_addr, timeout)?;
let stream = connect_with_total_timeout(socket_addrs.clone(), timeout)?;
stream.set_read_timeout(Some(timeout))?;
stream.set_write_timeout(Some(timeout))?;
Self::new_ssl_from_stream(socket_addrs, validate_domain, stream)

View File

@ -296,8 +296,6 @@ pub enum Error {
SharedIOError(Arc<std::io::Error>),
/// Setting both a proxy and a timeout in `Config` is an error
BothSocksAndTimeout,
/// Setting both a timeout and passing zero or more than one socket addrs is an error
WrongAddrsNumberWithTimeout,
/// Couldn't take a lock on the reader mutex. This means that there's already another reader
/// thread running
@ -343,7 +341,6 @@ impl Display for Error {
Error::MissingDomain => f.write_str("Missing domain while it was explicitly asked to validate it"),
Error::BothSocksAndTimeout => f.write_str("Setting both a proxy and a timeout in `Config` is an error"),
Error::WrongAddrsNumberWithTimeout => f.write_str("Setting both a timeout and passing zero or more than one socket addrs is an error"),
Error::CouldntLockReader => f.write_str("Couldn't take a lock on the reader mutex. This means that there's already another reader thread is running"),
}
}