fix: fix batch ordering issue

Fixes issue #75
Raw client waiting map was using the same channel
for every request/response. When items were put
back into the channel inside of _reader_thread
the waiting receiver in recv would just take the
next response without validating it on request id
request.
This fixes this by using unique channels for each
request response inside of the waiting map.
This commit is contained in:
marshallyale 2025-01-22 13:46:41 -08:00 committed by valued mammal
parent 372eda9455
commit 6721929f22
No known key found for this signature in database
GPG Key ID: A98E0F0093B80812

View File

@ -3,7 +3,7 @@
//! This module contains the definition of the raw client that wraps the transport method
use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::io::{BufRead, BufReader, Read, Write};
use std::mem::drop;
use std::net::{TcpStream, ToSocketAddrs};
@ -539,11 +539,10 @@ impl<S: Read + Write> RawClient<S> {
if let Some(until_message) = until_message {
// If we are trying to start a reader thread but the corresponding sender is
// missing from the map, exit immediately. This can happen with batch calls,
// since the sender is shared for all the individual queries in a call. We
// might have already received a response for that id, but we don't know it
// yet. Exiting here forces the calling code to fallback to the sender-receiver
// method, and it should find a message there waiting for it.
// missing from the map, exit immediately. We might have already received a
// response for that id, but we don't know it yet. Exiting here forces the
// calling code to fallback to the sender-receiver method, and it should find
// a message there waiting for it.
if self.waiting_map.lock()?.get(&until_message).is_none() {
return Err(Error::CouldntLockReader);
}
@ -762,12 +761,10 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
fn batch_call(&self, batch: &Batch) -> Result<Vec<serde_json::Value>, Error> {
let mut raw = Vec::new();
let mut missing_responses = BTreeSet::new();
let mut missing_responses = Vec::new();
let mut answers = BTreeMap::new();
// Add our listener to the map before we send the request, Here we will clone the sender
// for every request id, so that we only have to monitor one receiver.
let (sender, receiver) = channel();
// Add our listener to the map before we send the request
for (method, params) in batch.iter() {
let req = Request::new_id(
@ -775,9 +772,12 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
method,
params.to_vec(),
);
missing_responses.insert(req.id);
// Add distinct channel to each request so when we remove our request id (and sender) from the waiting_map
// we can be sure that the response gets sent to the correct channel in self.recv
let (sender, receiver) = channel();
missing_responses.push((req.id, receiver));
self.waiting_map.lock()?.insert(req.id, sender.clone());
self.waiting_map.lock()?.insert(req.id, sender);
raw.append(&mut serde_json::to_vec(&req)?);
raw.extend_from_slice(b"\n");
@ -796,8 +796,8 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
self.increment_calls();
for req_id in missing_responses.iter() {
match self.recv(&receiver, *req_id) {
for (req_id, receiver) in missing_responses.iter() {
match self.recv(receiver, *req_id) {
Ok(mut resp) => answers.insert(req_id, resp["result"].take()),
Err(e) => {
// In case of error our sender could still be left in the map, depending on where
@ -805,7 +805,7 @@ impl<T: Read + Write> ElectrumApi for RawClient<T> {
warn!("got error for req_id {}: {:?}", req_id, e);
warn!("removing all waiting req of this batch");
let mut guard = self.waiting_map.lock()?;
for req_id in missing_responses.iter() {
for (req_id, _) in missing_responses.iter() {
guard.remove(req_id);
}
return Err(e);