[wip] try to reconnect

This commit is contained in:
Alekos Filini 2020-03-19 12:25:20 +01:00
parent c240adcd1d
commit d5eef6905a
No known key found for this signature in database
GPG Key ID: 5E8AFC3034FDFA4F
3 changed files with 180 additions and 40 deletions

View File

@ -1,9 +1,42 @@
extern crate electrum_client;
extern crate log;
use electrum_client::Client;
fn main() {
let mut client = Client::new("kirsche.emzy.de:50001").unwrap();
let res = client.server_features();
println!("{:#?}", res);
use log::{Level, Metadata, Record};
struct SimpleLogger;
impl log::Log for SimpleLogger {
fn enabled(&self, metadata: &Metadata) -> bool {
metadata.level() <= Level::Trace
}
fn log(&self, record: &Record) {
if self.enabled(record.metadata()) {
println!("{} - {}", record.level(), record.args());
}
}
fn flush(&self) {}
}
use log::{LevelFilter, SetLoggerError};
static LOGGER: SimpleLogger = SimpleLogger;
pub fn init() -> Result<(), SetLoggerError> {
log::set_logger(&LOGGER).map(|()| log::set_max_level(LevelFilter::Trace))
}
fn main() {
init().unwrap();
let mut client = Client::new("localhost:50001").unwrap();
loop {
let res = client.relay_fee();
println!("{:?}", res);
std::thread::sleep(std::time::Duration::from_secs(3));
}
}

View File

@ -7,7 +7,7 @@ use std::io::{self, BufRead, BufReader, Read, Write};
use std::net::{TcpStream, ToSocketAddrs};
#[allow(unused_imports)]
use log::{debug, error, info, trace};
use log::{debug, error, info, trace, warn};
use bitcoin::consensus::encode::{deserialize, serialize};
use bitcoin::hashes::hex::{FromHex, ToHex};
@ -24,7 +24,7 @@ use rustls::{ClientConfig, ClientSession, StreamOwned};
#[cfg(any(feature = "default", feature = "proxy"))]
use socks::{Socks5Stream, ToTargetAddr};
use stream::ClonableStream;
use stream::{ClonableStream, ReconnectStream};
use batch::Batch;
use types::*;
@ -86,7 +86,7 @@ impl ToSocketAddrsDomain for (&str, u16) {
#[derive(Debug)]
pub struct Client<S>
where
S: Read + Write,
S: Read + Write + ReconnectStream,
{
stream: ClonableStream<S>,
buf_reader: BufReader<ClonableStream<S>>,
@ -98,12 +98,12 @@ where
calls: usize,
}
impl<S> From<S> for Client<S>
/*impl<S> From<S> for Client<S>
where
S: Read + Write,
S: Read + Write + ReconnectStream,
{
fn from(stream: S) -> Self {
let stream: ClonableStream<_> = stream.into();
//let stream: ClonableStream<_> = stream.into();
Self {
buf_reader: BufReader::new(stream.clone()),
@ -115,19 +115,31 @@ where
calls: 0,
}
}
}
}*/
/// Transport type used to establish a plaintext TCP connection with the server
pub type ElectrumPlaintextStream = TcpStream;
impl Client<ElectrumPlaintextStream> {
/// Creates a new plaintext client and tries to connect to `socket_addr`.
pub fn new<A: ToSocketAddrs>(socket_addr: A) -> Result<Self, Error> {
let stream = TcpStream::connect(socket_addr)?;
// TODO: we should probably set the connection/read/write timeout on the socket
Ok(stream.into())
let socket_addr = socket_addr.to_socket_addrs()?.collect::<Vec<_>>();
let stream = ClonableStream::new(socket_addr)?;
Ok(Self {
buf_reader: BufReader::new(stream.clone()),
stream,
headers: VecDeque::new(),
script_notifications: BTreeMap::new(),
#[cfg(feature = "debug-calls")]
calls: 0,
})
}
}
/*
#[cfg(feature = "use-openssl")]
/// Transport type used to establish an OpenSSL TLS encrypted/authenticated connection with the server
pub type ElectrumSslStream = SslStream<TcpStream>;
@ -244,28 +256,73 @@ impl Client<ElectrumProxyStream> {
Ok(stream.into())
}
}
*/
macro_rules! try_reconnect_attempts {
($self: expr, $result_fn:expr, $attempts:expr, $sleep_ms:expr) => {{
let mut attempt = 0;
loop {
match $result_fn() {
Err(e) if attempt < $attempts => {
warn!("Error {:?}, reconnection attempt: {}. Will sleep for {} ms, then we'll try to reconnect.", e, attempt, $sleep_ms);
std::thread::sleep(std::time::Duration::from_millis($sleep_ms));
if let Err(e) = $self.try_reconnect() {
warn!("Attempt {} gave error {:?}", attempt, e);
}
attempt += 1;
},
Err(e) => break Err(e),
Ok(v) => break Ok(v),
}
}
}};
}
impl<S: Read + Write + ReconnectStream> Client<S>
where
Error: From<<S as ReconnectStream>::Error>,
{
fn try_reconnect(&mut self) -> Result<(), Error> {
// TODO: reattach all the notifications to the new stream
self.stream.try_reconnect()?;
self.buf_reader = BufReader::new(self.stream.clone());
Ok(())
}
impl<S: Read + Write> Client<S> {
fn call(&mut self, req: Request) -> Result<serde_json::Value, Error> {
let mut raw = serde_json::to_vec(&req)?;
trace!("==> {}", String::from_utf8_lossy(&raw));
raw.extend_from_slice(b"\n");
self.stream.write_all(&raw)?;
self.stream.flush()?;
self.increment_calls();
let result: Result<serde_json::Value, Error> = try_reconnect_attempts!(
self,
|| {
self.stream.write_all(&raw)?;
self.stream.flush()?;
let mut resp = loop {
let raw = self.recv()?;
let mut resp: serde_json::Value = serde_json::from_slice(&raw)?;
self.increment_calls();
match resp["method"].take().as_str() {
Some(ref method) if method == &req.method => break resp,
Some(ref method) => self.handle_notification(method, resp["result"].take())?,
_ => break resp,
};
};
Ok(loop {
let raw = self.recv()?;
let mut resp: serde_json::Value = serde_json::from_slice(&raw)?;
match resp["method"].take().as_str() {
Some(ref method) if method == &req.method => break resp,
Some(ref method) => {
self.handle_notification(method, resp["result"].take())?
}
_ => break resp,
};
})
},
100,
1000
); // TODO: reduce the number of attempts
let mut resp = result?;
if let Some(err) = resp.get("error") {
return Err(Error::Protocol(err.clone()));
@ -293,6 +350,7 @@ impl<S: Read + Write> Client<S> {
trace!("==> {}", String::from_utf8_lossy(&raw));
// TODO: handle errors
self.stream.write_all(&raw)?;
self.stream.flush()?;
@ -328,7 +386,15 @@ impl<S: Read + Write> Client<S> {
fn recv(&mut self) -> io::Result<Vec<u8>> {
let mut resp = String::new();
self.buf_reader.read_line(&mut resp)?;
self.buf_reader
.read_line(&mut resp)
.and_then(|bytes_read| match bytes_read {
0 => Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Unexpected EOF in recv()",
)),
v => Ok(v),
});
trace!("<== {}", resp);
@ -367,6 +433,7 @@ impl<S: Read + Write> Client<S> {
self.buf_reader.fill_buf()?;
while !self.buf_reader.buffer().is_empty() {
// TODO: handle errors
let raw = self.recv()?;
let mut resp: serde_json::Value = serde_json::from_slice(&raw)?;

View File

@ -1,16 +1,53 @@
use std::io::{self, Read, Write};
use std::sync::{Arc, Mutex};
#[derive(Debug)]
pub struct ClonableStream<T: Read + Write>(Arc<Mutex<T>>);
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
impl<T: Read + Write> Read for ClonableStream<T> {
pub trait ReconnectStream: Sized {
type ReconnectData: std::fmt::Debug + std::clone::Clone;
type Error: std::fmt::Debug;
fn try_connect(data: &Self::ReconnectData) -> Result<Self, Self::Error>;
}
impl ReconnectStream for std::net::TcpStream {
type ReconnectData = Vec<std::net::SocketAddr>;
type Error = io::Error;
fn try_connect(data: &Self::ReconnectData) -> Result<Self, Self::Error> {
std::net::TcpStream::connect(&data[..])
}
}
#[derive(Debug)]
pub struct ClonableStream<T: Read + Write + ReconnectStream>(
Arc<Mutex<T>>,
<T as ReconnectStream>::ReconnectData,
);
impl<T: Read + Write + ReconnectStream> ClonableStream<T> {
pub fn new(
data: <T as ReconnectStream>::ReconnectData,
) -> Result<Self, <T as ReconnectStream>::Error> {
Ok(Self(Arc::new(Mutex::new(T::try_connect(&data)?)), data))
}
pub fn try_reconnect(&mut self) -> Result<(), <T as ReconnectStream>::Error> {
self.0 = Arc::new(Mutex::new(T::try_connect(&self.1)?));
debug!("ClonableStream::try_reconnect() successful");
Ok(())
}
}
impl<T: Read + Write + ReconnectStream> Read for ClonableStream<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.lock().unwrap().read(buf)
}
}
impl<T: Read + Write> Write for ClonableStream<T> {
impl<T: Read + Write + ReconnectStream> Write for ClonableStream<T> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.lock().unwrap().write(buf)
}
@ -20,20 +57,23 @@ impl<T: Read + Write> Write for ClonableStream<T> {
}
}
impl<T: Read + Write> From<T> for ClonableStream<T> {
fn from(stream: T) -> Self {
Self(Arc::new(Mutex::new(stream)))
}
}
// For some reason this fails saying that there's a conflicting implementation in core ??
/* impl<T: Read + Write + ReconnectStream> TryFrom<<T as ReconnectStream>::ReconnectData> for ClonableStream<T> {
type Error = <T as ReconnectStream>::Error;
impl<T: Read + Write> Clone for ClonableStream<T> {
fn try_from(data: <T as ReconnectStream>::ReconnectData) -> Result<Self, Self::Error> {
Ok(Self(Arc::new(Mutex::new(T::try_connect(&data)?)), data))
}
}*/
impl<T: Read + Write + ReconnectStream> Clone for ClonableStream<T> {
fn clone(&self) -> Self {
ClonableStream(Arc::clone(&self.0))
ClonableStream(Arc::clone(&self.0), self.1.clone())
}
}
#[cfg(test)]
impl<T: Read + Write> ClonableStream<T> {
impl<T: Read + Write + ReconnectStream> ClonableStream<T> {
pub fn stream(&self) -> Arc<Mutex<T>> {
Arc::clone(&self.0)
}