From 6721929f22be6243e8bcaa71de9d3372e4a3fbba Mon Sep 17 00:00:00 2001 From: marshallyale <19430240+marshallyale@users.noreply.github.com> Date: Wed, 22 Jan 2025 13:46:41 -0800 Subject: [PATCH] fix: fix batch ordering issue Fixes issue #75 Raw client waiting map was using the same channel for every request/response. When items were put back into the channel inside of _reader_thread the waiting receiver in recv would just take the next response without validating it on request id request. This fixes this by using unique channels for each request response inside of the waiting map. --- src/raw_client.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/raw_client.rs b/src/raw_client.rs index eb78185..28ffc61 100644 --- a/src/raw_client.rs +++ b/src/raw_client.rs @@ -3,7 +3,7 @@ //! This module contains the definition of the raw client that wraps the transport method use std::borrow::Borrow; -use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap, VecDeque}; use std::io::{BufRead, BufReader, Read, Write}; use std::mem::drop; use std::net::{TcpStream, ToSocketAddrs}; @@ -539,11 +539,10 @@ impl RawClient { if let Some(until_message) = until_message { // If we are trying to start a reader thread but the corresponding sender is - // missing from the map, exit immediately. This can happen with batch calls, - // since the sender is shared for all the individual queries in a call. We - // might have already received a response for that id, but we don't know it - // yet. Exiting here forces the calling code to fallback to the sender-receiver - // method, and it should find a message there waiting for it. + // missing from the map, exit immediately. We might have already received a + // response for that id, but we don't know it yet. Exiting here forces the + // calling code to fallback to the sender-receiver method, and it should find + // a message there waiting for it. if self.waiting_map.lock()?.get(&until_message).is_none() { return Err(Error::CouldntLockReader); } @@ -762,12 +761,10 @@ impl ElectrumApi for RawClient { fn batch_call(&self, batch: &Batch) -> Result, Error> { let mut raw = Vec::new(); - let mut missing_responses = BTreeSet::new(); + let mut missing_responses = Vec::new(); let mut answers = BTreeMap::new(); - // Add our listener to the map before we send the request, Here we will clone the sender - // for every request id, so that we only have to monitor one receiver. - let (sender, receiver) = channel(); + // Add our listener to the map before we send the request for (method, params) in batch.iter() { let req = Request::new_id( @@ -775,9 +772,12 @@ impl ElectrumApi for RawClient { method, params.to_vec(), ); - missing_responses.insert(req.id); + // Add distinct channel to each request so when we remove our request id (and sender) from the waiting_map + // we can be sure that the response gets sent to the correct channel in self.recv + let (sender, receiver) = channel(); + missing_responses.push((req.id, receiver)); - self.waiting_map.lock()?.insert(req.id, sender.clone()); + self.waiting_map.lock()?.insert(req.id, sender); raw.append(&mut serde_json::to_vec(&req)?); raw.extend_from_slice(b"\n"); @@ -796,8 +796,8 @@ impl ElectrumApi for RawClient { self.increment_calls(); - for req_id in missing_responses.iter() { - match self.recv(&receiver, *req_id) { + for (req_id, receiver) in missing_responses.iter() { + match self.recv(receiver, *req_id) { Ok(mut resp) => answers.insert(req_id, resp["result"].take()), Err(e) => { // In case of error our sender could still be left in the map, depending on where @@ -805,7 +805,7 @@ impl ElectrumApi for RawClient { warn!("got error for req_id {}: {:?}", req_id, e); warn!("removing all waiting req of this batch"); let mut guard = self.waiting_map.lock()?; - for req_id in missing_responses.iter() { + for (req_id, _) in missing_responses.iter() { guard.remove(req_id); } return Err(e);