Track transactions, balances, utxos per address

This commit is contained in:
Mononaut 2023-09-11 18:47:24 +00:00
parent 0314830967
commit 4c625d6c96
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
4 changed files with 241 additions and 21 deletions

171
src/address.ts Normal file
View File

@ -0,0 +1,171 @@
import { AddressState, Transaction, Utxo } from "./interfaces";
/**
* Utility class for keeping track of address state
* via idempotent "add" and "remove" transaction events
*/
export class AddressTracker {
private address: string;
private transactions: Map<string, Transaction>;
private balance: {
total: number;
confirmed: number;
mempool: number;
};
private utxos: Map<string, Utxo>;
// Map of spent inputs for which we haven't yet seen
// the corresponding output.
private spent: Set<string>;
// While loadingApi=true, websocket events are withheld in a pending queue
private loadingApi: boolean = true;
private pending: { event: 'add' | 'remove', tx?: Transaction, txid?: string }[] = [];
constructor(address: string) {
this.address = address;
this.transactions = new Map();
this.balance = {
total: 0,
confirmed: 0,
mempool: 0,
};
this.utxos = new Map();
this.spent = new Set();
}
/**
* Returns the current state of the address in a JSON-friendly format
*/
public getState(): AddressState {
return {
transactions: Array.from(this.transactions.values()),
balance: {
total: this.balance.total,
mempool: this.balance.mempool,
confirmed: this.balance.confirmed,
},
utxos: Array.from(this.utxos.values()),
};
}
public hasTransaction(txid: string): boolean {
return this.transactions.has(txid);
}
/**
* Update the address state with the effect of a transaction
*
* Idempotent, but the most recent confirmation status applies,
* so ordering matters
*/
public addTransaction(tx: Transaction, fromWs: boolean = false): void {
// delay websocket events until we finished processing transactions from the REST API
if (this.loadingApi && fromWs) {
this.pending.push({ event: 'add', tx });
return;
}
// if we already have this transaction
// undo the effects of that version before applying this one
if (this.transactions.has(tx.txid)) {
this.removeTransaction(tx.txid);
}
for (const vin of tx.vin) {
if (vin.prevout.scriptpubkey_address === this.address) {
const key = `${vin.txid}:${vin.vout}`;
const utxo = this.utxos.get(key);
if (utxo) {
this.utxos.delete(key);
this.balance[utxo.confirmed ? 'confirmed' : 'mempool'] -= utxo.value;
this.balance.total -= utxo.value;
} else {
// we're missing the utxo for this input
// record that so we don't double count it later
this.spent.add(key);
}
}
}
for (const [index, vout] of tx.vout.entries()) {
if (vout.scriptpubkey_address === this.address) {
const key = `${tx.txid}:${index}`;
// skip outputs we've already seen spent
if (!this.spent.delete(key)) {
this.balance[tx.status.confirmed ? 'confirmed' : 'mempool'] += vout.value;
this.balance.total += vout.value;
this.utxos.set(key, {
txid: tx.txid,
vout: index,
value: vout.value,
confirmed: tx.status.confirmed,
})
}
}
}
this.transactions.set(tx.txid, tx);
}
/**
* Undo the effect of a previously added transaction
*/
public removeTransaction(txid: string, live: boolean = false): void {
// delay processing 'live' transactions until we finished loading from the REST API
if (this.loadingApi && live) {
this.pending.push({ event: 'remove', txid });
return;
}
const tx = this.transactions.get(txid);
if (!tx) {
return;
}
this.transactions.delete(txid);
for (const vin of tx.vin) {
if (vin.prevout.scriptpubkey_address === this.address) {
const key = `${vin.txid}:${vin.vout}`;
const prevTx = this.transactions.get(vin.txid);
if (prevTx) {
this.balance[prevTx.status.confirmed ? 'confirmed' : 'mempool'] += vin.prevout.value;
this.balance.total += vin.prevout.value;
this.utxos.set(key, {
txid: vin.txid,
vout: vin.vout,
value: vin.prevout.value,
confirmed: prevTx.status.confirmed,
});
}
this.spent.delete(key);
}
}
for (const [index, vout] of tx.vout.entries()) {
if (vout.scriptpubkey_address === this.address) {
const key = `${tx.txid}:${index}`;
if (this.utxos.delete(key)) {
// this output was still unspent
this.balance[tx.status.confirmed ? 'confirmed' : 'mempool'] -= vout.value;
this.balance.total -= vout.value;
} else {
// record that the output is already spent
this.spent.add(key);
}
}
}
}
/**
* Call after all API transactions have been processed
*
* Drains any pending websocket events
*/
public onApiLoaded(): void {
this.loadingApi = false;
while (this.pending.length) {
const event = this.pending.shift();
if (event?.event === 'add' && event.tx) {
this.addTransaction(event.tx);
} else if (event?.event === 'remove' && event.txid) {
this.removeTransaction(event.txid);
}
}
}
}

View File

@ -1,13 +1,14 @@
import { MempoolApi } from './api';
import { AddressTxEvent, MempoolSocket } from './websocket';
import { MempoolOptions, Transaction } from './interfaces';
import { MempoolOptions, Transaction, AddressState, Utxo, WalletState } from './interfaces';
import { AddressTracker } from './address';
type AddressEvent = 'added' | 'confirmed' | 'removed' | 'changed';
export class MempoolClient {
private api: MempoolApi;
private ws: MempoolSocket;
private tracking: { [key: string]: Map<string, Transaction> } = {};
private tracking: { [key: string]: AddressTracker } = {};
private observerId = 0;
private observers: {
@ -24,9 +25,9 @@ export class MempoolClient {
constructor(options: MempoolOptions) {
this.api = new MempoolApi(options);
this.ws = new MempoolSocket(options);
this.ws.on(AddressTxEvent.mempool, (address, tx) => { this.onTransactionUnconfirmed(address, tx); });
this.ws.on(AddressTxEvent.confirmed, (address, tx) => { this.onTransactionConfirmed(address, tx); });
this.ws.on(AddressTxEvent.removed, (address, tx) => { this.onTransactionRemoved(address, tx); });
this.ws.on(AddressTxEvent.mempool, (address, tx) => { this.onTransactionUnconfirmed(address, tx, true); });
this.ws.on(AddressTxEvent.confirmed, (address, tx) => { this.onTransactionConfirmed(address, tx, true); });
this.ws.on(AddressTxEvent.removed, (address, tx) => { this.onTransactionRemoved(address, tx, true); });
}
public destroy(): void {
@ -35,31 +36,32 @@ export class MempoolClient {
this.ws.off(AddressTxEvent.removed);
}
private onTransactionUnconfirmed(address: string, tx: Transaction): void {
private onTransactionUnconfirmed(address: string, tx: Transaction, live: boolean = false): void {
if (address in this.tracking) {
this.tracking[address].addTransaction(tx, live);
Object.values(this.observers.added).forEach(observer => observer({address, tx}));
Object.values(this.observers.changed).forEach(observer => observer({event: 'added', address, tx}));
this.tracking[address].set(tx.txid, tx);
}
}
private onTransactionConfirmed(address: string, tx: Transaction): void {
private onTransactionConfirmed(address: string, tx: Transaction, live: boolean = false): void {
if (address in this.tracking) {
if (!this.tracking[address].has(tx.txid)) {
const isKnown = this.tracking[address].hasTransaction(tx.txid);
this.tracking[address].addTransaction(tx, live);
if (!isKnown) {
Object.values(this.observers.added).forEach(observer => observer({address, tx}));
Object.values(this.observers.changed).forEach(observer => observer({event: 'added', address, tx}));
}
Object.values(this.observers.confirmed).forEach(observer => observer({address, tx}));
Object.values(this.observers.changed).forEach(observer => observer({event: 'confirmed', address, tx}));
this.tracking[address].set(tx.txid, tx);
}
}
private onTransactionRemoved(address: string, tx: Transaction): void {
private onTransactionRemoved(address: string, tx: Transaction, live: boolean = false): void {
if (address in this.tracking) {
Object.values(this.observers.removed).forEach(observer => observer({address, tx}));
Object.values(this.observers.changed).forEach(observer => observer({event: 'removed', address, tx}));
this.tracking[address].delete(tx.txid);
this.tracking[address].removeTransaction(tx.txid, live);
}
}
@ -71,26 +73,51 @@ export class MempoolClient {
return () => { delete this.observers[event][oid]; };
}
public getTransactions(address: string): Transaction[] {
return Array.from(this.tracking[address].values());
public getAddressState(address: string): AddressState | null {
if (this.tracking[address]) {
return this.tracking[address].getState();
} else {
return null;
}
}
public getWalletState(): { [key: string]: Transaction[] } {
const addresses: { [address: string]: Transaction[] } = {};
public getWalletState(): WalletState {
const addresses: { [address: string]: AddressState } = {};
const transactions = new Map();
const balance = {
total: 0,
confirmed: 0,
mempool: 0,
};
let utxos: Utxo[] = [];
Object.keys(this.tracking).forEach(address => {
addresses[address] = Array.from(this.tracking[address].values());
if (this.tracking[address]) {
const addressState = this.tracking[address].getState();
addresses[address] = addressState;
balance.total += addressState.balance.total;
balance.confirmed += addressState.balance.confirmed;
balance.mempool += addressState.balance.mempool;
for (const tx of addressState.transactions) {
transactions.set(tx.txid, tx);
}
utxos = utxos.concat(addressState.utxos);
}
})
return addresses;
return {
addresses,
balance,
transactions: Array.from(transactions.values()),
utxos,
};
}
public async trackAddresses(addresses: string[]): Promise<void> {
console.log('starting to track addresses ', addresses)
addresses = addresses.filter(address => !(address in this.tracking));
if (!addresses.length) {
return;
}
for (const address of addresses) {
this.tracking[address] = new Map();
this.tracking[address] = new AddressTracker(address);
}
this.ws.trackAddresses(Object.keys(this.tracking));
for (const address of addresses) {
@ -102,6 +129,7 @@ export class MempoolClient {
this.onTransactionUnconfirmed(address, tx);
}
}
this.tracking[address].onApiLoaded();
}
}

View File

@ -56,6 +56,27 @@ export interface Status {
block_time?: number;
}
export interface Utxo {
txid: string,
vout: number,
value: number,
confirmed: boolean,
}
export interface AddressState {
transactions: Transaction[];
balance: {
total: number;
confirmed: number;
mempool: number;
};
utxos: Utxo[];
}
export interface WalletState extends AddressState {
addresses: { [address: string]: AddressState };
}
export interface AddressTransactionsResponse {
[address: string]: {
mempool: Transaction[];

View File

@ -120,7 +120,7 @@ export class MempoolSocket {
for (const [address, events] of Object.entries(addressTransactions)) {
for (const event in AddressTxEvent) {
if (this.addressTxCallbacks[event as AddressTxEvent]) {
for (const tx of events.mempool || []) {
for (const tx of events[event as AddressTxEvent] || []) {
this.addressTxCallbacks[event as AddressTxEvent]?.(address, tx);
}
}