add privacy-preserving hourly aggregate of historical scan stats
This commit is contained in:
parent
ab75f165b1
commit
77c10e1600
@ -62,6 +62,7 @@ connect = true
|
||||
# memoryLimit = "8GB" # cap DuckDB memory usage (default: 80% of system RAM)
|
||||
# maxLabels = 10 # maximum number of labels accepted per silent payments subscription
|
||||
# maxSubscriptions = 100 # maximum number of silent payments subscriptions per connection
|
||||
# metricsEnabled = true # hourly aggregate scan stats log line (default: true)
|
||||
|
||||
[server]
|
||||
# host = "ssl://xyz.com:50002" # advertised in server.features; use array for multiple. Omit to advertise nothing.
|
||||
@ -118,6 +119,9 @@ The `maxLabels` setting caps the number of labels accepted per silent payments s
|
||||
The `maxSubscriptions` setting caps the number of silent payments subscriptions per connection (default: 100).
|
||||
Requests exceeding either limit are rejected with a JSON-RPC `-32602 Invalid params` error.
|
||||
|
||||
When `metricsEnabled` is true (default), Frigate emits one `Aggregate SP scan stats` log line per hour summarising historical scan throughput across all scans in the window.
|
||||
The output is bucketed by result count and duration, rounded, and suppresses any bucket with fewer than ten samples, so no per-client scan information is exposed. Set to `false` to disable the line entirely.
|
||||
|
||||
### Server
|
||||
|
||||
Listeners are configured as bind URLs in the scheme `tcp://` or `ssl://` followed by a host and port.
|
||||
|
||||
@ -39,6 +39,7 @@ public class Frigate {
|
||||
|
||||
private Index blocksIndex;
|
||||
private Index mempoolIndex;
|
||||
private IndexQuerier indexQuerier;
|
||||
private BitcoindClient bitcoindClient;
|
||||
private ElectrumServerRunnable electrumServer;
|
||||
|
||||
@ -73,7 +74,8 @@ public class Frigate {
|
||||
SSLContext sslContext = serverConfig.isSslEnabled() ? SslUtil.getServerSSLContext(serverConfig.getSslCertFile(), serverConfig.getSslKeyFile()) : null;
|
||||
InetSocketAddress tcpBind = toBindAddress(serverConfig.getTcpServer());
|
||||
InetSocketAddress sslBind = toBindAddress(serverConfig.getSslServer());
|
||||
electrumServer = new ElectrumServerRunnable(bitcoindClient, new IndexQuerier(blocksIndex, mempoolIndex), tcpBind, sslBind, sslContext);
|
||||
indexQuerier = new IndexQuerier(blocksIndex, mempoolIndex);
|
||||
electrumServer = new ElectrumServerRunnable(bitcoindClient, indexQuerier, tcpBind, sslBind, sslContext);
|
||||
Thread electrumServerThread = new Thread(electrumServer, "Frigate Electrum Server");
|
||||
electrumServerThread.setDaemon(false);
|
||||
electrumServerThread.start();
|
||||
@ -100,6 +102,9 @@ public class Frigate {
|
||||
if(electrumServer != null) {
|
||||
electrumServer.stop();
|
||||
}
|
||||
if(indexQuerier != null) {
|
||||
indexQuerier.close();
|
||||
}
|
||||
|
||||
running = false;
|
||||
}
|
||||
|
||||
@ -0,0 +1,95 @@
|
||||
package com.sparrowwallet.frigate.index;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicIntegerArray;
|
||||
|
||||
/**
|
||||
* Privacy-preserving aggregate counters for historical scan throughput.
|
||||
*/
|
||||
class HistoricalScanMetrics {
|
||||
private static final int MIN_SAMPLES_PER_BUCKET = 10;
|
||||
private static final int COUNT_ROUNDING = 10;
|
||||
|
||||
static final String[] RESULT_LABELS = {"0", "1-10", "11-100", "101-1000", "1001-10000", "10001+"};
|
||||
static final String[] DURATION_LABELS = {"0-100ms", "100-500ms", "500ms-2s", "2-10s", "10-60s", "60s+"};
|
||||
|
||||
private final AtomicIntegerArray resultBuckets = new AtomicIntegerArray(RESULT_LABELS.length);
|
||||
private final AtomicIntegerArray durationBuckets = new AtomicIntegerArray(DURATION_LABELS.length);
|
||||
|
||||
record Snapshot(int[] results, int[] durations) {}
|
||||
|
||||
void record(int resultCount, long durationMillis) {
|
||||
resultBuckets.incrementAndGet(resultBucket(resultCount));
|
||||
durationBuckets.incrementAndGet(durationBucket(durationMillis));
|
||||
}
|
||||
|
||||
//a record() concurrent with snapshotAndReset() may have its two increments split across windows. The per-bucket drift is at
|
||||
//most one sample per emission and never crosses subscription identity, so it is fine for an aggregate stat.
|
||||
Snapshot snapshotAndReset() {
|
||||
int[] results = new int[RESULT_LABELS.length];
|
||||
int[] durations = new int[DURATION_LABELS.length];
|
||||
for(int i = 0; i < results.length; i++) {
|
||||
results[i] = resultBuckets.getAndSet(i, 0);
|
||||
}
|
||||
for(int i = 0; i < durations.length; i++) {
|
||||
durations[i] = durationBuckets.getAndSet(i, 0);
|
||||
}
|
||||
return new Snapshot(results, durations);
|
||||
}
|
||||
|
||||
Optional<String> format(Snapshot snapshot) {
|
||||
String resultsStr = formatHist(snapshot.results(), RESULT_LABELS);
|
||||
String durationsStr = formatHist(snapshot.durations(), DURATION_LABELS);
|
||||
if(resultsStr.isEmpty() && durationsStr.isEmpty()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
StringBuilder sb = new StringBuilder("Aggregate SP scan stats (1h window):");
|
||||
if(!resultsStr.isEmpty()) {
|
||||
sb.append(" results [").append(resultsStr).append("]");
|
||||
}
|
||||
if(!durationsStr.isEmpty()) {
|
||||
sb.append(" duration [").append(durationsStr).append("]");
|
||||
}
|
||||
return Optional.of(sb.toString());
|
||||
}
|
||||
|
||||
static int resultBucket(int n) {
|
||||
if(n <= 0) return 0;
|
||||
if(n <= 10) return 1;
|
||||
if(n <= 100) return 2;
|
||||
if(n <= 1000) return 3;
|
||||
if(n <= 10000) return 4;
|
||||
return 5;
|
||||
}
|
||||
|
||||
static int durationBucket(long ms) {
|
||||
if(ms < 100) return 0;
|
||||
if(ms < 500) return 1;
|
||||
if(ms < 2000) return 2;
|
||||
if(ms < 10000) return 3;
|
||||
if(ms < 60000) return 4;
|
||||
return 5;
|
||||
}
|
||||
|
||||
private static String formatHist(int[] counts, String[] labels) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for(int i = 0; i < counts.length; i++) {
|
||||
int rounded = roundCount(counts[i]);
|
||||
if(rounded > 0) {
|
||||
if(sb.length() > 0) {
|
||||
sb.append(", ");
|
||||
}
|
||||
sb.append(labels[i]).append(":").append(rounded);
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private static int roundCount(int raw) {
|
||||
if(raw < MIN_SAMPLES_PER_BUCKET) {
|
||||
return 0;
|
||||
}
|
||||
return ((raw + COUNT_ROUNDING / 2) / COUNT_ROUNDING) * COUNT_ROUNDING;
|
||||
}
|
||||
}
|
||||
@ -9,6 +9,7 @@ import com.sparrowwallet.frigate.electrum.SilentPaymentAddressSubscription;
|
||||
import com.sparrowwallet.frigate.electrum.SilentPaymentsNotification;
|
||||
import com.sparrowwallet.frigate.electrum.SilentPaymentsSubscription;
|
||||
import com.sparrowwallet.frigate.electrum.SilentPaymentsTxEntry;
|
||||
import com.sparrowwallet.frigate.io.Config;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -19,7 +20,9 @@ import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.BooleanSupplier;
|
||||
|
||||
public class IndexQuerier {
|
||||
@ -29,10 +32,25 @@ public class IndexQuerier {
|
||||
|
||||
private final Index blocksIndex;
|
||||
private final Index mempoolIndex;
|
||||
private final HistoricalScanMetrics metrics;
|
||||
private final ScheduledExecutorService metricsExecutor;
|
||||
|
||||
public IndexQuerier(Index blocksIndex, Index mempoolIndex) {
|
||||
this.blocksIndex = blocksIndex;
|
||||
this.mempoolIndex = mempoolIndex;
|
||||
|
||||
if(Config.get().getScan().isMetricsEnabled()) {
|
||||
this.metrics = new HistoricalScanMetrics();
|
||||
this.metricsExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
|
||||
Thread t = new ThreadFactoryBuilder().setNameFormat("IndexQueryMetrics-%d").build().newThread(r);
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
});
|
||||
this.metricsExecutor.scheduleAtFixedRate(this::emitMetrics, 1, 1, TimeUnit.HOURS);
|
||||
} else {
|
||||
this.metrics = null;
|
||||
this.metricsExecutor = null;
|
||||
}
|
||||
}
|
||||
|
||||
private final ExecutorService queryPool = Executors.newFixedThreadPool(10, r -> {
|
||||
@ -42,14 +60,30 @@ public class IndexQuerier {
|
||||
return t;
|
||||
});
|
||||
|
||||
private void emitMetrics() {
|
||||
try {
|
||||
metrics.format(metrics.snapshotAndReset()).ifPresent(log::info);
|
||||
} catch(Throwable t) {
|
||||
log.warn("Failed to emit scan metrics", t);
|
||||
}
|
||||
}
|
||||
|
||||
public void close() {
|
||||
if(metricsExecutor != null) {
|
||||
metricsExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
public void startHistoryScan(SilentPaymentScanAddress scanAddress, Integer startHeight, Integer endHeight, SilentPaymentAddressSubscription subscription, WeakReference<SubscriptionStatus> subscriptionStatusRef, boolean isHistorical) {
|
||||
BooleanSupplier cancelled = subscription.captureScanCancellation();
|
||||
queryPool.submit(() -> {
|
||||
long startMillis = isHistorical ? System.currentTimeMillis() : 0L;
|
||||
try {
|
||||
SilentPaymentsSubscription notificationSubscription = new SilentPaymentsSubscription(scanAddress.toString(), subscription.getLabels().toArray(new Integer[0]), subscription.getStartHeight());
|
||||
List<SilentPaymentsTxEntry> history = blocksIndex.getHistoryAsync(scanAddress, notificationSubscription, startHeight, endHeight, null, subscriptionStatusRef, cancelled, isHistorical);
|
||||
List<SilentPaymentsTxEntry> mempoolHistory = getMempoolHistory(scanAddress, null, subscriptionStatusRef, notificationSubscription, cancelled);
|
||||
history.addAll(mempoolHistory);
|
||||
long scanDurationMillis = isHistorical ? System.currentTimeMillis() - startMillis : 0L;
|
||||
|
||||
boolean wasCancelled = cancelled.getAsBoolean();
|
||||
if(!wasCancelled && (isHistorical || !history.isEmpty())) {
|
||||
@ -57,6 +91,9 @@ public class IndexQuerier {
|
||||
}
|
||||
if(!wasCancelled && isHistorical) {
|
||||
subscription.markHistoricalComplete();
|
||||
if(metrics != null) {
|
||||
metrics.record(history.size(), scanDurationMillis);
|
||||
}
|
||||
}
|
||||
} catch(Throwable t) {
|
||||
log.error("History scan task failed for " + scanAddress + " (start=" + startHeight + ", end=" + endHeight + ", isHistorical=" + isHistorical + ")", t);
|
||||
|
||||
@ -390,6 +390,7 @@ public class Config {
|
||||
private String memoryLimit;
|
||||
private Integer maxLabels;
|
||||
private Integer maxSubscriptions;
|
||||
private Boolean metricsEnabled;
|
||||
|
||||
public int getBatchSize() {
|
||||
return batchSize != null ? batchSize : DEFAULT_BATCH_SIZE;
|
||||
@ -450,6 +451,14 @@ public class Config {
|
||||
public void setMemoryLimit(String memoryLimit) {
|
||||
this.memoryLimit = memoryLimit;
|
||||
}
|
||||
|
||||
public boolean isMetricsEnabled() {
|
||||
return metricsEnabled == null || metricsEnabled;
|
||||
}
|
||||
|
||||
public void setMetricsEnabled(Boolean metricsEnabled) {
|
||||
this.metricsEnabled = metricsEnabled;
|
||||
}
|
||||
}
|
||||
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
|
||||
@ -0,0 +1,147 @@
|
||||
package com.sparrowwallet.frigate.index;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class HistoricalScanMetricsTest {
|
||||
@Test
|
||||
public void resultBucketBoundaries() {
|
||||
assertEquals(0, HistoricalScanMetrics.resultBucket(0));
|
||||
assertEquals(1, HistoricalScanMetrics.resultBucket(1));
|
||||
assertEquals(1, HistoricalScanMetrics.resultBucket(10));
|
||||
assertEquals(2, HistoricalScanMetrics.resultBucket(11));
|
||||
assertEquals(2, HistoricalScanMetrics.resultBucket(100));
|
||||
assertEquals(3, HistoricalScanMetrics.resultBucket(101));
|
||||
assertEquals(3, HistoricalScanMetrics.resultBucket(1000));
|
||||
assertEquals(4, HistoricalScanMetrics.resultBucket(1001));
|
||||
assertEquals(4, HistoricalScanMetrics.resultBucket(10000));
|
||||
assertEquals(5, HistoricalScanMetrics.resultBucket(10001));
|
||||
}
|
||||
|
||||
//pair each sample input with the label the log line will use, so a future edit that changes a boundary without updating
|
||||
//the corresponding label (or vice versa) fails here instead of silently producing a misleading aggregate line.
|
||||
@Test
|
||||
public void resultLabelsMatchBoundaries() {
|
||||
assertEquals("0", HistoricalScanMetrics.RESULT_LABELS[HistoricalScanMetrics.resultBucket(0)]);
|
||||
assertEquals("1-10", HistoricalScanMetrics.RESULT_LABELS[HistoricalScanMetrics.resultBucket(5)]);
|
||||
assertEquals("11-100", HistoricalScanMetrics.RESULT_LABELS[HistoricalScanMetrics.resultBucket(50)]);
|
||||
assertEquals("101-1000", HistoricalScanMetrics.RESULT_LABELS[HistoricalScanMetrics.resultBucket(500)]);
|
||||
assertEquals("1001-10000", HistoricalScanMetrics.RESULT_LABELS[HistoricalScanMetrics.resultBucket(5000)]);
|
||||
assertEquals("10001+", HistoricalScanMetrics.RESULT_LABELS[HistoricalScanMetrics.resultBucket(50000)]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void durationLabelsMatchBoundaries() {
|
||||
assertEquals("0-100ms", HistoricalScanMetrics.DURATION_LABELS[HistoricalScanMetrics.durationBucket(50)]);
|
||||
assertEquals("100-500ms", HistoricalScanMetrics.DURATION_LABELS[HistoricalScanMetrics.durationBucket(250)]);
|
||||
assertEquals("500ms-2s", HistoricalScanMetrics.DURATION_LABELS[HistoricalScanMetrics.durationBucket(1000)]);
|
||||
assertEquals("2-10s", HistoricalScanMetrics.DURATION_LABELS[HistoricalScanMetrics.durationBucket(5000)]);
|
||||
assertEquals("10-60s", HistoricalScanMetrics.DURATION_LABELS[HistoricalScanMetrics.durationBucket(30000)]);
|
||||
assertEquals("60s+", HistoricalScanMetrics.DURATION_LABELS[HistoricalScanMetrics.durationBucket(120000)]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void durationBucketBoundaries() {
|
||||
assertEquals(0, HistoricalScanMetrics.durationBucket(0));
|
||||
assertEquals(0, HistoricalScanMetrics.durationBucket(99));
|
||||
assertEquals(1, HistoricalScanMetrics.durationBucket(100));
|
||||
assertEquals(1, HistoricalScanMetrics.durationBucket(499));
|
||||
assertEquals(2, HistoricalScanMetrics.durationBucket(500));
|
||||
assertEquals(2, HistoricalScanMetrics.durationBucket(1999));
|
||||
assertEquals(3, HistoricalScanMetrics.durationBucket(2000));
|
||||
assertEquals(3, HistoricalScanMetrics.durationBucket(9999));
|
||||
assertEquals(4, HistoricalScanMetrics.durationBucket(10000));
|
||||
assertEquals(4, HistoricalScanMetrics.durationBucket(59999));
|
||||
assertEquals(5, HistoricalScanMetrics.durationBucket(60000));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void thresholdSuppressesBelowTen() {
|
||||
HistoricalScanMetrics m = new HistoricalScanMetrics();
|
||||
for(int i = 0; i < 9; i++) {
|
||||
m.record(5, 200);
|
||||
}
|
||||
assertTrue(m.format(m.snapshotAndReset()).isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void exactlyTenEmitsAsTen() {
|
||||
HistoricalScanMetrics m = new HistoricalScanMetrics();
|
||||
for(int i = 0; i < 10; i++) {
|
||||
m.record(5, 200);
|
||||
}
|
||||
String line = m.format(m.snapshotAndReset()).orElseThrow();
|
||||
assertTrue(line.contains("1-10:10"), line);
|
||||
assertTrue(line.contains("100-500ms:10"), line);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fourteenRoundsDownToTen() {
|
||||
HistoricalScanMetrics m = new HistoricalScanMetrics();
|
||||
for(int i = 0; i < 14; i++) {
|
||||
m.record(5, 200);
|
||||
}
|
||||
assertTrue(m.format(m.snapshotAndReset()).orElseThrow().contains("1-10:10"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fifteenRoundsUpToTwenty() {
|
||||
HistoricalScanMetrics m = new HistoricalScanMetrics();
|
||||
for(int i = 0; i < 15; i++) {
|
||||
m.record(5, 200);
|
||||
}
|
||||
assertTrue(m.format(m.snapshotAndReset()).orElseThrow().contains("1-10:20"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void snapshotResetsCounters() {
|
||||
HistoricalScanMetrics m = new HistoricalScanMetrics();
|
||||
for(int i = 0; i < 20; i++) {
|
||||
m.record(5, 200);
|
||||
}
|
||||
m.snapshotAndReset();
|
||||
assertTrue(m.format(m.snapshotAndReset()).isEmpty());
|
||||
}
|
||||
|
||||
//spread result counts across all five non-zero buckets so no result bucket reaches threshold, while every duration lands in
|
||||
//the same bucket so it does — verifies one-sided emission.
|
||||
@Test
|
||||
public void onlyOneHistogramEmittedWhenOnlyOneClearsThreshold() {
|
||||
HistoricalScanMetrics m = new HistoricalScanMetrics();
|
||||
int[] resultsPerBucket = {0, 0, 5, 5, 50, 50, 500, 500, 5000, 5000};
|
||||
for(int n : resultsPerBucket) {
|
||||
m.record(n, 200);
|
||||
}
|
||||
String line = m.format(m.snapshotAndReset()).orElseThrow();
|
||||
assertFalse(line.contains("results ["), line);
|
||||
assertTrue(line.contains("duration ["), line);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyWhenNoRecords() {
|
||||
HistoricalScanMetrics m = new HistoricalScanMetrics();
|
||||
assertTrue(m.format(m.snapshotAndReset()).isEmpty());
|
||||
}
|
||||
|
||||
//mechanical guardrail: future edits to the format must not introduce summary statistics, relative timestamps, or absolute
|
||||
//ISO timestamps. The forbidden tokens are bare (no colons) so "last value: 123ms" or "min 100ms" both trip the check.
|
||||
@Test
|
||||
public void formattedLineHasNoForbiddenSubstrings() {
|
||||
HistoricalScanMetrics m = new HistoricalScanMetrics();
|
||||
for(int i = 0; i < 20; i++) {
|
||||
m.record(50, 1500);
|
||||
}
|
||||
String line = m.format(m.snapshotAndReset()).orElseThrow();
|
||||
String lower = line.toLowerCase();
|
||||
for(String forbidden : new String[]{"min", "max", "avg", "mean", "last", "ago"}) {
|
||||
assertFalse(lower.contains(forbidden), "line must not contain '" + forbidden + "': " + line);
|
||||
}
|
||||
assertFalse(line.matches(".*\\d{4}-\\d{2}-\\d{2}.*"), "line must not contain ISO date: " + line);
|
||||
assertFalse(line.matches(".*\\d{2}:\\d{2}:\\d{2}.*"), "line must not contain ISO time: " + line);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user