From 72f85c398f12db5c31873c70d233de656bfcda3e Mon Sep 17 00:00:00 2001 From: Craig Raw Date: Tue, 31 Mar 2026 11:49:24 +0200 Subject: [PATCH] improve concurrency through use of duplicate duckdb connections --- benchmark.py | 8 +- build.gradle | 1 - .../frigate/electrum/RequestHandler.java | 14 ++- .../frigate/index/AbstractDbManager.java | 52 ---------- .../frigate/index/DuckDBReadPool.java | 95 +++++++++++++++++++ .../frigate/index/MemoryDbManager.java | 6 +- .../frigate/index/ScalingDbManager.java | 28 ++++-- .../frigate/index/SingleDbManager.java | 52 +++++----- src/main/java/module-info.java | 1 - src/main/resources/logback.xml | 3 - 10 files changed, 158 insertions(+), 102 deletions(-) create mode 100644 src/main/java/com/sparrowwallet/frigate/index/DuckDBReadPool.java diff --git a/benchmark.py b/benchmark.py index 5262a94..a48b911 100755 --- a/benchmark.py +++ b/benchmark.py @@ -102,9 +102,10 @@ def format_number(n): return f"{n:,}" -def run_benchmarks(host, port, end_height, markdown, clients): +def run_benchmarks(host, port, end_height, markdown, clients, max_periods=0): + period_list = PERIODS[:max_periods] if max_periods > 0 else PERIODS periods = [] - for desc, short, blocks in PERIODS: + for desc, short, blocks in period_list: start = end_height - blocks txns = TRANSACTION_COUNTS.get(short) if end_height == DEFAULT_END_HEIGHT else None periods.append((desc, short, blocks, start, f"{start}-{end_height}", txns)) @@ -186,10 +187,11 @@ def main(): parser.add_argument("--end-height", type=int, default=DEFAULT_END_HEIGHT, help=f"end block height (default: {DEFAULT_END_HEIGHT})") parser.add_argument("--markdown", action="store_true", help="output as markdown table") parser.add_argument("--clients", type=int, default=1, help="number of concurrent clients per scan period (default: 1)") + parser.add_argument("--periods", type=int, default=0, help="number of periods to run (default: all)") args = parser.parse_args() try: - run_benchmarks(args.host, args.port, args.end_height, args.markdown, args.clients) + run_benchmarks(args.host, args.port, args.end_height, args.markdown, args.clients, args.periods) except ConnectionRefusedError: print(f"Error: could not connect to {args.host}:{args.port}", file=sys.stderr) sys.exit(1) diff --git a/build.gradle b/build.gradle index 0773536..67aff05 100644 --- a/build.gradle +++ b/build.gradle @@ -21,7 +21,6 @@ tasks.withType(AbstractArchiveTask).configureEach { dependencies { implementation(project(':drongo')) implementation('org.duckdb:duckdb_jdbc:1.4.4.0') - implementation("com.zaxxer:HikariCP:7.0.2") implementation('com.google.guava:guava:33.5.0-jre') implementation('com.google.code.gson:gson:2.13.2') implementation('com.github.arteam:simple-json-rpc-core:1.3') diff --git a/src/main/java/com/sparrowwallet/frigate/electrum/RequestHandler.java b/src/main/java/com/sparrowwallet/frigate/electrum/RequestHandler.java index 6c2ce09..2c0de68 100644 --- a/src/main/java/com/sparrowwallet/frigate/electrum/RequestHandler.java +++ b/src/main/java/com/sparrowwallet/frigate/electrum/RequestHandler.java @@ -192,9 +192,17 @@ public class RequestHandler implements Runnable, SubscriptionStatus, Thread.Unca subscription.setHighestBlockHeight(notification.history().stream().mapToInt(TxEntry::getHeight).max().orElse(subscription.getHighestBlockHeight())); subscription.getMempoolTxids().addAll(notification.history().stream().filter(txEntry -> txEntry.height <= 0).map(txEntry -> Sha256Hash.wrap(txEntry.tx_hash)).collect(Collectors.toSet())); - ElectrumNotificationTransport electrumNotificationTransport = new ElectrumNotificationTransport(clientSocket); - JsonRpcClient jsonRpcClient = new JsonRpcClient(electrumNotificationTransport); - jsonRpcClient.onDemand(ElectrumNotificationService.class).notifySilentPayments(notification.subscription(), notification.progress(), notification.history()); + try { + ElectrumNotificationTransport electrumNotificationTransport = new ElectrumNotificationTransport(clientSocket); + JsonRpcClient jsonRpcClient = new JsonRpcClient(electrumNotificationTransport); + jsonRpcClient.onDemand(ElectrumNotificationService.class).notifySilentPayments(notification.subscription(), notification.progress(), notification.history()); + } catch(IllegalStateException e) { + if(e.getCause() instanceof java.io.IOException) { + log.debug("Client disconnected before notification could be sent"); + } else { + throw e; + } + } } } diff --git a/src/main/java/com/sparrowwallet/frigate/index/AbstractDbManager.java b/src/main/java/com/sparrowwallet/frigate/index/AbstractDbManager.java index 6c0f7fc..4f3b2f7 100644 --- a/src/main/java/com/sparrowwallet/frigate/index/AbstractDbManager.java +++ b/src/main/java/com/sparrowwallet/frigate/index/AbstractDbManager.java @@ -1,17 +1,11 @@ package com.sparrowwallet.frigate.index; -import com.sparrowwallet.frigate.io.Config; -import com.sparrowwallet.frigate.io.Storage; -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; -import java.util.Properties; public abstract class AbstractDbManager implements DbManager { private static final Logger log = LoggerFactory.getLogger(AbstractDbManager.class); @@ -23,50 +17,4 @@ public abstract class AbstractDbManager implements DbManager { log.debug("Creating write connection"); return DriverManager.getConnection(connectionUrl); } - - protected HikariDataSource createReadDataSource(String connectionUrl, int maxPoolSize) { - HikariConfig config = new HikariConfig(); - config.setJdbcUrl(connectionUrl); - config.setDriverClassName("org.duckdb.DuckDBDriver"); - config.addDataSourceProperty("access_mode", "READ_ONLY"); - config.addDataSourceProperty("allow_unsigned_extensions", "true"); - config.addDataSourceProperty("jdbc_stream_results", "true"); - config.addDataSourceProperty("scheduler_process_partial", "true"); - config.setConnectionInitSql(buildReadConnectionInitSql()); - - config.setMaximumPoolSize(maxPoolSize); - config.setMinimumIdle(2); - config.setConnectionTimeout(30000); - config.setIdleTimeout(300000); - config.setMaxLifetime(900000); - config.setLeakDetectionThreshold(0); - - config.setConnectionTestQuery("SELECT 1"); - config.setValidationTimeout(5000); - config.setPoolName("DuckDB-ReadOnly-Pool"); - - log.debug("Creating read connection pool with max size: " + config.getMaximumPoolSize()); - return new HikariDataSource(config); - } - - private String buildReadConnectionInitSql() { - Properties duckDbProperties = new Properties(); - duckDbProperties.setProperty("enable_progress_bar", "true"); - duckDbProperties.setProperty("enable_progress_bar_print", "false"); - if(Config.get().getDbThreads() != null) { - duckDbProperties.setProperty("threads", Config.get().getDbThreads().toString()); - } - - StringBuilder sql = new StringBuilder(); - for(String propertyName : duckDbProperties.stringPropertyNames()) { - String value = duckDbProperties.getProperty(propertyName); - sql.append("SET ").append(propertyName).append(" = '").append(value).append("'; "); - } - - File ufsecpExtensionFile = Storage.getUfsecpExtensionFile(); - sql.append("LOAD '").append(ufsecpExtensionFile.getAbsolutePath()).append("'; "); - sql.append("SELECT ufsecp_set_cache_dir('").append(Storage.getFrigateCacheDir().getAbsolutePath()).append("'); "); - - return sql.toString().trim(); - } } diff --git a/src/main/java/com/sparrowwallet/frigate/index/DuckDBReadPool.java b/src/main/java/com/sparrowwallet/frigate/index/DuckDBReadPool.java new file mode 100644 index 0000000..4d68346 --- /dev/null +++ b/src/main/java/com/sparrowwallet/frigate/index/DuckDBReadPool.java @@ -0,0 +1,95 @@ +package com.sparrowwallet.frigate.index; + +import com.sparrowwallet.frigate.io.Config; +import com.sparrowwallet.frigate.io.Storage; +import org.duckdb.DuckDBConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.sql.*; +import java.util.Properties; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class DuckDBReadPool { + private static final Logger log = LoggerFactory.getLogger(DuckDBReadPool.class); + + private final DuckDBConnection masterConnection; + private final ArrayBlockingQueue pool; + private final int maxSize; + private volatile boolean closed = false; + + public DuckDBReadPool(String connectionUrl, int maxSize) throws SQLException { + this.maxSize = maxSize; + + Properties props = new Properties(); + props.setProperty("duckdb.read_only", "true"); + props.setProperty("allow_unsigned_extensions", "true"); + this.masterConnection = (DuckDBConnection)DriverManager.getConnection(connectionUrl, props); + + try(Statement stmt = masterConnection.createStatement()) { + if(Config.get().getDbThreads() != null) { + stmt.execute("SET threads = '" + Config.get().getDbThreads() + "'"); + } + + File ufsecpExtensionFile = Storage.getUfsecpExtensionFile(); + stmt.execute("LOAD '" + ufsecpExtensionFile.getAbsolutePath() + "'"); + stmt.execute("SELECT ufsecp_set_cache_dir('" + Storage.getFrigateCacheDir().getAbsolutePath() + "')"); + } + + this.pool = new ArrayBlockingQueue<>(maxSize); + log.debug("DuckDB read pool created (max size: {})", maxSize); + } + + public Connection getConnection() throws SQLException { + if(closed) { + throw new SQLException("Pool is closed"); + } + + Connection conn = pool.poll(); + if(conn != null && !conn.isClosed()) { + return conn; + } + + return masterConnection.duplicate(); + } + + public void releaseConnection(Connection conn) { + if(closed || conn == null) { + closeQuietly(conn); + return; + } + + try { + if(conn.isClosed()) { + return; + } + } catch(SQLException e) { + return; + } + + if(!pool.offer(conn)) { + closeQuietly(conn); + } + } + + public void close() { + closed = true; + Connection conn; + while((conn = pool.poll()) != null) { + closeQuietly(conn); + } + closeQuietly(masterConnection); + log.debug("DuckDB read pool closed"); + } + + private static void closeQuietly(Connection conn) { + if(conn != null) { + try { + conn.close(); + } catch(SQLException ignored) { + } + } + } +} diff --git a/src/main/java/com/sparrowwallet/frigate/index/MemoryDbManager.java b/src/main/java/com/sparrowwallet/frigate/index/MemoryDbManager.java index f8aefcf..95b51ca 100644 --- a/src/main/java/com/sparrowwallet/frigate/index/MemoryDbManager.java +++ b/src/main/java/com/sparrowwallet/frigate/index/MemoryDbManager.java @@ -13,13 +13,13 @@ import java.sql.Statement; import java.util.Properties; public class MemoryDbManager implements DbManager { - private final static Logger log = LoggerFactory.getLogger(ScalingDbManager.class); + private final static Logger log = LoggerFactory.getLogger(MemoryDbManager.class); private Connection connection; private boolean shutdown = false; @Override - public T executeRead(ReadOperation operation) throws SQLException { + public synchronized T executeRead(ReadOperation operation) throws SQLException { if(shutdown) { throw new SQLException("Connection manager is shutting down"); } @@ -29,7 +29,7 @@ public class MemoryDbManager implements DbManager { } @Override - public T executeWrite(WriteOperation operation) throws SQLException { + public synchronized T executeWrite(WriteOperation operation) throws SQLException { if(shutdown) { throw new SQLException("Connection manager is shutting down"); } diff --git a/src/main/java/com/sparrowwallet/frigate/index/ScalingDbManager.java b/src/main/java/com/sparrowwallet/frigate/index/ScalingDbManager.java index 6bc19e2..bec143b 100644 --- a/src/main/java/com/sparrowwallet/frigate/index/ScalingDbManager.java +++ b/src/main/java/com/sparrowwallet/frigate/index/ScalingDbManager.java @@ -1,6 +1,5 @@ package com.sparrowwallet.frigate.index; -import com.zaxxer.hikari.HikariDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,7 +14,7 @@ public class ScalingDbManager extends AbstractDbManager { private final String readWriteUrl; private Connection writeConnection; - private final List dataSources = new ArrayList<>(); + private final List readPools = new ArrayList<>(); private final AtomicInteger index = new AtomicInteger(0); private boolean shutdown = false; @@ -23,8 +22,11 @@ public class ScalingDbManager extends AbstractDbManager { super(); this.readWriteUrl = readWriteUrl; for(String url : readOnlyUrls) { - HikariDataSource ds = createReadDataSource(url, 1); - dataSources.add(ds); + try { + readPools.add(new DuckDBReadPool(url, 10)); + } catch(SQLException e) { + throw new RuntimeException("Failed to create DuckDB read pool for " + url, e); + } } } @@ -34,9 +36,17 @@ public class ScalingDbManager extends AbstractDbManager { throw new SQLException("Connection manager is shutting down"); } - int ind = index.getAndIncrement() % dataSources.size(); - HikariDataSource ds = dataSources.get(ind); - return operation.execute(ds.getConnection()); + int ind = index.getAndIncrement() % readPools.size(); + DuckDBReadPool pool = readPools.get(ind); + Connection conn = null; + try { + conn = pool.getConnection(); + return operation.execute(conn); + } finally { + if(conn != null) { + pool.releaseConnection(conn); + } + } } @Override @@ -61,8 +71,8 @@ public class ScalingDbManager extends AbstractDbManager { log.error("Error closing write connection", e); } - for(HikariDataSource ds : dataSources) { - ds.close(); + for(DuckDBReadPool pool : readPools) { + pool.close(); } } diff --git a/src/main/java/com/sparrowwallet/frigate/index/SingleDbManager.java b/src/main/java/com/sparrowwallet/frigate/index/SingleDbManager.java index a371c75..5206111 100644 --- a/src/main/java/com/sparrowwallet/frigate/index/SingleDbManager.java +++ b/src/main/java/com/sparrowwallet/frigate/index/SingleDbManager.java @@ -1,6 +1,5 @@ package com.sparrowwallet.frigate.index; -import com.zaxxer.hikari.HikariDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,7 +18,7 @@ public class SingleDbManager extends AbstractDbManager { private final Semaphore writerWaiting; private Connection writeConnection; - private HikariDataSource readDataSource; + private DuckDBReadPool readPool; private boolean inWriteMode; private volatile boolean shutdown = false; private volatile boolean writeOperationActive = false; @@ -50,12 +49,15 @@ public class SingleDbManager extends AbstractDbManager { } rwLock.readLock().lock(); + Connection conn = null; try { ensureReadMode(); - try(Connection conn = readDataSource.getConnection()) { - return operation.execute(conn); - } + conn = readPool.getConnection(); + return operation.execute(conn); } finally { + if(conn != null) { + readPool.releaseConnection(conn); + } rwLock.readLock().unlock(); } } @@ -93,11 +95,11 @@ public class SingleDbManager extends AbstractDbManager { } private synchronized void ensureReadMode() throws SQLException { - if(inWriteMode || readDataSource == null) { + if(inWriteMode || readPool == null) { log.debug("Switching to READ mode"); waitForWriteOperationToComplete(); closeWriteConnection(); - createReadDataSource(); + createReadPool(); inWriteMode = false; } } @@ -105,7 +107,7 @@ public class SingleDbManager extends AbstractDbManager { private synchronized void ensureWriteMode() throws SQLException { if(!inWriteMode || writeConnection == null) { log.debug("Switching to WRITE mode"); - closeReadDataSource(); + closeReadPool(); createWriteConnection(); inWriteMode = true; } @@ -125,12 +127,16 @@ public class SingleDbManager extends AbstractDbManager { } } - private void createReadDataSource() { - if(readDataSource != null) { + private void createReadPool() { + if(readPool != null) { return; } - readDataSource = createReadDataSource(connectionUrl, 10); + try { + readPool = new DuckDBReadPool(connectionUrl, 10); + } catch(SQLException e) { + throw new RuntimeException("Failed to create DuckDB read pool", e); + } } private void createWriteConnection() throws SQLException { @@ -156,12 +162,12 @@ public class SingleDbManager extends AbstractDbManager { } } - private void closeReadDataSource() { - if(readDataSource != null) { + private void closeReadPool() { + if(readPool != null) { try { - readDataSource.close(); + readPool.close(); } finally { - readDataSource = null; + readPool = null; log.debug("Closed read connection pool"); } } @@ -210,24 +216,16 @@ public class SingleDbManager extends AbstractDbManager { } try { - closeReadDataSource(); + closeReadPool(); } catch(Exception e) { - log.error("Error closing read data source", e); + log.error("Error closing read pool", e); } log.debug("Shutdown complete"); } - public int getActiveReadConnections() { - return readDataSource != null ? readDataSource.getHikariPoolMXBean().getActiveConnections() : 0; - } - - public int getIdleReadConnections() { - return readDataSource != null ? readDataSource.getHikariPoolMXBean().getIdleConnections() : 0; - } - - public int getTotalReadConnections() { - return readDataSource != null ? readDataSource.getHikariPoolMXBean().getTotalConnections() : 0; + public boolean isReadPoolActive() { + return readPool != null; } public boolean isShutdown() { diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index f4d99fe..6078f9d 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -1,7 +1,6 @@ module com.sparrowwallet.frigate { requires com.sparrowwallet.drongo; requires duckdb.jdbc; - requires com.zaxxer.hikari; requires com.fasterxml.jackson.annotation; requires simple.json.rpc.core; requires simple.json.rpc.client; diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 35a725d..125c0fa 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -3,9 +3,6 @@ - - - frigate