Merge bitcoindevkit/rust-electrum-client#109: Errors if expecting headers notification but not subscribed

b86f2bb22c errors if expecing headers notification but not subscribed (Riccardo Casatta)

Pull request description:

  the opposite, erroring when subscribing multiple times, is not handled because clients could call multiple times to trigger the server to reply with the tip

ACKs for top commit:
  danielabrozzoni:
    utACK b86f2bb22c - changes look good to me, didn't test locally

Tree-SHA512: e1a21223448e708cc054271eb1ac5285dc98bdadf7497a856fc4a19ff51655879352ca6ef9f3fb5ec5f9071dd7ee50b2e44d4c65e70694e5d9fa53d280179dd2
This commit is contained in:
Daniela Brozzoni 2023-06-27 15:11:03 +02:00
commit c84507e45b
No known key found for this signature in database
GPG Key ID: 7DE4F1FDCED0AB87
2 changed files with 37 additions and 8 deletions

View File

@ -133,7 +133,7 @@ where
last_id: AtomicUsize,
waiting_map: Mutex<HashMap<usize, Sender<ChannelMessage>>>,
headers: Mutex<VecDeque<RawHeaderNotification>>,
headers: Mutex<Option<VecDeque<RawHeaderNotification>>>,
script_notifications: Mutex<HashMap<ScriptHash, VecDeque<ScriptStatus>>>,
#[cfg(feature = "debug-calls")]
@ -154,7 +154,7 @@ where
last_id: AtomicUsize::new(0),
waiting_map: Mutex::new(HashMap::new()),
headers: Mutex::new(VecDeque::new()),
headers: Mutex::new(None),
script_notifications: Mutex::new(HashMap::new()),
#[cfg(feature = "debug-calls")]
@ -648,11 +648,17 @@ impl<S: Read + Write> RawClient<S> {
fn handle_notification(&self, method: &str, result: serde_json::Value) -> Result<(), Error> {
match method {
"blockchain.headers.subscribe" => self.headers.lock()?.append(
&mut serde_json::from_value::<Vec<RawHeaderNotification>>(result)?
.into_iter()
.collect(),
),
"blockchain.headers.subscribe" => {
let mut queue = self.headers.lock()?;
match queue.as_mut() {
None => return Err(Error::NotSubscribedToHeaders),
Some(queue) => queue.append(
&mut serde_json::from_value::<Vec<RawHeaderNotification>>(result)?
.into_iter()
.collect(),
),
}
}
"blockchain.scripthash.subscribe" => {
let unserialized: ScriptNotification = serde_json::from_value(result)?;
let mut script_notifications = self.script_notifications.lock()?;
@ -762,6 +768,11 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
}
fn block_headers_subscribe_raw(&self) -> Result<RawHeaderNotification, Error> {
let mut headers = self.headers.lock()?;
if headers.is_none() {
*headers = Some(VecDeque::new());
}
let req = Request::new_id(
self.last_id.fetch_add(1, Ordering::SeqCst),
"blockchain.headers.subscribe",
@ -773,7 +784,11 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
}
fn block_headers_pop_raw(&self) -> Result<Option<RawHeaderNotification>, Error> {
Ok(self.headers.lock()?.pop_front())
let mut queue = self.headers.lock()?;
match queue.as_mut() {
None => Err(Error::NotSubscribedToHeaders),
Some(queue) => Ok(queue.pop_front()),
}
}
fn block_header_raw(&self, height: usize) -> Result<Vec<u8>, Error> {
@ -1333,6 +1348,16 @@ mod test {
assert!(resp.height >= 639000);
}
#[test]
fn test_block_headers_subscribe_pop() {
let client = RawClient::new(get_test_server(), None).unwrap();
let resp = client.block_headers_pop();
assert_eq!(format!("{:?}", resp), "Err(NotSubscribedToHeaders)");
client.block_headers_subscribe().unwrap();
let resp = client.block_headers_pop();
assert!(resp.is_ok());
}
#[test]
fn test_script_subscribe() {
use std::str::FromStr;

View File

@ -327,6 +327,9 @@ pub enum Error {
#[cfg(feature = "use-openssl")]
/// SSL Handshake failed with the server
SslHandshakeError(openssl::ssl::HandshakeError<std::net::TcpStream>),
/// Expecting notification on headers but we are not subscribed
NotSubscribedToHeaders,
}
impl Display for Error {
@ -364,6 +367,7 @@ impl Display for Error {
Error::MissingDomain => f.write_str("Missing domain while it was explicitly asked to validate it"),
Error::CouldntLockReader => f.write_str("Couldn't take a lock on the reader mutex. This means that there's already another reader thread is running"),
Error::Mpsc => f.write_str("Broken IPC communication channel: the other thread probably has exited"),
Error::NotSubscribedToHeaders => write!(f, "Expecting notification on headers but we are not subscribed"),
}
}
}