improve concurrency through use of duplicate duckdb connections
This commit is contained in:
parent
121d4131db
commit
72f85c398f
@ -102,9 +102,10 @@ def format_number(n):
|
||||
return f"{n:,}"
|
||||
|
||||
|
||||
def run_benchmarks(host, port, end_height, markdown, clients):
|
||||
def run_benchmarks(host, port, end_height, markdown, clients, max_periods=0):
|
||||
period_list = PERIODS[:max_periods] if max_periods > 0 else PERIODS
|
||||
periods = []
|
||||
for desc, short, blocks in PERIODS:
|
||||
for desc, short, blocks in period_list:
|
||||
start = end_height - blocks
|
||||
txns = TRANSACTION_COUNTS.get(short) if end_height == DEFAULT_END_HEIGHT else None
|
||||
periods.append((desc, short, blocks, start, f"{start}-{end_height}", txns))
|
||||
@ -186,10 +187,11 @@ def main():
|
||||
parser.add_argument("--end-height", type=int, default=DEFAULT_END_HEIGHT, help=f"end block height (default: {DEFAULT_END_HEIGHT})")
|
||||
parser.add_argument("--markdown", action="store_true", help="output as markdown table")
|
||||
parser.add_argument("--clients", type=int, default=1, help="number of concurrent clients per scan period (default: 1)")
|
||||
parser.add_argument("--periods", type=int, default=0, help="number of periods to run (default: all)")
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
run_benchmarks(args.host, args.port, args.end_height, args.markdown, args.clients)
|
||||
run_benchmarks(args.host, args.port, args.end_height, args.markdown, args.clients, args.periods)
|
||||
except ConnectionRefusedError:
|
||||
print(f"Error: could not connect to {args.host}:{args.port}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
@ -21,7 +21,6 @@ tasks.withType(AbstractArchiveTask).configureEach {
|
||||
dependencies {
|
||||
implementation(project(':drongo'))
|
||||
implementation('org.duckdb:duckdb_jdbc:1.4.4.0')
|
||||
implementation("com.zaxxer:HikariCP:7.0.2")
|
||||
implementation('com.google.guava:guava:33.5.0-jre')
|
||||
implementation('com.google.code.gson:gson:2.13.2')
|
||||
implementation('com.github.arteam:simple-json-rpc-core:1.3')
|
||||
|
||||
@ -192,9 +192,17 @@ public class RequestHandler implements Runnable, SubscriptionStatus, Thread.Unca
|
||||
subscription.setHighestBlockHeight(notification.history().stream().mapToInt(TxEntry::getHeight).max().orElse(subscription.getHighestBlockHeight()));
|
||||
subscription.getMempoolTxids().addAll(notification.history().stream().filter(txEntry -> txEntry.height <= 0).map(txEntry -> Sha256Hash.wrap(txEntry.tx_hash)).collect(Collectors.toSet()));
|
||||
|
||||
ElectrumNotificationTransport electrumNotificationTransport = new ElectrumNotificationTransport(clientSocket);
|
||||
JsonRpcClient jsonRpcClient = new JsonRpcClient(electrumNotificationTransport);
|
||||
jsonRpcClient.onDemand(ElectrumNotificationService.class).notifySilentPayments(notification.subscription(), notification.progress(), notification.history());
|
||||
try {
|
||||
ElectrumNotificationTransport electrumNotificationTransport = new ElectrumNotificationTransport(clientSocket);
|
||||
JsonRpcClient jsonRpcClient = new JsonRpcClient(electrumNotificationTransport);
|
||||
jsonRpcClient.onDemand(ElectrumNotificationService.class).notifySilentPayments(notification.subscription(), notification.progress(), notification.history());
|
||||
} catch(IllegalStateException e) {
|
||||
if(e.getCause() instanceof java.io.IOException) {
|
||||
log.debug("Client disconnected before notification could be sent");
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,17 +1,11 @@
|
||||
package com.sparrowwallet.frigate.index;
|
||||
|
||||
import com.sparrowwallet.frigate.io.Config;
|
||||
import com.sparrowwallet.frigate.io.Storage;
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Properties;
|
||||
|
||||
public abstract class AbstractDbManager implements DbManager {
|
||||
private static final Logger log = LoggerFactory.getLogger(AbstractDbManager.class);
|
||||
@ -23,50 +17,4 @@ public abstract class AbstractDbManager implements DbManager {
|
||||
log.debug("Creating write connection");
|
||||
return DriverManager.getConnection(connectionUrl);
|
||||
}
|
||||
|
||||
protected HikariDataSource createReadDataSource(String connectionUrl, int maxPoolSize) {
|
||||
HikariConfig config = new HikariConfig();
|
||||
config.setJdbcUrl(connectionUrl);
|
||||
config.setDriverClassName("org.duckdb.DuckDBDriver");
|
||||
config.addDataSourceProperty("access_mode", "READ_ONLY");
|
||||
config.addDataSourceProperty("allow_unsigned_extensions", "true");
|
||||
config.addDataSourceProperty("jdbc_stream_results", "true");
|
||||
config.addDataSourceProperty("scheduler_process_partial", "true");
|
||||
config.setConnectionInitSql(buildReadConnectionInitSql());
|
||||
|
||||
config.setMaximumPoolSize(maxPoolSize);
|
||||
config.setMinimumIdle(2);
|
||||
config.setConnectionTimeout(30000);
|
||||
config.setIdleTimeout(300000);
|
||||
config.setMaxLifetime(900000);
|
||||
config.setLeakDetectionThreshold(0);
|
||||
|
||||
config.setConnectionTestQuery("SELECT 1");
|
||||
config.setValidationTimeout(5000);
|
||||
config.setPoolName("DuckDB-ReadOnly-Pool");
|
||||
|
||||
log.debug("Creating read connection pool with max size: " + config.getMaximumPoolSize());
|
||||
return new HikariDataSource(config);
|
||||
}
|
||||
|
||||
private String buildReadConnectionInitSql() {
|
||||
Properties duckDbProperties = new Properties();
|
||||
duckDbProperties.setProperty("enable_progress_bar", "true");
|
||||
duckDbProperties.setProperty("enable_progress_bar_print", "false");
|
||||
if(Config.get().getDbThreads() != null) {
|
||||
duckDbProperties.setProperty("threads", Config.get().getDbThreads().toString());
|
||||
}
|
||||
|
||||
StringBuilder sql = new StringBuilder();
|
||||
for(String propertyName : duckDbProperties.stringPropertyNames()) {
|
||||
String value = duckDbProperties.getProperty(propertyName);
|
||||
sql.append("SET ").append(propertyName).append(" = '").append(value).append("'; ");
|
||||
}
|
||||
|
||||
File ufsecpExtensionFile = Storage.getUfsecpExtensionFile();
|
||||
sql.append("LOAD '").append(ufsecpExtensionFile.getAbsolutePath()).append("'; ");
|
||||
sql.append("SELECT ufsecp_set_cache_dir('").append(Storage.getFrigateCacheDir().getAbsolutePath()).append("'); ");
|
||||
|
||||
return sql.toString().trim();
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,95 @@
|
||||
package com.sparrowwallet.frigate.index;
|
||||
|
||||
import com.sparrowwallet.frigate.io.Config;
|
||||
import com.sparrowwallet.frigate.io.Storage;
|
||||
import org.duckdb.DuckDBConnection;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.sql.*;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class DuckDBReadPool {
|
||||
private static final Logger log = LoggerFactory.getLogger(DuckDBReadPool.class);
|
||||
|
||||
private final DuckDBConnection masterConnection;
|
||||
private final ArrayBlockingQueue<Connection> pool;
|
||||
private final int maxSize;
|
||||
private volatile boolean closed = false;
|
||||
|
||||
public DuckDBReadPool(String connectionUrl, int maxSize) throws SQLException {
|
||||
this.maxSize = maxSize;
|
||||
|
||||
Properties props = new Properties();
|
||||
props.setProperty("duckdb.read_only", "true");
|
||||
props.setProperty("allow_unsigned_extensions", "true");
|
||||
this.masterConnection = (DuckDBConnection)DriverManager.getConnection(connectionUrl, props);
|
||||
|
||||
try(Statement stmt = masterConnection.createStatement()) {
|
||||
if(Config.get().getDbThreads() != null) {
|
||||
stmt.execute("SET threads = '" + Config.get().getDbThreads() + "'");
|
||||
}
|
||||
|
||||
File ufsecpExtensionFile = Storage.getUfsecpExtensionFile();
|
||||
stmt.execute("LOAD '" + ufsecpExtensionFile.getAbsolutePath() + "'");
|
||||
stmt.execute("SELECT ufsecp_set_cache_dir('" + Storage.getFrigateCacheDir().getAbsolutePath() + "')");
|
||||
}
|
||||
|
||||
this.pool = new ArrayBlockingQueue<>(maxSize);
|
||||
log.debug("DuckDB read pool created (max size: {})", maxSize);
|
||||
}
|
||||
|
||||
public Connection getConnection() throws SQLException {
|
||||
if(closed) {
|
||||
throw new SQLException("Pool is closed");
|
||||
}
|
||||
|
||||
Connection conn = pool.poll();
|
||||
if(conn != null && !conn.isClosed()) {
|
||||
return conn;
|
||||
}
|
||||
|
||||
return masterConnection.duplicate();
|
||||
}
|
||||
|
||||
public void releaseConnection(Connection conn) {
|
||||
if(closed || conn == null) {
|
||||
closeQuietly(conn);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if(conn.isClosed()) {
|
||||
return;
|
||||
}
|
||||
} catch(SQLException e) {
|
||||
return;
|
||||
}
|
||||
|
||||
if(!pool.offer(conn)) {
|
||||
closeQuietly(conn);
|
||||
}
|
||||
}
|
||||
|
||||
public void close() {
|
||||
closed = true;
|
||||
Connection conn;
|
||||
while((conn = pool.poll()) != null) {
|
||||
closeQuietly(conn);
|
||||
}
|
||||
closeQuietly(masterConnection);
|
||||
log.debug("DuckDB read pool closed");
|
||||
}
|
||||
|
||||
private static void closeQuietly(Connection conn) {
|
||||
if(conn != null) {
|
||||
try {
|
||||
conn.close();
|
||||
} catch(SQLException ignored) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -13,13 +13,13 @@ import java.sql.Statement;
|
||||
import java.util.Properties;
|
||||
|
||||
public class MemoryDbManager implements DbManager {
|
||||
private final static Logger log = LoggerFactory.getLogger(ScalingDbManager.class);
|
||||
private final static Logger log = LoggerFactory.getLogger(MemoryDbManager.class);
|
||||
|
||||
private Connection connection;
|
||||
private boolean shutdown = false;
|
||||
|
||||
@Override
|
||||
public <T> T executeRead(ReadOperation<T> operation) throws SQLException {
|
||||
public synchronized <T> T executeRead(ReadOperation<T> operation) throws SQLException {
|
||||
if(shutdown) {
|
||||
throw new SQLException("Connection manager is shutting down");
|
||||
}
|
||||
@ -29,7 +29,7 @@ public class MemoryDbManager implements DbManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T executeWrite(WriteOperation<T> operation) throws SQLException {
|
||||
public synchronized <T> T executeWrite(WriteOperation<T> operation) throws SQLException {
|
||||
if(shutdown) {
|
||||
throw new SQLException("Connection manager is shutting down");
|
||||
}
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
package com.sparrowwallet.frigate.index;
|
||||
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -15,7 +14,7 @@ public class ScalingDbManager extends AbstractDbManager {
|
||||
|
||||
private final String readWriteUrl;
|
||||
private Connection writeConnection;
|
||||
private final List<HikariDataSource> dataSources = new ArrayList<>();
|
||||
private final List<DuckDBReadPool> readPools = new ArrayList<>();
|
||||
private final AtomicInteger index = new AtomicInteger(0);
|
||||
private boolean shutdown = false;
|
||||
|
||||
@ -23,8 +22,11 @@ public class ScalingDbManager extends AbstractDbManager {
|
||||
super();
|
||||
this.readWriteUrl = readWriteUrl;
|
||||
for(String url : readOnlyUrls) {
|
||||
HikariDataSource ds = createReadDataSource(url, 1);
|
||||
dataSources.add(ds);
|
||||
try {
|
||||
readPools.add(new DuckDBReadPool(url, 10));
|
||||
} catch(SQLException e) {
|
||||
throw new RuntimeException("Failed to create DuckDB read pool for " + url, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -34,9 +36,17 @@ public class ScalingDbManager extends AbstractDbManager {
|
||||
throw new SQLException("Connection manager is shutting down");
|
||||
}
|
||||
|
||||
int ind = index.getAndIncrement() % dataSources.size();
|
||||
HikariDataSource ds = dataSources.get(ind);
|
||||
return operation.execute(ds.getConnection());
|
||||
int ind = index.getAndIncrement() % readPools.size();
|
||||
DuckDBReadPool pool = readPools.get(ind);
|
||||
Connection conn = null;
|
||||
try {
|
||||
conn = pool.getConnection();
|
||||
return operation.execute(conn);
|
||||
} finally {
|
||||
if(conn != null) {
|
||||
pool.releaseConnection(conn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -61,8 +71,8 @@ public class ScalingDbManager extends AbstractDbManager {
|
||||
log.error("Error closing write connection", e);
|
||||
}
|
||||
|
||||
for(HikariDataSource ds : dataSources) {
|
||||
ds.close();
|
||||
for(DuckDBReadPool pool : readPools) {
|
||||
pool.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
package com.sparrowwallet.frigate.index;
|
||||
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -19,7 +18,7 @@ public class SingleDbManager extends AbstractDbManager {
|
||||
private final Semaphore writerWaiting;
|
||||
|
||||
private Connection writeConnection;
|
||||
private HikariDataSource readDataSource;
|
||||
private DuckDBReadPool readPool;
|
||||
private boolean inWriteMode;
|
||||
private volatile boolean shutdown = false;
|
||||
private volatile boolean writeOperationActive = false;
|
||||
@ -50,12 +49,15 @@ public class SingleDbManager extends AbstractDbManager {
|
||||
}
|
||||
|
||||
rwLock.readLock().lock();
|
||||
Connection conn = null;
|
||||
try {
|
||||
ensureReadMode();
|
||||
try(Connection conn = readDataSource.getConnection()) {
|
||||
return operation.execute(conn);
|
||||
}
|
||||
conn = readPool.getConnection();
|
||||
return operation.execute(conn);
|
||||
} finally {
|
||||
if(conn != null) {
|
||||
readPool.releaseConnection(conn);
|
||||
}
|
||||
rwLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
@ -93,11 +95,11 @@ public class SingleDbManager extends AbstractDbManager {
|
||||
}
|
||||
|
||||
private synchronized void ensureReadMode() throws SQLException {
|
||||
if(inWriteMode || readDataSource == null) {
|
||||
if(inWriteMode || readPool == null) {
|
||||
log.debug("Switching to READ mode");
|
||||
waitForWriteOperationToComplete();
|
||||
closeWriteConnection();
|
||||
createReadDataSource();
|
||||
createReadPool();
|
||||
inWriteMode = false;
|
||||
}
|
||||
}
|
||||
@ -105,7 +107,7 @@ public class SingleDbManager extends AbstractDbManager {
|
||||
private synchronized void ensureWriteMode() throws SQLException {
|
||||
if(!inWriteMode || writeConnection == null) {
|
||||
log.debug("Switching to WRITE mode");
|
||||
closeReadDataSource();
|
||||
closeReadPool();
|
||||
createWriteConnection();
|
||||
inWriteMode = true;
|
||||
}
|
||||
@ -125,12 +127,16 @@ public class SingleDbManager extends AbstractDbManager {
|
||||
}
|
||||
}
|
||||
|
||||
private void createReadDataSource() {
|
||||
if(readDataSource != null) {
|
||||
private void createReadPool() {
|
||||
if(readPool != null) {
|
||||
return;
|
||||
}
|
||||
|
||||
readDataSource = createReadDataSource(connectionUrl, 10);
|
||||
try {
|
||||
readPool = new DuckDBReadPool(connectionUrl, 10);
|
||||
} catch(SQLException e) {
|
||||
throw new RuntimeException("Failed to create DuckDB read pool", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void createWriteConnection() throws SQLException {
|
||||
@ -156,12 +162,12 @@ public class SingleDbManager extends AbstractDbManager {
|
||||
}
|
||||
}
|
||||
|
||||
private void closeReadDataSource() {
|
||||
if(readDataSource != null) {
|
||||
private void closeReadPool() {
|
||||
if(readPool != null) {
|
||||
try {
|
||||
readDataSource.close();
|
||||
readPool.close();
|
||||
} finally {
|
||||
readDataSource = null;
|
||||
readPool = null;
|
||||
log.debug("Closed read connection pool");
|
||||
}
|
||||
}
|
||||
@ -210,24 +216,16 @@ public class SingleDbManager extends AbstractDbManager {
|
||||
}
|
||||
|
||||
try {
|
||||
closeReadDataSource();
|
||||
closeReadPool();
|
||||
} catch(Exception e) {
|
||||
log.error("Error closing read data source", e);
|
||||
log.error("Error closing read pool", e);
|
||||
}
|
||||
|
||||
log.debug("Shutdown complete");
|
||||
}
|
||||
|
||||
public int getActiveReadConnections() {
|
||||
return readDataSource != null ? readDataSource.getHikariPoolMXBean().getActiveConnections() : 0;
|
||||
}
|
||||
|
||||
public int getIdleReadConnections() {
|
||||
return readDataSource != null ? readDataSource.getHikariPoolMXBean().getIdleConnections() : 0;
|
||||
}
|
||||
|
||||
public int getTotalReadConnections() {
|
||||
return readDataSource != null ? readDataSource.getHikariPoolMXBean().getTotalConnections() : 0;
|
||||
public boolean isReadPoolActive() {
|
||||
return readPool != null;
|
||||
}
|
||||
|
||||
public boolean isShutdown() {
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
module com.sparrowwallet.frigate {
|
||||
requires com.sparrowwallet.drongo;
|
||||
requires duckdb.jdbc;
|
||||
requires com.zaxxer.hikari;
|
||||
requires com.fasterxml.jackson.annotation;
|
||||
requires simple.json.rpc.core;
|
||||
requires simple.json.rpc.client;
|
||||
|
||||
@ -3,9 +3,6 @@
|
||||
|
||||
<logger name="sun.net.www.protocol.http.HttpURLConnection" level="INFO" />
|
||||
<logger name="com.github.arteam.simplejsonrpc.server.JsonRpcServer" level="INFO" />
|
||||
<logger name="com.zaxxer.hikari.HikariDataSource" level="WARN" />
|
||||
<logger name="com.zaxxer.hikari.pool.PoolBase" level="ERROR" />
|
||||
<logger name="com.zaxxer.hikari.pool.HikariPool" level="ERROR" />
|
||||
|
||||
<define name="appDir" class="com.sparrowwallet.drongo.PropertyDefiner">
|
||||
<application>frigate</application>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user