diff --git a/README.md b/README.md index 64c4a2c..d603f1d 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,7 @@ connect = true # memoryLimit = "8GB" # cap DuckDB memory usage (default: 80% of system RAM) # maxLabels = 10 # maximum number of labels accepted per silent payments subscription # maxSubscriptions = 100 # maximum number of silent payments subscriptions per connection +# metricsEnabled = true # hourly aggregate scan stats log line (default: true) [server] # host = "ssl://xyz.com:50002" # advertised in server.features; use array for multiple. Omit to advertise nothing. @@ -118,6 +119,9 @@ The `maxLabels` setting caps the number of labels accepted per silent payments s The `maxSubscriptions` setting caps the number of silent payments subscriptions per connection (default: 100). Requests exceeding either limit are rejected with a JSON-RPC `-32602 Invalid params` error. +When `metricsEnabled` is true (default), Frigate emits one `Aggregate SP scan stats` log line per hour summarising historical scan throughput across all scans in the window. +The output is bucketed by result count and duration, rounded, and suppresses any bucket with fewer than ten samples, so no per-client scan information is exposed. Set to `false` to disable the line entirely. + ### Server Listeners are configured as bind URLs in the scheme `tcp://` or `ssl://` followed by a host and port. diff --git a/src/main/java/com/sparrowwallet/frigate/Frigate.java b/src/main/java/com/sparrowwallet/frigate/Frigate.java index caac090..9d0afd2 100644 --- a/src/main/java/com/sparrowwallet/frigate/Frigate.java +++ b/src/main/java/com/sparrowwallet/frigate/Frigate.java @@ -39,6 +39,7 @@ public class Frigate { private Index blocksIndex; private Index mempoolIndex; + private IndexQuerier indexQuerier; private BitcoindClient bitcoindClient; private ElectrumServerRunnable electrumServer; @@ -73,7 +74,8 @@ public class Frigate { SSLContext sslContext = serverConfig.isSslEnabled() ? SslUtil.getServerSSLContext(serverConfig.getSslCertFile(), serverConfig.getSslKeyFile()) : null; InetSocketAddress tcpBind = toBindAddress(serverConfig.getTcpServer()); InetSocketAddress sslBind = toBindAddress(serverConfig.getSslServer()); - electrumServer = new ElectrumServerRunnable(bitcoindClient, new IndexQuerier(blocksIndex, mempoolIndex), tcpBind, sslBind, sslContext); + indexQuerier = new IndexQuerier(blocksIndex, mempoolIndex); + electrumServer = new ElectrumServerRunnable(bitcoindClient, indexQuerier, tcpBind, sslBind, sslContext); Thread electrumServerThread = new Thread(electrumServer, "Frigate Electrum Server"); electrumServerThread.setDaemon(false); electrumServerThread.start(); @@ -100,6 +102,9 @@ public class Frigate { if(electrumServer != null) { electrumServer.stop(); } + if(indexQuerier != null) { + indexQuerier.close(); + } running = false; } diff --git a/src/main/java/com/sparrowwallet/frigate/index/HistoricalScanMetrics.java b/src/main/java/com/sparrowwallet/frigate/index/HistoricalScanMetrics.java new file mode 100644 index 0000000..0b072db --- /dev/null +++ b/src/main/java/com/sparrowwallet/frigate/index/HistoricalScanMetrics.java @@ -0,0 +1,95 @@ +package com.sparrowwallet.frigate.index; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicIntegerArray; + +/** + * Privacy-preserving aggregate counters for historical scan throughput. + */ +class HistoricalScanMetrics { + private static final int MIN_SAMPLES_PER_BUCKET = 10; + private static final int COUNT_ROUNDING = 10; + + static final String[] RESULT_LABELS = {"0", "1-10", "11-100", "101-1000", "1001-10000", "10001+"}; + static final String[] DURATION_LABELS = {"0-100ms", "100-500ms", "500ms-2s", "2-10s", "10-60s", "60s+"}; + + private final AtomicIntegerArray resultBuckets = new AtomicIntegerArray(RESULT_LABELS.length); + private final AtomicIntegerArray durationBuckets = new AtomicIntegerArray(DURATION_LABELS.length); + + record Snapshot(int[] results, int[] durations) {} + + void record(int resultCount, long durationMillis) { + resultBuckets.incrementAndGet(resultBucket(resultCount)); + durationBuckets.incrementAndGet(durationBucket(durationMillis)); + } + + //a record() concurrent with snapshotAndReset() may have its two increments split across windows. The per-bucket drift is at + //most one sample per emission and never crosses subscription identity, so it is fine for an aggregate stat. + Snapshot snapshotAndReset() { + int[] results = new int[RESULT_LABELS.length]; + int[] durations = new int[DURATION_LABELS.length]; + for(int i = 0; i < results.length; i++) { + results[i] = resultBuckets.getAndSet(i, 0); + } + for(int i = 0; i < durations.length; i++) { + durations[i] = durationBuckets.getAndSet(i, 0); + } + return new Snapshot(results, durations); + } + + Optional format(Snapshot snapshot) { + String resultsStr = formatHist(snapshot.results(), RESULT_LABELS); + String durationsStr = formatHist(snapshot.durations(), DURATION_LABELS); + if(resultsStr.isEmpty() && durationsStr.isEmpty()) { + return Optional.empty(); + } + + StringBuilder sb = new StringBuilder("Aggregate SP scan stats (1h window):"); + if(!resultsStr.isEmpty()) { + sb.append(" results [").append(resultsStr).append("]"); + } + if(!durationsStr.isEmpty()) { + sb.append(" duration [").append(durationsStr).append("]"); + } + return Optional.of(sb.toString()); + } + + static int resultBucket(int n) { + if(n <= 0) return 0; + if(n <= 10) return 1; + if(n <= 100) return 2; + if(n <= 1000) return 3; + if(n <= 10000) return 4; + return 5; + } + + static int durationBucket(long ms) { + if(ms < 100) return 0; + if(ms < 500) return 1; + if(ms < 2000) return 2; + if(ms < 10000) return 3; + if(ms < 60000) return 4; + return 5; + } + + private static String formatHist(int[] counts, String[] labels) { + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < counts.length; i++) { + int rounded = roundCount(counts[i]); + if(rounded > 0) { + if(sb.length() > 0) { + sb.append(", "); + } + sb.append(labels[i]).append(":").append(rounded); + } + } + return sb.toString(); + } + + private static int roundCount(int raw) { + if(raw < MIN_SAMPLES_PER_BUCKET) { + return 0; + } + return ((raw + COUNT_ROUNDING / 2) / COUNT_ROUNDING) * COUNT_ROUNDING; + } +} diff --git a/src/main/java/com/sparrowwallet/frigate/index/IndexQuerier.java b/src/main/java/com/sparrowwallet/frigate/index/IndexQuerier.java index 2ae54ed..6d6f80c 100644 --- a/src/main/java/com/sparrowwallet/frigate/index/IndexQuerier.java +++ b/src/main/java/com/sparrowwallet/frigate/index/IndexQuerier.java @@ -9,6 +9,7 @@ import com.sparrowwallet.frigate.electrum.SilentPaymentAddressSubscription; import com.sparrowwallet.frigate.electrum.SilentPaymentsNotification; import com.sparrowwallet.frigate.electrum.SilentPaymentsSubscription; import com.sparrowwallet.frigate.electrum.SilentPaymentsTxEntry; +import com.sparrowwallet.frigate.io.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,7 +20,9 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; public class IndexQuerier { @@ -29,10 +32,25 @@ public class IndexQuerier { private final Index blocksIndex; private final Index mempoolIndex; + private final HistoricalScanMetrics metrics; + private final ScheduledExecutorService metricsExecutor; public IndexQuerier(Index blocksIndex, Index mempoolIndex) { this.blocksIndex = blocksIndex; this.mempoolIndex = mempoolIndex; + + if(Config.get().getScan().isMetricsEnabled()) { + this.metrics = new HistoricalScanMetrics(); + this.metricsExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new ThreadFactoryBuilder().setNameFormat("IndexQueryMetrics-%d").build().newThread(r); + t.setDaemon(true); + return t; + }); + this.metricsExecutor.scheduleAtFixedRate(this::emitMetrics, 1, 1, TimeUnit.HOURS); + } else { + this.metrics = null; + this.metricsExecutor = null; + } } private final ExecutorService queryPool = Executors.newFixedThreadPool(10, r -> { @@ -42,14 +60,30 @@ public class IndexQuerier { return t; }); + private void emitMetrics() { + try { + metrics.format(metrics.snapshotAndReset()).ifPresent(log::info); + } catch(Throwable t) { + log.warn("Failed to emit scan metrics", t); + } + } + + public void close() { + if(metricsExecutor != null) { + metricsExecutor.shutdownNow(); + } + } + public void startHistoryScan(SilentPaymentScanAddress scanAddress, Integer startHeight, Integer endHeight, SilentPaymentAddressSubscription subscription, WeakReference subscriptionStatusRef, boolean isHistorical) { BooleanSupplier cancelled = subscription.captureScanCancellation(); queryPool.submit(() -> { + long startMillis = isHistorical ? System.currentTimeMillis() : 0L; try { SilentPaymentsSubscription notificationSubscription = new SilentPaymentsSubscription(scanAddress.toString(), subscription.getLabels().toArray(new Integer[0]), subscription.getStartHeight()); List history = blocksIndex.getHistoryAsync(scanAddress, notificationSubscription, startHeight, endHeight, null, subscriptionStatusRef, cancelled, isHistorical); List mempoolHistory = getMempoolHistory(scanAddress, null, subscriptionStatusRef, notificationSubscription, cancelled); history.addAll(mempoolHistory); + long scanDurationMillis = isHistorical ? System.currentTimeMillis() - startMillis : 0L; boolean wasCancelled = cancelled.getAsBoolean(); if(!wasCancelled && (isHistorical || !history.isEmpty())) { @@ -57,6 +91,9 @@ public class IndexQuerier { } if(!wasCancelled && isHistorical) { subscription.markHistoricalComplete(); + if(metrics != null) { + metrics.record(history.size(), scanDurationMillis); + } } } catch(Throwable t) { log.error("History scan task failed for " + scanAddress + " (start=" + startHeight + ", end=" + endHeight + ", isHistorical=" + isHistorical + ")", t); diff --git a/src/main/java/com/sparrowwallet/frigate/io/Config.java b/src/main/java/com/sparrowwallet/frigate/io/Config.java index 4ca80f8..0adc655 100644 --- a/src/main/java/com/sparrowwallet/frigate/io/Config.java +++ b/src/main/java/com/sparrowwallet/frigate/io/Config.java @@ -390,6 +390,7 @@ public class Config { private String memoryLimit; private Integer maxLabels; private Integer maxSubscriptions; + private Boolean metricsEnabled; public int getBatchSize() { return batchSize != null ? batchSize : DEFAULT_BATCH_SIZE; @@ -450,6 +451,14 @@ public class Config { public void setMemoryLimit(String memoryLimit) { this.memoryLimit = memoryLimit; } + + public boolean isMetricsEnabled() { + return metricsEnabled == null || metricsEnabled; + } + + public void setMetricsEnabled(Boolean metricsEnabled) { + this.metricsEnabled = metricsEnabled; + } } @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/src/test/java/com/sparrowwallet/frigate/index/HistoricalScanMetricsTest.java b/src/test/java/com/sparrowwallet/frigate/index/HistoricalScanMetricsTest.java new file mode 100644 index 0000000..9214eb3 --- /dev/null +++ b/src/test/java/com/sparrowwallet/frigate/index/HistoricalScanMetricsTest.java @@ -0,0 +1,147 @@ +package com.sparrowwallet.frigate.index; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class HistoricalScanMetricsTest { + @Test + public void resultBucketBoundaries() { + assertEquals(0, HistoricalScanMetrics.resultBucket(0)); + assertEquals(1, HistoricalScanMetrics.resultBucket(1)); + assertEquals(1, HistoricalScanMetrics.resultBucket(10)); + assertEquals(2, HistoricalScanMetrics.resultBucket(11)); + assertEquals(2, HistoricalScanMetrics.resultBucket(100)); + assertEquals(3, HistoricalScanMetrics.resultBucket(101)); + assertEquals(3, HistoricalScanMetrics.resultBucket(1000)); + assertEquals(4, HistoricalScanMetrics.resultBucket(1001)); + assertEquals(4, HistoricalScanMetrics.resultBucket(10000)); + assertEquals(5, HistoricalScanMetrics.resultBucket(10001)); + } + + //pair each sample input with the label the log line will use, so a future edit that changes a boundary without updating + //the corresponding label (or vice versa) fails here instead of silently producing a misleading aggregate line. + @Test + public void resultLabelsMatchBoundaries() { + assertEquals("0", HistoricalScanMetrics.RESULT_LABELS[HistoricalScanMetrics.resultBucket(0)]); + assertEquals("1-10", HistoricalScanMetrics.RESULT_LABELS[HistoricalScanMetrics.resultBucket(5)]); + assertEquals("11-100", HistoricalScanMetrics.RESULT_LABELS[HistoricalScanMetrics.resultBucket(50)]); + assertEquals("101-1000", HistoricalScanMetrics.RESULT_LABELS[HistoricalScanMetrics.resultBucket(500)]); + assertEquals("1001-10000", HistoricalScanMetrics.RESULT_LABELS[HistoricalScanMetrics.resultBucket(5000)]); + assertEquals("10001+", HistoricalScanMetrics.RESULT_LABELS[HistoricalScanMetrics.resultBucket(50000)]); + } + + @Test + public void durationLabelsMatchBoundaries() { + assertEquals("0-100ms", HistoricalScanMetrics.DURATION_LABELS[HistoricalScanMetrics.durationBucket(50)]); + assertEquals("100-500ms", HistoricalScanMetrics.DURATION_LABELS[HistoricalScanMetrics.durationBucket(250)]); + assertEquals("500ms-2s", HistoricalScanMetrics.DURATION_LABELS[HistoricalScanMetrics.durationBucket(1000)]); + assertEquals("2-10s", HistoricalScanMetrics.DURATION_LABELS[HistoricalScanMetrics.durationBucket(5000)]); + assertEquals("10-60s", HistoricalScanMetrics.DURATION_LABELS[HistoricalScanMetrics.durationBucket(30000)]); + assertEquals("60s+", HistoricalScanMetrics.DURATION_LABELS[HistoricalScanMetrics.durationBucket(120000)]); + } + + @Test + public void durationBucketBoundaries() { + assertEquals(0, HistoricalScanMetrics.durationBucket(0)); + assertEquals(0, HistoricalScanMetrics.durationBucket(99)); + assertEquals(1, HistoricalScanMetrics.durationBucket(100)); + assertEquals(1, HistoricalScanMetrics.durationBucket(499)); + assertEquals(2, HistoricalScanMetrics.durationBucket(500)); + assertEquals(2, HistoricalScanMetrics.durationBucket(1999)); + assertEquals(3, HistoricalScanMetrics.durationBucket(2000)); + assertEquals(3, HistoricalScanMetrics.durationBucket(9999)); + assertEquals(4, HistoricalScanMetrics.durationBucket(10000)); + assertEquals(4, HistoricalScanMetrics.durationBucket(59999)); + assertEquals(5, HistoricalScanMetrics.durationBucket(60000)); + } + + @Test + public void thresholdSuppressesBelowTen() { + HistoricalScanMetrics m = new HistoricalScanMetrics(); + for(int i = 0; i < 9; i++) { + m.record(5, 200); + } + assertTrue(m.format(m.snapshotAndReset()).isEmpty()); + } + + @Test + public void exactlyTenEmitsAsTen() { + HistoricalScanMetrics m = new HistoricalScanMetrics(); + for(int i = 0; i < 10; i++) { + m.record(5, 200); + } + String line = m.format(m.snapshotAndReset()).orElseThrow(); + assertTrue(line.contains("1-10:10"), line); + assertTrue(line.contains("100-500ms:10"), line); + } + + @Test + public void fourteenRoundsDownToTen() { + HistoricalScanMetrics m = new HistoricalScanMetrics(); + for(int i = 0; i < 14; i++) { + m.record(5, 200); + } + assertTrue(m.format(m.snapshotAndReset()).orElseThrow().contains("1-10:10")); + } + + @Test + public void fifteenRoundsUpToTwenty() { + HistoricalScanMetrics m = new HistoricalScanMetrics(); + for(int i = 0; i < 15; i++) { + m.record(5, 200); + } + assertTrue(m.format(m.snapshotAndReset()).orElseThrow().contains("1-10:20")); + } + + @Test + public void snapshotResetsCounters() { + HistoricalScanMetrics m = new HistoricalScanMetrics(); + for(int i = 0; i < 20; i++) { + m.record(5, 200); + } + m.snapshotAndReset(); + assertTrue(m.format(m.snapshotAndReset()).isEmpty()); + } + + //spread result counts across all five non-zero buckets so no result bucket reaches threshold, while every duration lands in + //the same bucket so it does — verifies one-sided emission. + @Test + public void onlyOneHistogramEmittedWhenOnlyOneClearsThreshold() { + HistoricalScanMetrics m = new HistoricalScanMetrics(); + int[] resultsPerBucket = {0, 0, 5, 5, 50, 50, 500, 500, 5000, 5000}; + for(int n : resultsPerBucket) { + m.record(n, 200); + } + String line = m.format(m.snapshotAndReset()).orElseThrow(); + assertFalse(line.contains("results ["), line); + assertTrue(line.contains("duration ["), line); + } + + @Test + public void emptyWhenNoRecords() { + HistoricalScanMetrics m = new HistoricalScanMetrics(); + assertTrue(m.format(m.snapshotAndReset()).isEmpty()); + } + + //mechanical guardrail: future edits to the format must not introduce summary statistics, relative timestamps, or absolute + //ISO timestamps. The forbidden tokens are bare (no colons) so "last value: 123ms" or "min 100ms" both trip the check. + @Test + public void formattedLineHasNoForbiddenSubstrings() { + HistoricalScanMetrics m = new HistoricalScanMetrics(); + for(int i = 0; i < 20; i++) { + m.record(50, 1500); + } + String line = m.format(m.snapshotAndReset()).orElseThrow(); + String lower = line.toLowerCase(); + for(String forbidden : new String[]{"min", "max", "avg", "mean", "last", "ago"}) { + assertFalse(lower.contains(forbidden), "line must not contain '" + forbidden + "': " + line); + } + assertFalse(line.matches(".*\\d{4}-\\d{2}-\\d{2}.*"), "line must not contain ISO date: " + line); + assertFalse(line.matches(".*\\d{2}:\\d{2}:\\d{2}.*"), "line must not contain ISO time: " + line); + } +}