Add BDK integration examples

This commit is contained in:
Mononaut 2023-10-14 02:10:45 +00:00
parent d8cc9fc54c
commit 45629c5eb7
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
6 changed files with 468 additions and 0 deletions

View File

@ -56,6 +56,14 @@ loop {
Also check out the `wasm_wallet_watcher` example crate.
# BDK
The library exposes a `MempoolAsync` struct, which wraps and extends the `AsyncClient` from the `esplora-client` crate, and is suitable for integration with BDK.
Check out the `bdk_mempool` and `bdk_mempool_wallet` example crates, which are analogous to https://github.com/bitcoindevkit/bdk/tree/master/crates/esplora and https://github.com/bitcoindevkit/bdk/tree/master/example-crates/wallet_esplora_async respectively.
## API
_(TODO)_

View File

@ -0,0 +1,22 @@
[package]
name = "bdk_mempool"
version = "0.3.0"
edition = "2021"
readme = "README.md"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bdk_chain = { path = "../../../bdk/crates/chain", version = "0.5.0", default-features = false, features = ["serde", "miniscript"] }
mwck = { version = "0.1.0", path = "../../" }
esplora-client = { version = "0.6.0", default-features = false, features = ["async-https"] }
async-trait = { version = "0.1.66" }
futures = { version = "0.3.26" }
# use these dependencies if you need to enable their /no-std features
bitcoin = { version = "0.30", optional = true, default-features = false }
miniscript = { version = "10.0.0", optional = true, default-features = false }
[features]
default = ["std"]
std = ["bdk_chain/std"]

View File

@ -0,0 +1,296 @@
use async_trait::async_trait;
use bdk_chain::collections::btree_map;
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
collections::{BTreeMap, BTreeSet},
local_chain::{self, CheckPoint},
BlockId, ConfirmationTimeAnchor, TxGraph,
};
use esplora_client::TxStatus;
use mwck::{MempoolAsync, wallet::Error};
use futures::{stream::FuturesOrdered, TryStreamExt};
use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
/// Trait to extend the functionality of [`esplora_client::AsyncClient`].
///
/// Refer to [crate-level documentation] for more.
///
/// [crate-level documentation]: crate
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait MempoolAsyncExt {
/// Prepare an [`LocalChain`] update with blocks fetched from Esplora.
///
/// * `prev_tip` is the previous tip of [`LocalChain::tip`].
/// * `get_heights` is the block heights that we are interested in fetching from Esplora.
///
/// The result of this method can be applied to [`LocalChain::apply_update`].
///
/// [`LocalChain`]: bdk_chain::local_chain::LocalChain
/// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip
/// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update
#[allow(clippy::result_large_err)]
async fn update_local_chain(
&self,
local_tip: Option<CheckPoint>,
request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
) -> Result<local_chain::Update, Error>;
/// Scan Esplora for the data specified and return a [`TxGraph`] and a map of last active
/// indices.
///
/// * `keychain_spks`: keychains that we want to scan transactions for
/// * `txids`: transactions for which we want updated [`ConfirmationTimeAnchor`]s
/// * `outpoints`: transactions associated with these outpoints (residing, spending) that we
/// want to include in the update
///
/// The scan for each keychain stops after a gap of `stop_gap` script pubkeys with no associated
/// transactions. `parallel_requests` specifies the max number of HTTP requests to make in
/// parallel.
#[allow(clippy::result_large_err)]
async fn update_tx_graph<K: Ord + Clone + Send>(
&self,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
>,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
stop_gap: usize,
parallel_requests: usize,
) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error>;
/// Convenience method to call [`update_tx_graph`] without requiring a keychain.
///
/// [`update_tx_graph`]: EsploraAsyncExt::update_tx_graph
#[allow(clippy::result_large_err)]
async fn update_tx_graph_without_keychain(
&self,
misc_spks: impl IntoIterator<IntoIter = impl Iterator<Item = ScriptBuf> + Send> + Send,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
parallel_requests: usize,
) -> Result<TxGraph<ConfirmationTimeAnchor>, Error> {
self.update_tx_graph(
[(
(),
misc_spks
.into_iter()
.enumerate()
.map(|(i, spk)| (i as u32, spk)),
)]
.into(),
txids,
outpoints,
usize::MAX,
parallel_requests,
)
.await
.map(|(g, _)| g)
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl MempoolAsyncExt for MempoolAsync {
async fn update_local_chain(
&self,
local_tip: Option<CheckPoint>,
request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
) -> Result<local_chain::Update, Error> {
let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
let new_tip_height = self.get_height().await?;
// atomically fetch blocks from esplora
let mut fetched_blocks: BTreeMap<u32, BlockHash> = {
let heights = (0..=new_tip_height).rev();
let hashes = self
.get_blocks(Some(new_tip_height))
.await?
.into_iter()
.map(|b| b.id);
heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
};
// fetch heights that the caller is interested in
for height in request_heights {
// do not fetch blocks higher than remote tip
if height > new_tip_height {
continue;
}
// only fetch what is missing
if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
let hash = self.get_block_hash(height).await?;
entry.insert(hash);
}
}
// find the earliest point of agreement between local chain and fetched chain
let earliest_agreement_cp = {
let mut earliest_agreement_cp = Option::<CheckPoint>::None;
if let Some(local_tip) = local_tip {
let local_tip_height = local_tip.height();
for local_cp in local_tip.iter() {
let local_block = local_cp.block_id();
// the updated hash (block hash at this height after the update), can either be:
// 1. a block that already existed in `fetched_blocks`
// 2. a block that exists locally and atleast has a depth of ASSUME_FINAL_DEPTH
// 3. otherwise we can freshly fetch the block from remote, which is safe as it
// is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
// remote tip
let updated_hash = match fetched_blocks.entry(local_block.height) {
btree_map::Entry::Occupied(entry) => *entry.get(),
btree_map::Entry::Vacant(entry) => *entry.insert(
if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
local_block.hash
} else {
self.get_block_hash(local_block.height).await?
},
),
};
// since we may introduce blocks below the point of agreement, we cannot break
// here unconditionally - we only break if we guarantee there are no new heights
// below our current local checkpoint
if local_block.hash == updated_hash {
earliest_agreement_cp = Some(local_cp);
let first_new_height = *fetched_blocks
.keys()
.next()
.expect("must have atleast one new block");
if first_new_height >= local_block.height {
break;
}
}
}
}
earliest_agreement_cp
};
let tip = {
// first checkpoint to use for the update chain
let first_cp = match earliest_agreement_cp {
Some(cp) => cp,
None => {
let (&height, &hash) = fetched_blocks
.iter()
.next()
.expect("must have atleast one new block");
CheckPoint::new(BlockId { height, hash })
}
};
// transform fetched chain into the update chain
fetched_blocks
// we exclude anything at or below the first cp of the update chain otherwise
// building the chain will fail
.split_off(&(first_cp.height() + 1))
.into_iter()
.map(|(height, hash)| BlockId { height, hash })
.fold(first_cp, |prev_cp, block| {
prev_cp.push(block).expect("must extend checkpoint")
})
};
Ok(local_chain::Update {
tip,
introduce_older_blocks: true,
})
}
async fn update_tx_graph<K: Ord + Clone + Send>(
&self,
keychain_spks: BTreeMap<
K,
impl IntoIterator<IntoIter = impl Iterator<Item = (u32, ScriptBuf)> + Send> + Send,
>,
txids: impl IntoIterator<IntoIter = impl Iterator<Item = Txid> + Send> + Send,
outpoints: impl IntoIterator<IntoIter = impl Iterator<Item = OutPoint> + Send> + Send,
stop_gap: usize,
parallel_requests: usize,
) -> Result<(TxGraph<ConfirmationTimeAnchor>, BTreeMap<K, u32>), Error> {
let parallel_requests = Ord::max(parallel_requests, 1);
let mut graph = TxGraph::<ConfirmationTimeAnchor>::default();
let mut last_active_indexes = BTreeMap::<K, u32>::new();
for (keychain, spks) in keychain_spks {
let mut last_active_index = Option::<u32>::None;
// TODO parallelize this
for (index, spk) in spks {
let txs = self.mwck_confirmed_scripthash_txs(&spk).await?;
if !txs.is_empty() {
last_active_index = Some(index);
}
for tx in txs {
let _ = graph.insert_tx(tx.to_tx());
if let Some(anchor) = anchor_from_status(&tx.status) {
let _ = graph.insert_anchor(tx.txid, anchor);
}
}
if Some(index) > last_active_index.map(|i| i + stop_gap as u32) {
break;
}
}
if let Some(last_active_index) = last_active_index {
last_active_indexes.insert(keychain, last_active_index);
}
}
let mut txids = txids.into_iter();
loop {
let handles = txids
.by_ref()
.take(parallel_requests)
.filter(|&txid| graph.get_tx(txid).is_none())
.map(|txid| {
let client = self.clone();
async move { client.get_tx_status(&txid).await.map(|s| (txid, s)) }
})
.collect::<FuturesOrdered<_>>();
if handles.is_empty() {
break;
}
for (txid, status) in handles.try_collect::<Vec<(Txid, TxStatus)>>().await? {
if let Some(anchor) = anchor_from_status(&status) {
let _ = graph.insert_anchor(txid, anchor);
}
}
}
for op in outpoints.into_iter() {
if graph.get_tx(op.txid).is_none() {
if let Some(tx) = self.get_tx(&op.txid).await? {
let _ = graph.insert_tx(tx);
}
let status = self.get_tx_status(&op.txid).await?;
if let Some(anchor) = anchor_from_status(&status) {
let _ = graph.insert_anchor(op.txid, anchor);
}
}
if let Some(op_status) = self.get_output_status(&op.txid, op.vout as _).await? {
if let Some(txid) = op_status.txid {
if graph.get_tx(txid).is_none() {
if let Some(tx) = self.get_tx(&txid).await? {
let _ = graph.insert_tx(tx);
}
let status = self.get_tx_status(&txid).await?;
if let Some(anchor) = anchor_from_status(&status) {
let _ = graph.insert_anchor(txid, anchor);
}
}
}
}
}
Ok((graph, last_active_indexes))
}
}

View File

@ -0,0 +1,27 @@
use bdk_chain::{BlockId, ConfirmationTimeAnchor};
use esplora_client::TxStatus;
pub use mwck;
mod async_ext;
pub use async_ext::*;
const ASSUME_FINAL_DEPTH: u32 = 15;
fn anchor_from_status(status: &TxStatus) -> Option<ConfirmationTimeAnchor> {
if let TxStatus {
block_height: Some(height),
block_hash: Some(hash),
block_time: Some(time),
..
} = status.clone()
{
Some(ConfirmationTimeAnchor {
anchor_block: BlockId { height, hash },
confirmation_height: height,
confirmation_time: time,
})
} else {
None
}
}

View File

@ -0,0 +1,13 @@
[package]
name = "wallet_mempool_async"
version = "0.2.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bdk = { path = "../../../bdk/crates/bdk", version = "1.0.0-alpha.1" }
bdk_mempool = { path = "../bdk_mempool" }
bdk_file_store = { path = "../../../bdk/crates/file_store" }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros"] }
env_logger = "0.10.0"

View File

@ -0,0 +1,102 @@
use std::{io::Write, str::FromStr};
use bdk::{
bitcoin::{Address, Network},
chain::keychain::LocalUpdate,
wallet::AddressIndex,
SignOptions, Wallet,
};
use bdk_mempool::{mwck::MempoolAsync, MempoolAsyncExt};
use bdk_file_store::Store;
const DB_MAGIC: &str = "bdk_wallet_esplora_async_example";
const SEND_AMOUNT: u64 = 5000;
const STOP_GAP: usize = 50;
const PARALLEL_REQUESTS: usize = 5;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let db_path = std::env::temp_dir().join("bdk-esplora-async-example");
let db = Store::<bdk::wallet::ChangeSet>::new_from_path(DB_MAGIC.as_bytes(), db_path)?;
let external_descriptor = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/0'/0'/0/*)";
let internal_descriptor = "wpkh(tprv8ZgxMBicQKsPdy6LMhUtFHAgpocR8GC6QmwMSFpZs7h6Eziw3SpThFfczTDh5rW2krkqffa11UpX3XkeTTB2FvzZKWXqPY54Y6Rq4AQ5R8L/84'/0'/0'/1/*)";
let mut wallet = Wallet::new(
external_descriptor,
Some(internal_descriptor),
db,
Network::Testnet,
)?;
let address = wallet.get_address(AddressIndex::New);
println!("Generated Address: {}", address);
let balance = wallet.get_balance();
println!("Wallet balance before syncing: {} sats", balance.total());
println!("Connecting...");
let client = MempoolAsync::from_url("https://mempool.space/testnet/api");
client.connect(true).await;
println!("Syncing...");
let prev_tip = wallet.latest_checkpoint();
let keychain_spks = wallet
.spks_of_all_keychains()
.into_iter()
.map(|(k, k_spks)| {
let mut once = Some(());
let mut stdout = std::io::stdout();
let k_spks = k_spks
.inspect(move |(spk_i, _)| match once.take() {
Some(_) => print!("\nScanning keychain [{:?}]", k),
None => print!(" {:<3}", spk_i),
})
.inspect(move |_| stdout.flush().expect("must flush"));
(k, k_spks)
})
.collect();
let (update_graph, last_active_indices) = client
.update_tx_graph(keychain_spks, None, None, STOP_GAP, PARALLEL_REQUESTS)
.await?;
let missing_heights = wallet.tx_graph().missing_heights(wallet.local_chain());
let chain_update = client.update_local_chain(prev_tip, missing_heights).await?;
let update = LocalUpdate {
last_active_indices,
graph: update_graph,
..LocalUpdate::new(chain_update)
};
wallet.apply_update(update)?;
wallet.commit()?;
println!();
let balance = wallet.get_balance();
println!("Wallet balance after syncing: {} sats", balance.total());
if balance.total() < SEND_AMOUNT {
println!(
"Please send at least {} sats to the receiving address",
SEND_AMOUNT
);
std::process::exit(0);
}
let faucet_address = Address::from_str("mkHS9ne12qx9pS9VojpwU5xtRd4T7X7ZUt")?
.require_network(Network::Testnet)?;
let mut tx_builder = wallet.build_tx();
tx_builder
.add_recipient(faucet_address.script_pubkey(), SEND_AMOUNT)
.enable_rbf();
let (mut psbt, _) = tx_builder.finish()?;
let finalized = wallet.sign(&mut psbt, SignOptions::default())?;
assert!(finalized);
let tx = psbt.extract_tx();
client.broadcast(&tx).await?;
println!("Tx broadcasted! Txid: {}", tx.txid());
Ok(())
}