diff --git a/src/raw_client.rs b/src/raw_client.rs index 62938cd..d421e1a 100644 --- a/src/raw_client.rs +++ b/src/raw_client.rs @@ -133,7 +133,7 @@ where last_id: AtomicUsize, waiting_map: Mutex>>, - headers: Mutex>, + headers: Mutex>>, script_notifications: Mutex>>, #[cfg(feature = "debug-calls")] @@ -154,7 +154,7 @@ where last_id: AtomicUsize::new(0), waiting_map: Mutex::new(HashMap::new()), - headers: Mutex::new(VecDeque::new()), + headers: Mutex::new(None), script_notifications: Mutex::new(HashMap::new()), #[cfg(feature = "debug-calls")] @@ -648,11 +648,17 @@ impl RawClient { fn handle_notification(&self, method: &str, result: serde_json::Value) -> Result<(), Error> { match method { - "blockchain.headers.subscribe" => self.headers.lock()?.append( - &mut serde_json::from_value::>(result)? - .into_iter() - .collect(), - ), + "blockchain.headers.subscribe" => { + let mut queue = self.headers.lock()?; + match queue.as_mut() { + None => return Err(Error::NotSubscribedToHeaders), + Some(queue) => queue.append( + &mut serde_json::from_value::>(result)? + .into_iter() + .collect(), + ), + } + } "blockchain.scripthash.subscribe" => { let unserialized: ScriptNotification = serde_json::from_value(result)?; let mut script_notifications = self.script_notifications.lock()?; @@ -762,6 +768,11 @@ impl ElectrumApi for RawClient { } fn block_headers_subscribe_raw(&self) -> Result { + let mut headers = self.headers.lock()?; + if headers.is_none() { + *headers = Some(VecDeque::new()); + } + let req = Request::new_id( self.last_id.fetch_add(1, Ordering::SeqCst), "blockchain.headers.subscribe", @@ -773,7 +784,11 @@ impl ElectrumApi for RawClient { } fn block_headers_pop_raw(&self) -> Result, Error> { - Ok(self.headers.lock()?.pop_front()) + let mut queue = self.headers.lock()?; + match queue.as_mut() { + None => Err(Error::NotSubscribedToHeaders), + Some(queue) => Ok(queue.pop_front()), + } } fn block_header_raw(&self, height: usize) -> Result, Error> { @@ -1333,6 +1348,16 @@ mod test { assert!(resp.height >= 639000); } + #[test] + fn test_block_headers_subscribe_pop() { + let client = RawClient::new(get_test_server(), None).unwrap(); + let resp = client.block_headers_pop(); + assert_eq!(format!("{:?}", resp), "Err(NotSubscribedToHeaders)"); + client.block_headers_subscribe().unwrap(); + let resp = client.block_headers_pop(); + assert!(resp.is_ok()); + } + #[test] fn test_script_subscribe() { use std::str::FromStr; diff --git a/src/types.rs b/src/types.rs index 0773883..12df4b9 100644 --- a/src/types.rs +++ b/src/types.rs @@ -327,6 +327,9 @@ pub enum Error { #[cfg(feature = "use-openssl")] /// SSL Handshake failed with the server SslHandshakeError(openssl::ssl::HandshakeError), + + /// Expecting notification on headers but we are not subscribed + NotSubscribedToHeaders, } impl Display for Error { @@ -364,6 +367,7 @@ impl Display for Error { Error::MissingDomain => f.write_str("Missing domain while it was explicitly asked to validate it"), Error::CouldntLockReader => f.write_str("Couldn't take a lock on the reader mutex. This means that there's already another reader thread is running"), Error::Mpsc => f.write_str("Broken IPC communication channel: the other thread probably has exited"), + Error::NotSubscribedToHeaders => write!(f, "Expecting notification on headers but we are not subscribed"), } } }