New multithreaded model
This new model is based on the idea of having only one thread monitor the socket and then distribute the responses to every other thread that's waiting. Whenever a request is made, a new mpsc channel is created and added to an internal map. Then, if there are not other threads waiting for pending responses, that thread will become the "reader thread" and monitor the socket until it receives the response it was waiting for. In the meantime, if it sees responses coming on the socket with different ids, it will lookup the correct Sender from the internal map and send that message to the thread that created the request in the first place. The other threads will be waiting trying to read from their Receiver, where they will either get the message in response to their request, or a signal to wake up and become the new "reader thread".
This commit is contained in:
parent
7b4e99e80c
commit
f37bcd59e0
@ -21,6 +21,7 @@ path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
log = "^0.4"
|
||||
env_logger = "0.7"
|
||||
bitcoin = { version = "0.23", features = ["use-serde"] }
|
||||
serde = { version = "^1.0", features = ["derive"] }
|
||||
serde_json = { version = "^1.0" }
|
||||
|
||||
@ -1,9 +1,39 @@
|
||||
extern crate electrum_client;
|
||||
extern crate env_logger;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
use electrum_client::Client;
|
||||
|
||||
fn main() {
|
||||
let mut client = Client::new("kirsche.emzy.de:50001").unwrap();
|
||||
let res = client.server_features();
|
||||
println!("{:#?}", res);
|
||||
env_logger::init();
|
||||
|
||||
let client = Arc::new(Client::new("electrum.blockstream.info:50001").unwrap());
|
||||
|
||||
let mut handles = Vec::new();
|
||||
|
||||
/*let _client = Arc::clone(&client);
|
||||
let handle = thread::spawn(move || {
|
||||
_client.reader_thread().unwrap();
|
||||
println!("reader thread exited");
|
||||
});
|
||||
|
||||
handles.push(handle);*/
|
||||
|
||||
thread::sleep(std::time::Duration::from_secs(1));
|
||||
|
||||
for _ in 0..4 {
|
||||
let client = Arc::clone(&client);
|
||||
let handle = thread::spawn(move || {
|
||||
let res = client.batch_estimate_fee(vec![1, 3, 6, 12]);
|
||||
println!("{:?}", res);
|
||||
});
|
||||
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
for h in handles {
|
||||
h.join().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
473
src/client.rs
473
src/client.rs
@ -2,12 +2,16 @@
|
||||
//!
|
||||
//! This module contains definitions of all the complex data structures that are returned by calls
|
||||
|
||||
use std::collections::{BTreeMap, VecDeque};
|
||||
use std::io::{self, BufRead, BufReader, Read, Write};
|
||||
use core::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::io::{BufRead, BufReader, Read, Write};
|
||||
use std::mem::drop;
|
||||
use std::net::{TcpStream, ToSocketAddrs};
|
||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
use std::sync::{Mutex, TryLockError};
|
||||
|
||||
#[allow(unused_imports)]
|
||||
use log::{debug, error, info, trace};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
|
||||
use bitcoin::consensus::encode::{deserialize, serialize};
|
||||
use bitcoin::hashes::hex::{FromHex, ToHex};
|
||||
@ -88,11 +92,14 @@ pub struct Client<S>
|
||||
where
|
||||
S: Read + Write,
|
||||
{
|
||||
stream: ClonableStream<S>,
|
||||
buf_reader: BufReader<ClonableStream<S>>,
|
||||
stream: Mutex<ClonableStream<S>>,
|
||||
buf_reader: Mutex<BufReader<ClonableStream<S>>>,
|
||||
|
||||
headers: VecDeque<HeaderNotification>,
|
||||
script_notifications: BTreeMap<ScriptHash, VecDeque<ScriptStatus>>,
|
||||
last_id: AtomicUsize,
|
||||
waiting_map: Mutex<HashMap<usize, Sender<ChannelMessage>>>,
|
||||
|
||||
headers: Mutex<VecDeque<HeaderNotification>>,
|
||||
script_notifications: Mutex<HashMap<ScriptHash, VecDeque<ScriptStatus>>>,
|
||||
|
||||
#[cfg(feature = "debug-calls")]
|
||||
calls: usize,
|
||||
@ -106,10 +113,14 @@ where
|
||||
let stream: ClonableStream<_> = stream.into();
|
||||
|
||||
Self {
|
||||
buf_reader: BufReader::new(stream.clone()),
|
||||
stream,
|
||||
headers: VecDeque::new(),
|
||||
script_notifications: BTreeMap::new(),
|
||||
buf_reader: Mutex::new(BufReader::new(stream.clone())),
|
||||
stream: Mutex::new(stream),
|
||||
|
||||
last_id: AtomicUsize::new(0),
|
||||
waiting_map: Mutex::new(HashMap::new()),
|
||||
|
||||
headers: Mutex::new(VecDeque::new()),
|
||||
script_notifications: Mutex::new(HashMap::new()),
|
||||
|
||||
#[cfg(feature = "debug-calls")]
|
||||
calls: 0,
|
||||
@ -245,110 +256,249 @@ impl Client<ElectrumProxyStream> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum ChannelMessage {
|
||||
Response(serde_json::Value),
|
||||
WakeUp,
|
||||
}
|
||||
|
||||
impl<S: Read + Write> Client<S> {
|
||||
fn call(&mut self, req: Request) -> Result<serde_json::Value, Error> {
|
||||
// TODO: to enable this we have to find a way to allow concurrent read and writes to the
|
||||
// underlying transport struct. This can be done pretty easily for TcpStream because it can be
|
||||
// split into a "read" and a "write" object, but it's not as trivial for other types. Without
|
||||
// such thing, this causes a deadlock, because the reader thread takes a lock on the
|
||||
// `ClonableStream` before other threads can send a request to the server. They will block
|
||||
// waiting for the reader to release the mutex, but this will never happen because the server
|
||||
// didn't receive any request, so it has nothing to send back.
|
||||
// pub fn reader_thread(&self) -> Result<(), Error> {
|
||||
// self._reader_thread(None).map(|_| ())
|
||||
// }
|
||||
|
||||
fn _reader_thread(&self, until_message: Option<usize>) -> Result<serde_json::Value, Error> {
|
||||
let mut raw_resp = String::new();
|
||||
let resp = match self.buf_reader.try_lock() {
|
||||
Ok(mut reader) => {
|
||||
trace!(
|
||||
"Starting reader thread with `until_message` = {:?}",
|
||||
until_message
|
||||
);
|
||||
|
||||
if let Some(until_message) = until_message {
|
||||
// If we are trying to start a reader thread but the corresponding sender is
|
||||
// missing from the map, exit immediately. This can happen with batch calls,
|
||||
// since the sender is shared for all the individual queries in a call. We
|
||||
// might have already received a response for that id, but we don't know it
|
||||
// yet. Exiting here forces the calling code to fallback to the sender-receiver
|
||||
// method, and it should find a message there waiting for it.
|
||||
if self
|
||||
.waiting_map
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get(&until_message)
|
||||
.is_none()
|
||||
{
|
||||
return Err(Error::CouldntLockReader);
|
||||
}
|
||||
}
|
||||
|
||||
// Loop over every message
|
||||
loop {
|
||||
raw_resp.clear();
|
||||
|
||||
reader.read_line(&mut raw_resp)?;
|
||||
trace!("<== {}", raw_resp);
|
||||
|
||||
let resp: serde_json::Value = serde_json::from_str(&raw_resp)?;
|
||||
|
||||
// Normally there is and id, but it's missing for spontaneous notifications
|
||||
// from the server
|
||||
let resp_id = resp["id"]
|
||||
.as_str()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.or(resp["id"].as_u64().map(|i| i as usize));
|
||||
match resp_id {
|
||||
Some(resp_id) if until_message == Some(resp_id) => {
|
||||
// We have a valid id and it's exactly the one we were waiting for!
|
||||
trace!(
|
||||
"Reader thread {} received a response for its request",
|
||||
resp_id
|
||||
);
|
||||
|
||||
// Remove ourselves from the "waiting map"
|
||||
let mut map = self.waiting_map.lock().unwrap();
|
||||
map.remove(&resp_id);
|
||||
|
||||
// If the map is not empty, we select a random thread to become the
|
||||
// new reader thread.
|
||||
if let Some(sender) = map.values().nth(0) {
|
||||
sender
|
||||
.send(ChannelMessage::WakeUp)
|
||||
.expect("Unable to WakeUp a different thread");
|
||||
}
|
||||
|
||||
break Ok(resp);
|
||||
}
|
||||
Some(resp_id) => {
|
||||
// We have an id, but it's not our response. Notify the thread and
|
||||
// move on
|
||||
trace!("Reader thread received response for {}", resp_id);
|
||||
|
||||
let mut map = self.waiting_map.lock().unwrap();
|
||||
if let Some(sender) = map.get(&resp_id) {
|
||||
sender
|
||||
.send(ChannelMessage::Response(resp))
|
||||
.expect("Unable to send the response");
|
||||
map.remove(&resp_id);
|
||||
} else {
|
||||
warn!("Missing listener for {}", resp_id);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// No id, that's probably a notification.
|
||||
let mut resp = resp;
|
||||
|
||||
if let Some(ref method) = resp["method"].take().as_str() {
|
||||
self.handle_notification(method, resp["params"].take())?;
|
||||
} else {
|
||||
warn!("Unexpected response: {:?}", resp);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(TryLockError::WouldBlock) => {
|
||||
// If we "WouldBlock" here it means that there's already a reader thread
|
||||
// running somewhere.
|
||||
Err(Error::CouldntLockReader)
|
||||
}
|
||||
e @ Err(TryLockError::Poisoned(_)) => e
|
||||
.map(|_| Ok(serde_json::Value::Null))
|
||||
.expect("Poisoned reader mutex"), // panic if the reader mutex has been poisoned
|
||||
};
|
||||
|
||||
let resp = resp?;
|
||||
if let Some(err) = resp.get("error") {
|
||||
Err(Error::Protocol(err.clone()))
|
||||
} else {
|
||||
Ok(resp)
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&self, req: Request) -> Result<serde_json::Value, Error> {
|
||||
// Add our listener to the map before we send the request, to make sure we don't get a
|
||||
// reply before the receiver is added
|
||||
let (sender, receiver) = channel();
|
||||
self.waiting_map.lock().unwrap().insert(req.id, sender);
|
||||
|
||||
let mut raw = serde_json::to_vec(&req)?;
|
||||
trace!("==> {}", String::from_utf8_lossy(&raw));
|
||||
|
||||
raw.extend_from_slice(b"\n");
|
||||
self.stream.write_all(&raw)?;
|
||||
self.stream.flush()?;
|
||||
let mut stream = self.stream.lock().unwrap();
|
||||
stream.write_all(&raw)?;
|
||||
stream.flush()?;
|
||||
drop(stream); // release the lock
|
||||
|
||||
self.increment_calls();
|
||||
|
||||
let mut resp = loop {
|
||||
let raw = self.recv()?;
|
||||
let mut resp: serde_json::Value = serde_json::from_slice(&raw)?;
|
||||
|
||||
match resp["method"].take().as_str() {
|
||||
Some(ref method) if method == &req.method => break resp,
|
||||
Some(ref method) => self.handle_notification(method, resp["result"].take())?,
|
||||
_ => break resp,
|
||||
};
|
||||
};
|
||||
|
||||
if let Some(err) = resp.get("error") {
|
||||
return Err(Error::Protocol(err.clone()));
|
||||
}
|
||||
|
||||
let mut resp = self.recv(&receiver, req.id)?;
|
||||
Ok(resp["result"].take())
|
||||
}
|
||||
|
||||
/// Execute a queue of calls stored in a [`Batch`](../batch/struct.Batch.html) struct. Returns
|
||||
/// `Ok()` **only if** all of the calls are successful. The order of the JSON `Value`s returned
|
||||
/// reflects the order in which the calls were made on the `Batch` struct.
|
||||
pub fn batch_call(&mut self, batch: Batch) -> Result<Vec<serde_json::Value>, Error> {
|
||||
let mut id_map = BTreeMap::new();
|
||||
pub fn batch_call(&self, batch: Batch) -> Result<Vec<serde_json::Value>, Error> {
|
||||
let mut raw = Vec::new();
|
||||
|
||||
let mut missing_responses = HashSet::new();
|
||||
let mut answer = Vec::new();
|
||||
|
||||
for (i, (method, params)) in batch.into_iter().enumerate() {
|
||||
let req = Request::new_id(i, &method, params);
|
||||
// Add our listener to the map before we send the request, Here we will clone the sender
|
||||
// for every request id, so that we only have to monitor one receiver.
|
||||
let (sender, receiver) = channel();
|
||||
|
||||
for (method, params) in batch.into_iter() {
|
||||
let req = Request::new_id(self.last_id.fetch_add(1, Ordering::SeqCst), &method, params);
|
||||
missing_responses.insert(req.id);
|
||||
|
||||
self.waiting_map
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(req.id, sender.clone());
|
||||
|
||||
raw.append(&mut serde_json::to_vec(&req)?);
|
||||
raw.extend_from_slice(b"\n");
|
||||
}
|
||||
|
||||
id_map.insert(req.id, method);
|
||||
if missing_responses.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
trace!("==> {}", String::from_utf8_lossy(&raw));
|
||||
|
||||
self.stream.write_all(&raw)?;
|
||||
self.stream.flush()?;
|
||||
let mut stream = self.stream.lock().unwrap();
|
||||
stream.write_all(&raw)?;
|
||||
stream.flush()?;
|
||||
drop(stream); // release the lock
|
||||
|
||||
self.increment_calls();
|
||||
|
||||
while answer.len() < id_map.len() {
|
||||
let raw = self.recv()?;
|
||||
let mut resp: serde_json::Value = serde_json::from_slice(&raw)?;
|
||||
while !missing_responses.is_empty() {
|
||||
let resp = self.recv(&receiver, *missing_responses.iter().nth(0).unwrap())?;
|
||||
let resp_id = resp["id"].as_u64().unwrap() as usize;
|
||||
|
||||
let resp = match resp["id"].as_u64() {
|
||||
Some(id) if id_map.contains_key(&(id as usize)) => resp,
|
||||
_ => {
|
||||
self.handle_notification(
|
||||
resp["method"].take().as_str().unwrap_or(""),
|
||||
resp["result"].take(),
|
||||
)?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(err) = resp.get("error") {
|
||||
return Err(Error::Protocol(err.clone()));
|
||||
}
|
||||
|
||||
answer.push(resp.clone());
|
||||
missing_responses.remove(&resp_id);
|
||||
answer.push(resp);
|
||||
}
|
||||
|
||||
answer.sort_by(|a, b| a["id"].as_u64().partial_cmp(&b["id"].as_u64()).unwrap());
|
||||
|
||||
let answer = answer.into_iter().map(|mut x| x["result"].take()).collect();
|
||||
|
||||
Ok(answer)
|
||||
}
|
||||
|
||||
fn recv(&mut self) -> io::Result<Vec<u8>> {
|
||||
let mut resp = String::new();
|
||||
self.buf_reader.read_line(&mut resp)?;
|
||||
fn recv(
|
||||
&self,
|
||||
receiver: &Receiver<ChannelMessage>,
|
||||
req_id: usize,
|
||||
) -> Result<serde_json::Value, Error> {
|
||||
loop {
|
||||
// Try to take the lock on the reader. If we manage to do so, we'll become the reader
|
||||
// thread until we get our reponse
|
||||
match self._reader_thread(Some(req_id)) {
|
||||
Ok(response) => break Ok(response),
|
||||
Err(Error::CouldntLockReader) => {
|
||||
match receiver.recv() {
|
||||
// Received our response, returning it
|
||||
Ok(ChannelMessage::Response(received)) => break Ok(received),
|
||||
Ok(ChannelMessage::WakeUp) => {
|
||||
// We have been woken up, this means that we should try becoming the
|
||||
// reader thread ourselves
|
||||
trace!("WakeUp for {}", req_id);
|
||||
|
||||
trace!("<== {}", resp);
|
||||
|
||||
Ok(resp.as_bytes().to_vec())
|
||||
continue;
|
||||
}
|
||||
e @ Err(_) => e.map(|_| ()).expect("Error receiving from channel"), // panic if there's something wrong with the channels
|
||||
}
|
||||
}
|
||||
e @ Err(_) => break e,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_notification(
|
||||
&mut self,
|
||||
method: &str,
|
||||
result: serde_json::Value,
|
||||
) -> Result<(), Error> {
|
||||
fn handle_notification(&self, method: &str, result: serde_json::Value) -> Result<(), Error> {
|
||||
match method {
|
||||
"blockchain.headers.subscribe" => {
|
||||
self.headers.push_back(serde_json::from_value(result)?)
|
||||
}
|
||||
"blockchain.headers.subscribe" => self.headers.lock().unwrap().append(
|
||||
&mut serde_json::from_value::<Vec<HeaderNotification>>(result)?
|
||||
.into_iter()
|
||||
.collect(),
|
||||
),
|
||||
"blockchain.scripthash.subscribe" => {
|
||||
let unserialized: ScriptNotification = serde_json::from_value(result)?;
|
||||
let mut script_notifications = self.script_notifications.lock().unwrap();
|
||||
|
||||
let queue = self
|
||||
.script_notifications
|
||||
let queue = script_notifications
|
||||
.get_mut(&unserialized.scripthash)
|
||||
.ok_or_else(|| Error::NotSubscribed(unserialized.scripthash))?;
|
||||
|
||||
@ -360,28 +510,13 @@ impl<S: Read + Write> Client<S> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Tries to read from the read buffer if any notifications were received since the last call
|
||||
/// or `poll`, and processes them
|
||||
pub fn poll(&mut self) -> Result<(), Error> {
|
||||
// try to pull data from the stream
|
||||
self.buf_reader.fill_buf()?;
|
||||
|
||||
while !self.buf_reader.buffer().is_empty() {
|
||||
let raw = self.recv()?;
|
||||
let mut resp: serde_json::Value = serde_json::from_slice(&raw)?;
|
||||
|
||||
match resp["method"].take().as_str() {
|
||||
Some(ref method) => self.handle_notification(method, resp["params"].take())?,
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Subscribes to notifications for new block headers, by sending a `blockchain.headers.subscribe` call.
|
||||
pub fn block_headers_subscribe(&mut self) -> Result<HeaderNotification, Error> {
|
||||
let req = Request::new("blockchain.headers.subscribe", vec![]);
|
||||
pub fn block_headers_subscribe(&self) -> Result<HeaderNotification, Error> {
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"blockchain.headers.subscribe",
|
||||
vec![],
|
||||
);
|
||||
let value = self.call(req)?;
|
||||
|
||||
Ok(serde_json::from_value(value)?)
|
||||
@ -389,20 +524,22 @@ impl<S: Read + Write> Client<S> {
|
||||
|
||||
/// Tries to pop one queued notification for a new block header that we might have received.
|
||||
/// Returns `None` if there are no items in the queue.
|
||||
pub fn block_headers_poll(&mut self) -> Result<Option<HeaderNotification>, Error> {
|
||||
self.poll()?;
|
||||
|
||||
Ok(self.headers.pop_front())
|
||||
pub fn block_headers_pop(&self) -> Result<Option<HeaderNotification>, Error> {
|
||||
Ok(self.headers.lock().unwrap().pop_front())
|
||||
}
|
||||
|
||||
/// Gets the block header for height `height`.
|
||||
pub fn block_header(&mut self, height: usize) -> Result<BlockHeader, Error> {
|
||||
pub fn block_header(&self, height: usize) -> Result<BlockHeader, Error> {
|
||||
Ok(deserialize(&self.block_header_raw(height)?)?)
|
||||
}
|
||||
|
||||
/// Gets the raw bytes of block header for height `height`.
|
||||
pub fn block_header_raw(&mut self, height: usize) -> Result<Vec<u8>, Error> {
|
||||
let req = Request::new("blockchain.block.header", vec![Param::Usize(height)]);
|
||||
pub fn block_header_raw(&self, height: usize) -> Result<Vec<u8>, Error> {
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"blockchain.block.header",
|
||||
vec![Param::Usize(height)],
|
||||
);
|
||||
let result = self.call(req)?;
|
||||
|
||||
Ok(Vec::<u8>::from_hex(
|
||||
@ -413,12 +550,9 @@ impl<S: Read + Write> Client<S> {
|
||||
}
|
||||
|
||||
/// Tries to fetch `count` block headers starting from `start_height`.
|
||||
pub fn block_headers(
|
||||
&mut self,
|
||||
start_height: usize,
|
||||
count: usize,
|
||||
) -> Result<GetHeadersRes, Error> {
|
||||
let req = Request::new(
|
||||
pub fn block_headers(&self, start_height: usize, count: usize) -> Result<GetHeadersRes, Error> {
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"blockchain.block.headers",
|
||||
vec![Param::Usize(start_height), Param::Usize(count)],
|
||||
);
|
||||
@ -437,8 +571,12 @@ impl<S: Read + Write> Client<S> {
|
||||
}
|
||||
|
||||
/// Estimates the fee required in **Satoshis per kilobyte** to confirm a transaction in `number` blocks.
|
||||
pub fn estimate_fee(&mut self, number: usize) -> Result<f64, Error> {
|
||||
let req = Request::new("blockchain.estimatefee", vec![Param::Usize(number)]);
|
||||
pub fn estimate_fee(&self, number: usize) -> Result<f64, Error> {
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"blockchain.estimatefee",
|
||||
vec![Param::Usize(number)],
|
||||
);
|
||||
let result = self.call(req)?;
|
||||
|
||||
result
|
||||
@ -447,8 +585,12 @@ impl<S: Read + Write> Client<S> {
|
||||
}
|
||||
|
||||
/// Returns the minimum accepted fee by the server's node in **Bitcoin, not Satoshi**.
|
||||
pub fn relay_fee(&mut self) -> Result<f64, Error> {
|
||||
let req = Request::new("blockchain.relayfee", vec![]);
|
||||
pub fn relay_fee(&self) -> Result<f64, Error> {
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"blockchain.relayfee",
|
||||
vec![],
|
||||
);
|
||||
let result = self.call(req)?;
|
||||
|
||||
result
|
||||
@ -463,17 +605,18 @@ impl<S: Read + Write> Client<S> {
|
||||
///
|
||||
/// Returns [`Error::AlreadySubscribed`](../types/enum.Error.html#variant.AlreadySubscribed) if
|
||||
/// already subscribed to the same script.
|
||||
pub fn script_subscribe(&mut self, script: &Script) -> Result<ScriptStatus, Error> {
|
||||
pub fn script_subscribe(&self, script: &Script) -> Result<Option<ScriptStatus>, Error> {
|
||||
let script_hash = script.to_electrum_scripthash();
|
||||
let mut script_notifications = self.script_notifications.lock().unwrap();
|
||||
|
||||
if self.script_notifications.contains_key(&script_hash) {
|
||||
if script_notifications.contains_key(&script_hash) {
|
||||
return Err(Error::AlreadySubscribed(script_hash));
|
||||
}
|
||||
|
||||
self.script_notifications
|
||||
.insert(script_hash.clone(), VecDeque::new());
|
||||
script_notifications.insert(script_hash.clone(), VecDeque::new());
|
||||
|
||||
let req = Request::new(
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"blockchain.scripthash.subscribe",
|
||||
vec![Param::String(script_hash.to_hex())],
|
||||
);
|
||||
@ -488,41 +631,50 @@ impl<S: Read + Write> Client<S> {
|
||||
///
|
||||
/// Returns [`Error::NotSubscribed`](../types/enum.Error.html#variant.NotSubscribed) if
|
||||
/// not subscribed to the script.
|
||||
pub fn script_unsubscribe(&mut self, script: &Script) -> Result<bool, Error> {
|
||||
pub fn script_unsubscribe(&self, script: &Script) -> Result<bool, Error> {
|
||||
let script_hash = script.to_electrum_scripthash();
|
||||
let mut script_notifications = self.script_notifications.lock().unwrap();
|
||||
|
||||
if !self.script_notifications.contains_key(&script_hash) {
|
||||
if !script_notifications.contains_key(&script_hash) {
|
||||
return Err(Error::NotSubscribed(script_hash));
|
||||
}
|
||||
|
||||
let req = Request::new(
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"blockchain.scripthash.unsubscribe",
|
||||
vec![Param::String(script_hash.to_hex())],
|
||||
);
|
||||
let value = self.call(req)?;
|
||||
let answer = serde_json::from_value(value)?;
|
||||
|
||||
self.script_notifications.remove(&script_hash);
|
||||
script_notifications.remove(&script_hash);
|
||||
|
||||
Ok(answer)
|
||||
}
|
||||
|
||||
/// Tries to pop one queued notification for a the requested script. Returns `None` if there are no items in the queue.
|
||||
pub fn script_poll(&mut self, script: &Script) -> Result<Option<ScriptStatus>, Error> {
|
||||
self.poll()?;
|
||||
|
||||
pub fn script_pop(&self, script: &Script) -> Result<Option<ScriptStatus>, Error> {
|
||||
let script_hash = script.to_electrum_scripthash();
|
||||
|
||||
match self.script_notifications.get_mut(&script_hash) {
|
||||
match self
|
||||
.script_notifications
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get_mut(&script_hash)
|
||||
{
|
||||
None => Err(Error::NotSubscribed(script_hash)),
|
||||
Some(queue) => Ok(queue.pop_front()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the balance for a *scriptPubKey*
|
||||
pub fn script_get_balance(&mut self, script: &Script) -> Result<GetBalanceRes, Error> {
|
||||
pub fn script_get_balance(&self, script: &Script) -> Result<GetBalanceRes, Error> {
|
||||
let params = vec![Param::String(script.to_electrum_scripthash().to_hex())];
|
||||
let req = Request::new("blockchain.scripthash.get_balance", params);
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"blockchain.scripthash.get_balance",
|
||||
params,
|
||||
);
|
||||
let result = self.call(req)?;
|
||||
|
||||
Ok(serde_json::from_value(result)?)
|
||||
@ -530,10 +682,7 @@ impl<S: Read + Write> Client<S> {
|
||||
/// Batch version of [`script_get_balance`](#method.script_get_balance).
|
||||
///
|
||||
/// Takes a list of scripts and returns a list of balance responses.
|
||||
pub fn batch_script_get_balance<'s, I>(
|
||||
&mut self,
|
||||
scripts: I,
|
||||
) -> Result<Vec<GetBalanceRes>, Error>
|
||||
pub fn batch_script_get_balance<'s, I>(&self, scripts: I) -> Result<Vec<GetBalanceRes>, Error>
|
||||
where
|
||||
I: IntoIterator<Item = &'s Script>,
|
||||
{
|
||||
@ -541,9 +690,13 @@ impl<S: Read + Write> Client<S> {
|
||||
}
|
||||
|
||||
/// Returns the history for a *scriptPubKey*
|
||||
pub fn script_get_history(&mut self, script: &Script) -> Result<Vec<GetHistoryRes>, Error> {
|
||||
pub fn script_get_history(&self, script: &Script) -> Result<Vec<GetHistoryRes>, Error> {
|
||||
let params = vec![Param::String(script.to_electrum_scripthash().to_hex())];
|
||||
let req = Request::new("blockchain.scripthash.get_history", params);
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"blockchain.scripthash.get_history",
|
||||
params,
|
||||
);
|
||||
let result = self.call(req)?;
|
||||
|
||||
Ok(serde_json::from_value(result)?)
|
||||
@ -552,7 +705,7 @@ impl<S: Read + Write> Client<S> {
|
||||
///
|
||||
/// Takes a list of scripts and returns a list of history responses.
|
||||
pub fn batch_script_get_history<'s, I>(
|
||||
&mut self,
|
||||
&self,
|
||||
scripts: I,
|
||||
) -> Result<Vec<Vec<GetHistoryRes>>, Error>
|
||||
where
|
||||
@ -562,9 +715,13 @@ impl<S: Read + Write> Client<S> {
|
||||
}
|
||||
|
||||
/// Returns the list of unspent outputs for a *scriptPubKey*
|
||||
pub fn script_list_unspent(&mut self, script: &Script) -> Result<Vec<ListUnspentRes>, Error> {
|
||||
pub fn script_list_unspent(&self, script: &Script) -> Result<Vec<ListUnspentRes>, Error> {
|
||||
let params = vec![Param::String(script.to_electrum_scripthash().to_hex())];
|
||||
let req = Request::new("blockchain.scripthash.listunspent", params);
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"blockchain.scripthash.listunspent",
|
||||
params,
|
||||
);
|
||||
let result = self.call(req)?;
|
||||
|
||||
Ok(serde_json::from_value(result)?)
|
||||
@ -574,7 +731,7 @@ impl<S: Read + Write> Client<S> {
|
||||
///
|
||||
/// Takes a list of scripts and returns a list of a list of utxos.
|
||||
pub fn batch_script_list_unspent<'s, I>(
|
||||
&mut self,
|
||||
&self,
|
||||
scripts: I,
|
||||
) -> Result<Vec<Vec<ListUnspentRes>>, Error>
|
||||
where
|
||||
@ -584,14 +741,18 @@ impl<S: Read + Write> Client<S> {
|
||||
}
|
||||
|
||||
/// Gets the transaction with `txid`. Returns an error if not found.
|
||||
pub fn transaction_get(&mut self, txid: &Txid) -> Result<Transaction, Error> {
|
||||
pub fn transaction_get(&self, txid: &Txid) -> Result<Transaction, Error> {
|
||||
Ok(deserialize(&self.transaction_get_raw(txid)?)?)
|
||||
}
|
||||
|
||||
/// Gets the raw bytes of a transaction with `txid`. Returns an error if not found.
|
||||
pub fn transaction_get_raw(&mut self, txid: &Txid) -> Result<Vec<u8>, Error> {
|
||||
pub fn transaction_get_raw(&self, txid: &Txid) -> Result<Vec<u8>, Error> {
|
||||
let params = vec![Param::String(txid.to_hex())];
|
||||
let req = Request::new("blockchain.transaction.get", params);
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"blockchain.transaction.get",
|
||||
params,
|
||||
);
|
||||
let result = self.call(req)?;
|
||||
|
||||
Ok(Vec::<u8>::from_hex(
|
||||
@ -604,7 +765,7 @@ impl<S: Read + Write> Client<S> {
|
||||
/// Batch version of [`transaction_get`](#method.transaction_get).
|
||||
///
|
||||
/// Takes a list of `txids` and returns a list of transactions.
|
||||
pub fn batch_transaction_get<'t, I>(&mut self, txids: I) -> Result<Vec<Transaction>, Error>
|
||||
pub fn batch_transaction_get<'t, I>(&self, txids: I) -> Result<Vec<Transaction>, Error>
|
||||
where
|
||||
I: IntoIterator<Item = &'t Txid>,
|
||||
{
|
||||
@ -617,7 +778,7 @@ impl<S: Read + Write> Client<S> {
|
||||
/// Batch version of [`transaction_get_raw`](#method.transaction_get_raw).
|
||||
///
|
||||
/// Takes a list of `txids` and returns a list of transactions raw bytes.
|
||||
pub fn batch_transaction_get_raw<'t, I>(&mut self, txids: I) -> Result<Vec<Vec<u8>>, Error>
|
||||
pub fn batch_transaction_get_raw<'t, I>(&self, txids: I) -> Result<Vec<Vec<u8>>, Error>
|
||||
where
|
||||
I: IntoIterator<Item = &'t Txid>,
|
||||
{
|
||||
@ -631,7 +792,7 @@ impl<S: Read + Write> Client<S> {
|
||||
/// Batch version of [`block_header_raw`](#method.block_header_raw).
|
||||
///
|
||||
/// Takes a list of `heights` of blocks and returns a list of block header raw bytes.
|
||||
pub fn batch_block_header_raw<'s, I>(&mut self, heights: I) -> Result<Vec<Vec<u8>>, Error>
|
||||
pub fn batch_block_header_raw<'s, I>(&self, heights: I) -> Result<Vec<Vec<u8>>, Error>
|
||||
where
|
||||
I: IntoIterator<Item = u32>,
|
||||
{
|
||||
@ -646,7 +807,7 @@ impl<S: Read + Write> Client<S> {
|
||||
/// Batch version of [`block_header`](#method.block_header).
|
||||
///
|
||||
/// Takes a list of `heights` of blocks and returns a list of headers.
|
||||
pub fn batch_block_header<'s, I>(&mut self, heights: I) -> Result<Vec<BlockHeader>, Error>
|
||||
pub fn batch_block_header<'s, I>(&self, heights: I) -> Result<Vec<BlockHeader>, Error>
|
||||
where
|
||||
I: IntoIterator<Item = u32>,
|
||||
{
|
||||
@ -660,7 +821,7 @@ impl<S: Read + Write> Client<S> {
|
||||
///
|
||||
/// Takes a list of `numbers` of blocks and returns a list of fee required in
|
||||
/// **Satoshis per kilobyte** to confirm a transaction in the given number of blocks.
|
||||
pub fn batch_estimate_fee<'s, I>(&mut self, numbers: I) -> Result<Vec<f64>, Error>
|
||||
pub fn batch_estimate_fee<'s, I>(&self, numbers: I) -> Result<Vec<f64>, Error>
|
||||
where
|
||||
I: IntoIterator<Item = usize>,
|
||||
{
|
||||
@ -668,36 +829,48 @@ impl<S: Read + Write> Client<S> {
|
||||
}
|
||||
|
||||
/// Broadcasts the raw bytes of a transaction to the network.
|
||||
pub fn transaction_broadcast_raw(&mut self, raw_tx: &[u8]) -> Result<Txid, Error> {
|
||||
pub fn transaction_broadcast_raw(&self, raw_tx: &[u8]) -> Result<Txid, Error> {
|
||||
let params = vec![Param::String(raw_tx.to_hex())];
|
||||
let req = Request::new("blockchain.transaction.broadcast", params);
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"blockchain.transaction.broadcast",
|
||||
params,
|
||||
);
|
||||
let result = self.call(req)?;
|
||||
|
||||
Ok(serde_json::from_value(result)?)
|
||||
}
|
||||
|
||||
/// Broadcasts a transaction to the network.
|
||||
pub fn transaction_broadcast(&mut self, tx: &Transaction) -> Result<Txid, Error> {
|
||||
pub fn transaction_broadcast(&self, tx: &Transaction) -> Result<Txid, Error> {
|
||||
let buffer: Vec<u8> = serialize(tx);
|
||||
self.transaction_broadcast_raw(&buffer)
|
||||
}
|
||||
|
||||
/// Returns the merkle path for the transaction `txid` confirmed in the block at `height`.
|
||||
pub fn transaction_get_merkle(
|
||||
&mut self,
|
||||
&self,
|
||||
txid: &Txid,
|
||||
height: usize,
|
||||
) -> Result<GetMerkleRes, Error> {
|
||||
let params = vec![Param::String(txid.to_hex()), Param::Usize(height)];
|
||||
let req = Request::new("blockchain.transaction.get_merkle", params);
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"blockchain.transaction.get_merkle",
|
||||
params,
|
||||
);
|
||||
let result = self.call(req)?;
|
||||
|
||||
Ok(serde_json::from_value(result)?)
|
||||
}
|
||||
|
||||
/// Returns the capabilities of the server.
|
||||
pub fn server_features(&mut self) -> Result<ServerFeaturesRes, Error> {
|
||||
let req = Request::new("server.features", vec![]);
|
||||
pub fn server_features(&self) -> Result<ServerFeaturesRes, Error> {
|
||||
let req = Request::new_id(
|
||||
self.last_id.fetch_add(1, Ordering::SeqCst),
|
||||
"server.features",
|
||||
vec![],
|
||||
);
|
||||
let result = self.call(req)?;
|
||||
|
||||
Ok(serde_json::from_value(result)?)
|
||||
|
||||
@ -20,6 +20,7 @@
|
||||
//! ```
|
||||
|
||||
pub extern crate bitcoin;
|
||||
extern crate core;
|
||||
extern crate log;
|
||||
#[cfg(feature = "use-openssl")]
|
||||
extern crate openssl;
|
||||
|
||||
35
src/types.rs
35
src/types.rs
@ -2,6 +2,8 @@
|
||||
//!
|
||||
//! This module contains definitions of all the complex data structures that are returned by calls
|
||||
|
||||
use std::ops::Deref;
|
||||
|
||||
use bitcoin::blockdata::block;
|
||||
use bitcoin::hashes::hex::FromHex;
|
||||
use bitcoin::hashes::{sha256, Hash};
|
||||
@ -23,6 +25,8 @@ pub enum Param {
|
||||
String(String),
|
||||
/// Boolean parameter
|
||||
Bool(bool),
|
||||
/// Bytes array parameter
|
||||
Bytes(Vec<u8>),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
@ -40,7 +44,7 @@ pub struct Request<'a> {
|
||||
|
||||
impl<'a> Request<'a> {
|
||||
/// Creates a new request with a default id
|
||||
pub fn new(method: &'a str, params: Vec<Param>) -> Self {
|
||||
fn new(method: &'a str, params: Vec<Param>) -> Self {
|
||||
Self {
|
||||
id: 0,
|
||||
jsonrpc: JSONRPC_2_0,
|
||||
@ -58,12 +62,31 @@ impl<'a> Request<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize)]
|
||||
pub struct Hex32Bytes(#[serde(deserialize_with = "from_hex")] [u8; 32]);
|
||||
|
||||
impl Deref for Hex32Bytes {
|
||||
type Target = [u8; 32];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl From<[u8; 32]> for Hex32Bytes {
|
||||
fn from(other: [u8; 32]) -> Hex32Bytes {
|
||||
Hex32Bytes(other)
|
||||
}
|
||||
}
|
||||
|
||||
/// Format used by the Electrum server to identify an address. The reverse sha256 hash of the
|
||||
/// scriptPubKey. Documented [here](https://electrumx.readthedocs.io/en/latest/protocol-basics.html#script-hashes).
|
||||
pub type ScriptHash = [u8; 32];
|
||||
pub type ScriptHash = Hex32Bytes;
|
||||
|
||||
/// Binary blob that condenses all the activity of an address. Used to detect changes without
|
||||
/// having to compare potentially long lists of transactions.
|
||||
pub type ScriptStatus = [u8; 32];
|
||||
pub type ScriptStatus = Hex32Bytes;
|
||||
|
||||
/// Trait used to convert a struct into the Electrum representation of an address
|
||||
pub trait ToElectrumScriptHash {
|
||||
@ -76,7 +99,7 @@ impl ToElectrumScriptHash for Script {
|
||||
let mut result = sha256::Hash::hash(self.as_bytes()).into_inner();
|
||||
result.reverse();
|
||||
|
||||
result
|
||||
result.into()
|
||||
}
|
||||
}
|
||||
|
||||
@ -243,6 +266,10 @@ pub enum Error {
|
||||
/// Missing domain while it was explicitly asked to validate it
|
||||
MissingDomain,
|
||||
|
||||
/// Couldn't take a lock on the reader mutex. This means that there's already another reader
|
||||
/// thread running
|
||||
CouldntLockReader,
|
||||
|
||||
#[cfg(feature = "use-openssl")]
|
||||
/// Invalid OpenSSL method used
|
||||
InvalidSslMethod(openssl::error::ErrorStack),
|
||||
|
||||
Loading…
Reference in New Issue
Block a user