diff --git a/Cargo.toml b/Cargo.toml index 2a41d59..930ba64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ path = "src/lib.rs" [dependencies] log = "^0.4" +env_logger = "0.7" bitcoin = { version = "0.23", features = ["use-serde"] } serde = { version = "^1.0", features = ["derive"] } serde_json = { version = "^1.0" } diff --git a/examples/plaintext.rs b/examples/plaintext.rs index 4b1c9bc..fd48b20 100644 --- a/examples/plaintext.rs +++ b/examples/plaintext.rs @@ -1,9 +1,39 @@ extern crate electrum_client; +extern crate env_logger; + +use std::sync::Arc; +use std::thread; use electrum_client::Client; fn main() { - let mut client = Client::new("kirsche.emzy.de:50001").unwrap(); - let res = client.server_features(); - println!("{:#?}", res); + env_logger::init(); + + let client = Arc::new(Client::new("electrum.blockstream.info:50001").unwrap()); + + let mut handles = Vec::new(); + + /*let _client = Arc::clone(&client); + let handle = thread::spawn(move || { + _client.reader_thread().unwrap(); + println!("reader thread exited"); + }); + + handles.push(handle);*/ + + thread::sleep(std::time::Duration::from_secs(1)); + + for _ in 0..4 { + let client = Arc::clone(&client); + let handle = thread::spawn(move || { + let res = client.batch_estimate_fee(vec![1, 3, 6, 12]); + println!("{:?}", res); + }); + + handles.push(handle); + } + + for h in handles { + h.join().unwrap(); + } } diff --git a/src/client.rs b/src/client.rs index 04a1f14..dde2dd4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,12 +2,16 @@ //! //! This module contains definitions of all the complex data structures that are returned by calls -use std::collections::{BTreeMap, VecDeque}; -use std::io::{self, BufRead, BufReader, Read, Write}; +use core::sync::atomic::{AtomicUsize, Ordering}; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::io::{BufRead, BufReader, Read, Write}; +use std::mem::drop; use std::net::{TcpStream, ToSocketAddrs}; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::{Mutex, TryLockError}; #[allow(unused_imports)] -use log::{debug, error, info, trace}; +use log::{debug, error, info, trace, warn}; use bitcoin::consensus::encode::{deserialize, serialize}; use bitcoin::hashes::hex::{FromHex, ToHex}; @@ -88,11 +92,14 @@ pub struct Client where S: Read + Write, { - stream: ClonableStream, - buf_reader: BufReader>, + stream: Mutex>, + buf_reader: Mutex>>, - headers: VecDeque, - script_notifications: BTreeMap>, + last_id: AtomicUsize, + waiting_map: Mutex>>, + + headers: Mutex>, + script_notifications: Mutex>>, #[cfg(feature = "debug-calls")] calls: usize, @@ -106,10 +113,14 @@ where let stream: ClonableStream<_> = stream.into(); Self { - buf_reader: BufReader::new(stream.clone()), - stream, - headers: VecDeque::new(), - script_notifications: BTreeMap::new(), + buf_reader: Mutex::new(BufReader::new(stream.clone())), + stream: Mutex::new(stream), + + last_id: AtomicUsize::new(0), + waiting_map: Mutex::new(HashMap::new()), + + headers: Mutex::new(VecDeque::new()), + script_notifications: Mutex::new(HashMap::new()), #[cfg(feature = "debug-calls")] calls: 0, @@ -245,110 +256,249 @@ impl Client { } } +#[derive(Debug)] +enum ChannelMessage { + Response(serde_json::Value), + WakeUp, +} + impl Client { - fn call(&mut self, req: Request) -> Result { + // TODO: to enable this we have to find a way to allow concurrent read and writes to the + // underlying transport struct. This can be done pretty easily for TcpStream because it can be + // split into a "read" and a "write" object, but it's not as trivial for other types. Without + // such thing, this causes a deadlock, because the reader thread takes a lock on the + // `ClonableStream` before other threads can send a request to the server. They will block + // waiting for the reader to release the mutex, but this will never happen because the server + // didn't receive any request, so it has nothing to send back. + // pub fn reader_thread(&self) -> Result<(), Error> { + // self._reader_thread(None).map(|_| ()) + // } + + fn _reader_thread(&self, until_message: Option) -> Result { + let mut raw_resp = String::new(); + let resp = match self.buf_reader.try_lock() { + Ok(mut reader) => { + trace!( + "Starting reader thread with `until_message` = {:?}", + until_message + ); + + if let Some(until_message) = until_message { + // If we are trying to start a reader thread but the corresponding sender is + // missing from the map, exit immediately. This can happen with batch calls, + // since the sender is shared for all the individual queries in a call. We + // might have already received a response for that id, but we don't know it + // yet. Exiting here forces the calling code to fallback to the sender-receiver + // method, and it should find a message there waiting for it. + if self + .waiting_map + .lock() + .unwrap() + .get(&until_message) + .is_none() + { + return Err(Error::CouldntLockReader); + } + } + + // Loop over every message + loop { + raw_resp.clear(); + + reader.read_line(&mut raw_resp)?; + trace!("<== {}", raw_resp); + + let resp: serde_json::Value = serde_json::from_str(&raw_resp)?; + + // Normally there is and id, but it's missing for spontaneous notifications + // from the server + let resp_id = resp["id"] + .as_str() + .and_then(|s| s.parse().ok()) + .or(resp["id"].as_u64().map(|i| i as usize)); + match resp_id { + Some(resp_id) if until_message == Some(resp_id) => { + // We have a valid id and it's exactly the one we were waiting for! + trace!( + "Reader thread {} received a response for its request", + resp_id + ); + + // Remove ourselves from the "waiting map" + let mut map = self.waiting_map.lock().unwrap(); + map.remove(&resp_id); + + // If the map is not empty, we select a random thread to become the + // new reader thread. + if let Some(sender) = map.values().nth(0) { + sender + .send(ChannelMessage::WakeUp) + .expect("Unable to WakeUp a different thread"); + } + + break Ok(resp); + } + Some(resp_id) => { + // We have an id, but it's not our response. Notify the thread and + // move on + trace!("Reader thread received response for {}", resp_id); + + let mut map = self.waiting_map.lock().unwrap(); + if let Some(sender) = map.get(&resp_id) { + sender + .send(ChannelMessage::Response(resp)) + .expect("Unable to send the response"); + map.remove(&resp_id); + } else { + warn!("Missing listener for {}", resp_id); + } + } + None => { + // No id, that's probably a notification. + let mut resp = resp; + + if let Some(ref method) = resp["method"].take().as_str() { + self.handle_notification(method, resp["params"].take())?; + } else { + warn!("Unexpected response: {:?}", resp); + } + } + } + } + } + Err(TryLockError::WouldBlock) => { + // If we "WouldBlock" here it means that there's already a reader thread + // running somewhere. + Err(Error::CouldntLockReader) + } + e @ Err(TryLockError::Poisoned(_)) => e + .map(|_| Ok(serde_json::Value::Null)) + .expect("Poisoned reader mutex"), // panic if the reader mutex has been poisoned + }; + + let resp = resp?; + if let Some(err) = resp.get("error") { + Err(Error::Protocol(err.clone())) + } else { + Ok(resp) + } + } + + fn call(&self, req: Request) -> Result { + // Add our listener to the map before we send the request, to make sure we don't get a + // reply before the receiver is added + let (sender, receiver) = channel(); + self.waiting_map.lock().unwrap().insert(req.id, sender); + let mut raw = serde_json::to_vec(&req)?; trace!("==> {}", String::from_utf8_lossy(&raw)); raw.extend_from_slice(b"\n"); - self.stream.write_all(&raw)?; - self.stream.flush()?; + let mut stream = self.stream.lock().unwrap(); + stream.write_all(&raw)?; + stream.flush()?; + drop(stream); // release the lock self.increment_calls(); - let mut resp = loop { - let raw = self.recv()?; - let mut resp: serde_json::Value = serde_json::from_slice(&raw)?; - - match resp["method"].take().as_str() { - Some(ref method) if method == &req.method => break resp, - Some(ref method) => self.handle_notification(method, resp["result"].take())?, - _ => break resp, - }; - }; - - if let Some(err) = resp.get("error") { - return Err(Error::Protocol(err.clone())); - } - + let mut resp = self.recv(&receiver, req.id)?; Ok(resp["result"].take()) } /// Execute a queue of calls stored in a [`Batch`](../batch/struct.Batch.html) struct. Returns /// `Ok()` **only if** all of the calls are successful. The order of the JSON `Value`s returned /// reflects the order in which the calls were made on the `Batch` struct. - pub fn batch_call(&mut self, batch: Batch) -> Result, Error> { - let mut id_map = BTreeMap::new(); + pub fn batch_call(&self, batch: Batch) -> Result, Error> { let mut raw = Vec::new(); + + let mut missing_responses = HashSet::new(); let mut answer = Vec::new(); - for (i, (method, params)) in batch.into_iter().enumerate() { - let req = Request::new_id(i, &method, params); + // Add our listener to the map before we send the request, Here we will clone the sender + // for every request id, so that we only have to monitor one receiver. + let (sender, receiver) = channel(); + + for (method, params) in batch.into_iter() { + let req = Request::new_id(self.last_id.fetch_add(1, Ordering::SeqCst), &method, params); + missing_responses.insert(req.id); + + self.waiting_map + .lock() + .unwrap() + .insert(req.id, sender.clone()); raw.append(&mut serde_json::to_vec(&req)?); raw.extend_from_slice(b"\n"); + } - id_map.insert(req.id, method); + if missing_responses.is_empty() { + return Ok(vec![]); } trace!("==> {}", String::from_utf8_lossy(&raw)); - self.stream.write_all(&raw)?; - self.stream.flush()?; + let mut stream = self.stream.lock().unwrap(); + stream.write_all(&raw)?; + stream.flush()?; + drop(stream); // release the lock self.increment_calls(); - while answer.len() < id_map.len() { - let raw = self.recv()?; - let mut resp: serde_json::Value = serde_json::from_slice(&raw)?; + while !missing_responses.is_empty() { + let resp = self.recv(&receiver, *missing_responses.iter().nth(0).unwrap())?; + let resp_id = resp["id"].as_u64().unwrap() as usize; - let resp = match resp["id"].as_u64() { - Some(id) if id_map.contains_key(&(id as usize)) => resp, - _ => { - self.handle_notification( - resp["method"].take().as_str().unwrap_or(""), - resp["result"].take(), - )?; - continue; - } - }; - - if let Some(err) = resp.get("error") { - return Err(Error::Protocol(err.clone())); - } - - answer.push(resp.clone()); + missing_responses.remove(&resp_id); + answer.push(resp); } answer.sort_by(|a, b| a["id"].as_u64().partial_cmp(&b["id"].as_u64()).unwrap()); - let answer = answer.into_iter().map(|mut x| x["result"].take()).collect(); + Ok(answer) } - fn recv(&mut self) -> io::Result> { - let mut resp = String::new(); - self.buf_reader.read_line(&mut resp)?; + fn recv( + &self, + receiver: &Receiver, + req_id: usize, + ) -> Result { + loop { + // Try to take the lock on the reader. If we manage to do so, we'll become the reader + // thread until we get our reponse + match self._reader_thread(Some(req_id)) { + Ok(response) => break Ok(response), + Err(Error::CouldntLockReader) => { + match receiver.recv() { + // Received our response, returning it + Ok(ChannelMessage::Response(received)) => break Ok(received), + Ok(ChannelMessage::WakeUp) => { + // We have been woken up, this means that we should try becoming the + // reader thread ourselves + trace!("WakeUp for {}", req_id); - trace!("<== {}", resp); - - Ok(resp.as_bytes().to_vec()) + continue; + } + e @ Err(_) => e.map(|_| ()).expect("Error receiving from channel"), // panic if there's something wrong with the channels + } + } + e @ Err(_) => break e, + } + } } - fn handle_notification( - &mut self, - method: &str, - result: serde_json::Value, - ) -> Result<(), Error> { + fn handle_notification(&self, method: &str, result: serde_json::Value) -> Result<(), Error> { match method { - "blockchain.headers.subscribe" => { - self.headers.push_back(serde_json::from_value(result)?) - } + "blockchain.headers.subscribe" => self.headers.lock().unwrap().append( + &mut serde_json::from_value::>(result)? + .into_iter() + .collect(), + ), "blockchain.scripthash.subscribe" => { let unserialized: ScriptNotification = serde_json::from_value(result)?; + let mut script_notifications = self.script_notifications.lock().unwrap(); - let queue = self - .script_notifications + let queue = script_notifications .get_mut(&unserialized.scripthash) .ok_or_else(|| Error::NotSubscribed(unserialized.scripthash))?; @@ -360,28 +510,13 @@ impl Client { Ok(()) } - /// Tries to read from the read buffer if any notifications were received since the last call - /// or `poll`, and processes them - pub fn poll(&mut self) -> Result<(), Error> { - // try to pull data from the stream - self.buf_reader.fill_buf()?; - - while !self.buf_reader.buffer().is_empty() { - let raw = self.recv()?; - let mut resp: serde_json::Value = serde_json::from_slice(&raw)?; - - match resp["method"].take().as_str() { - Some(ref method) => self.handle_notification(method, resp["params"].take())?, - _ => continue, - } - } - - Ok(()) - } - /// Subscribes to notifications for new block headers, by sending a `blockchain.headers.subscribe` call. - pub fn block_headers_subscribe(&mut self) -> Result { - let req = Request::new("blockchain.headers.subscribe", vec![]); + pub fn block_headers_subscribe(&self) -> Result { + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), + "blockchain.headers.subscribe", + vec![], + ); let value = self.call(req)?; Ok(serde_json::from_value(value)?) @@ -389,20 +524,22 @@ impl Client { /// Tries to pop one queued notification for a new block header that we might have received. /// Returns `None` if there are no items in the queue. - pub fn block_headers_poll(&mut self) -> Result, Error> { - self.poll()?; - - Ok(self.headers.pop_front()) + pub fn block_headers_pop(&self) -> Result, Error> { + Ok(self.headers.lock().unwrap().pop_front()) } /// Gets the block header for height `height`. - pub fn block_header(&mut self, height: usize) -> Result { + pub fn block_header(&self, height: usize) -> Result { Ok(deserialize(&self.block_header_raw(height)?)?) } /// Gets the raw bytes of block header for height `height`. - pub fn block_header_raw(&mut self, height: usize) -> Result, Error> { - let req = Request::new("blockchain.block.header", vec![Param::Usize(height)]); + pub fn block_header_raw(&self, height: usize) -> Result, Error> { + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), + "blockchain.block.header", + vec![Param::Usize(height)], + ); let result = self.call(req)?; Ok(Vec::::from_hex( @@ -413,12 +550,9 @@ impl Client { } /// Tries to fetch `count` block headers starting from `start_height`. - pub fn block_headers( - &mut self, - start_height: usize, - count: usize, - ) -> Result { - let req = Request::new( + pub fn block_headers(&self, start_height: usize, count: usize) -> Result { + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), "blockchain.block.headers", vec![Param::Usize(start_height), Param::Usize(count)], ); @@ -437,8 +571,12 @@ impl Client { } /// Estimates the fee required in **Satoshis per kilobyte** to confirm a transaction in `number` blocks. - pub fn estimate_fee(&mut self, number: usize) -> Result { - let req = Request::new("blockchain.estimatefee", vec![Param::Usize(number)]); + pub fn estimate_fee(&self, number: usize) -> Result { + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), + "blockchain.estimatefee", + vec![Param::Usize(number)], + ); let result = self.call(req)?; result @@ -447,8 +585,12 @@ impl Client { } /// Returns the minimum accepted fee by the server's node in **Bitcoin, not Satoshi**. - pub fn relay_fee(&mut self) -> Result { - let req = Request::new("blockchain.relayfee", vec![]); + pub fn relay_fee(&self) -> Result { + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), + "blockchain.relayfee", + vec![], + ); let result = self.call(req)?; result @@ -463,17 +605,18 @@ impl Client { /// /// Returns [`Error::AlreadySubscribed`](../types/enum.Error.html#variant.AlreadySubscribed) if /// already subscribed to the same script. - pub fn script_subscribe(&mut self, script: &Script) -> Result { + pub fn script_subscribe(&self, script: &Script) -> Result, Error> { let script_hash = script.to_electrum_scripthash(); + let mut script_notifications = self.script_notifications.lock().unwrap(); - if self.script_notifications.contains_key(&script_hash) { + if script_notifications.contains_key(&script_hash) { return Err(Error::AlreadySubscribed(script_hash)); } - self.script_notifications - .insert(script_hash.clone(), VecDeque::new()); + script_notifications.insert(script_hash.clone(), VecDeque::new()); - let req = Request::new( + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), "blockchain.scripthash.subscribe", vec![Param::String(script_hash.to_hex())], ); @@ -488,41 +631,50 @@ impl Client { /// /// Returns [`Error::NotSubscribed`](../types/enum.Error.html#variant.NotSubscribed) if /// not subscribed to the script. - pub fn script_unsubscribe(&mut self, script: &Script) -> Result { + pub fn script_unsubscribe(&self, script: &Script) -> Result { let script_hash = script.to_electrum_scripthash(); + let mut script_notifications = self.script_notifications.lock().unwrap(); - if !self.script_notifications.contains_key(&script_hash) { + if !script_notifications.contains_key(&script_hash) { return Err(Error::NotSubscribed(script_hash)); } - let req = Request::new( + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), "blockchain.scripthash.unsubscribe", vec![Param::String(script_hash.to_hex())], ); let value = self.call(req)?; let answer = serde_json::from_value(value)?; - self.script_notifications.remove(&script_hash); + script_notifications.remove(&script_hash); Ok(answer) } /// Tries to pop one queued notification for a the requested script. Returns `None` if there are no items in the queue. - pub fn script_poll(&mut self, script: &Script) -> Result, Error> { - self.poll()?; - + pub fn script_pop(&self, script: &Script) -> Result, Error> { let script_hash = script.to_electrum_scripthash(); - match self.script_notifications.get_mut(&script_hash) { + match self + .script_notifications + .lock() + .unwrap() + .get_mut(&script_hash) + { None => Err(Error::NotSubscribed(script_hash)), Some(queue) => Ok(queue.pop_front()), } } /// Returns the balance for a *scriptPubKey* - pub fn script_get_balance(&mut self, script: &Script) -> Result { + pub fn script_get_balance(&self, script: &Script) -> Result { let params = vec![Param::String(script.to_electrum_scripthash().to_hex())]; - let req = Request::new("blockchain.scripthash.get_balance", params); + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), + "blockchain.scripthash.get_balance", + params, + ); let result = self.call(req)?; Ok(serde_json::from_value(result)?) @@ -530,10 +682,7 @@ impl Client { /// Batch version of [`script_get_balance`](#method.script_get_balance). /// /// Takes a list of scripts and returns a list of balance responses. - pub fn batch_script_get_balance<'s, I>( - &mut self, - scripts: I, - ) -> Result, Error> + pub fn batch_script_get_balance<'s, I>(&self, scripts: I) -> Result, Error> where I: IntoIterator, { @@ -541,9 +690,13 @@ impl Client { } /// Returns the history for a *scriptPubKey* - pub fn script_get_history(&mut self, script: &Script) -> Result, Error> { + pub fn script_get_history(&self, script: &Script) -> Result, Error> { let params = vec![Param::String(script.to_electrum_scripthash().to_hex())]; - let req = Request::new("blockchain.scripthash.get_history", params); + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), + "blockchain.scripthash.get_history", + params, + ); let result = self.call(req)?; Ok(serde_json::from_value(result)?) @@ -552,7 +705,7 @@ impl Client { /// /// Takes a list of scripts and returns a list of history responses. pub fn batch_script_get_history<'s, I>( - &mut self, + &self, scripts: I, ) -> Result>, Error> where @@ -562,9 +715,13 @@ impl Client { } /// Returns the list of unspent outputs for a *scriptPubKey* - pub fn script_list_unspent(&mut self, script: &Script) -> Result, Error> { + pub fn script_list_unspent(&self, script: &Script) -> Result, Error> { let params = vec![Param::String(script.to_electrum_scripthash().to_hex())]; - let req = Request::new("blockchain.scripthash.listunspent", params); + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), + "blockchain.scripthash.listunspent", + params, + ); let result = self.call(req)?; Ok(serde_json::from_value(result)?) @@ -574,7 +731,7 @@ impl Client { /// /// Takes a list of scripts and returns a list of a list of utxos. pub fn batch_script_list_unspent<'s, I>( - &mut self, + &self, scripts: I, ) -> Result>, Error> where @@ -584,14 +741,18 @@ impl Client { } /// Gets the transaction with `txid`. Returns an error if not found. - pub fn transaction_get(&mut self, txid: &Txid) -> Result { + pub fn transaction_get(&self, txid: &Txid) -> Result { Ok(deserialize(&self.transaction_get_raw(txid)?)?) } /// Gets the raw bytes of a transaction with `txid`. Returns an error if not found. - pub fn transaction_get_raw(&mut self, txid: &Txid) -> Result, Error> { + pub fn transaction_get_raw(&self, txid: &Txid) -> Result, Error> { let params = vec![Param::String(txid.to_hex())]; - let req = Request::new("blockchain.transaction.get", params); + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), + "blockchain.transaction.get", + params, + ); let result = self.call(req)?; Ok(Vec::::from_hex( @@ -604,7 +765,7 @@ impl Client { /// Batch version of [`transaction_get`](#method.transaction_get). /// /// Takes a list of `txids` and returns a list of transactions. - pub fn batch_transaction_get<'t, I>(&mut self, txids: I) -> Result, Error> + pub fn batch_transaction_get<'t, I>(&self, txids: I) -> Result, Error> where I: IntoIterator, { @@ -617,7 +778,7 @@ impl Client { /// Batch version of [`transaction_get_raw`](#method.transaction_get_raw). /// /// Takes a list of `txids` and returns a list of transactions raw bytes. - pub fn batch_transaction_get_raw<'t, I>(&mut self, txids: I) -> Result>, Error> + pub fn batch_transaction_get_raw<'t, I>(&self, txids: I) -> Result>, Error> where I: IntoIterator, { @@ -631,7 +792,7 @@ impl Client { /// Batch version of [`block_header_raw`](#method.block_header_raw). /// /// Takes a list of `heights` of blocks and returns a list of block header raw bytes. - pub fn batch_block_header_raw<'s, I>(&mut self, heights: I) -> Result>, Error> + pub fn batch_block_header_raw<'s, I>(&self, heights: I) -> Result>, Error> where I: IntoIterator, { @@ -646,7 +807,7 @@ impl Client { /// Batch version of [`block_header`](#method.block_header). /// /// Takes a list of `heights` of blocks and returns a list of headers. - pub fn batch_block_header<'s, I>(&mut self, heights: I) -> Result, Error> + pub fn batch_block_header<'s, I>(&self, heights: I) -> Result, Error> where I: IntoIterator, { @@ -660,7 +821,7 @@ impl Client { /// /// Takes a list of `numbers` of blocks and returns a list of fee required in /// **Satoshis per kilobyte** to confirm a transaction in the given number of blocks. - pub fn batch_estimate_fee<'s, I>(&mut self, numbers: I) -> Result, Error> + pub fn batch_estimate_fee<'s, I>(&self, numbers: I) -> Result, Error> where I: IntoIterator, { @@ -668,36 +829,48 @@ impl Client { } /// Broadcasts the raw bytes of a transaction to the network. - pub fn transaction_broadcast_raw(&mut self, raw_tx: &[u8]) -> Result { + pub fn transaction_broadcast_raw(&self, raw_tx: &[u8]) -> Result { let params = vec![Param::String(raw_tx.to_hex())]; - let req = Request::new("blockchain.transaction.broadcast", params); + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), + "blockchain.transaction.broadcast", + params, + ); let result = self.call(req)?; Ok(serde_json::from_value(result)?) } /// Broadcasts a transaction to the network. - pub fn transaction_broadcast(&mut self, tx: &Transaction) -> Result { + pub fn transaction_broadcast(&self, tx: &Transaction) -> Result { let buffer: Vec = serialize(tx); self.transaction_broadcast_raw(&buffer) } /// Returns the merkle path for the transaction `txid` confirmed in the block at `height`. pub fn transaction_get_merkle( - &mut self, + &self, txid: &Txid, height: usize, ) -> Result { let params = vec![Param::String(txid.to_hex()), Param::Usize(height)]; - let req = Request::new("blockchain.transaction.get_merkle", params); + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), + "blockchain.transaction.get_merkle", + params, + ); let result = self.call(req)?; Ok(serde_json::from_value(result)?) } /// Returns the capabilities of the server. - pub fn server_features(&mut self) -> Result { - let req = Request::new("server.features", vec![]); + pub fn server_features(&self) -> Result { + let req = Request::new_id( + self.last_id.fetch_add(1, Ordering::SeqCst), + "server.features", + vec![], + ); let result = self.call(req)?; Ok(serde_json::from_value(result)?) diff --git a/src/lib.rs b/src/lib.rs index 790c9a6..2f1bc42 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ //! ``` pub extern crate bitcoin; +extern crate core; extern crate log; #[cfg(feature = "use-openssl")] extern crate openssl; diff --git a/src/types.rs b/src/types.rs index cdd5417..dfd928e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -2,6 +2,8 @@ //! //! This module contains definitions of all the complex data structures that are returned by calls +use std::ops::Deref; + use bitcoin::blockdata::block; use bitcoin::hashes::hex::FromHex; use bitcoin::hashes::{sha256, Hash}; @@ -23,6 +25,8 @@ pub enum Param { String(String), /// Boolean parameter Bool(bool), + /// Bytes array parameter + Bytes(Vec), } #[derive(Serialize, Clone)] @@ -40,7 +44,7 @@ pub struct Request<'a> { impl<'a> Request<'a> { /// Creates a new request with a default id - pub fn new(method: &'a str, params: Vec) -> Self { + fn new(method: &'a str, params: Vec) -> Self { Self { id: 0, jsonrpc: JSONRPC_2_0, @@ -58,12 +62,31 @@ impl<'a> Request<'a> { } } +#[doc(hidden)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize)] +pub struct Hex32Bytes(#[serde(deserialize_with = "from_hex")] [u8; 32]); + +impl Deref for Hex32Bytes { + type Target = [u8; 32]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From<[u8; 32]> for Hex32Bytes { + fn from(other: [u8; 32]) -> Hex32Bytes { + Hex32Bytes(other) + } +} + /// Format used by the Electrum server to identify an address. The reverse sha256 hash of the /// scriptPubKey. Documented [here](https://electrumx.readthedocs.io/en/latest/protocol-basics.html#script-hashes). -pub type ScriptHash = [u8; 32]; +pub type ScriptHash = Hex32Bytes; + /// Binary blob that condenses all the activity of an address. Used to detect changes without /// having to compare potentially long lists of transactions. -pub type ScriptStatus = [u8; 32]; +pub type ScriptStatus = Hex32Bytes; /// Trait used to convert a struct into the Electrum representation of an address pub trait ToElectrumScriptHash { @@ -76,7 +99,7 @@ impl ToElectrumScriptHash for Script { let mut result = sha256::Hash::hash(self.as_bytes()).into_inner(); result.reverse(); - result + result.into() } } @@ -243,6 +266,10 @@ pub enum Error { /// Missing domain while it was explicitly asked to validate it MissingDomain, + /// Couldn't take a lock on the reader mutex. This means that there's already another reader + /// thread running + CouldntLockReader, + #[cfg(feature = "use-openssl")] /// Invalid OpenSSL method used InvalidSslMethod(openssl::error::ErrorStack),