From 273d2aacf3677bf70ac3afeb754b65d7298d59cd Mon Sep 17 00:00:00 2001 From: Craig Raw Date: Fri, 15 May 2026 11:00:06 +0200 Subject: [PATCH] release electrum transport read lock during socket reads to avoid client request starvation --- .../sparrow/net/TcpTransport.java | 129 +++++++++++------- 1 file changed, 83 insertions(+), 46 deletions(-) diff --git a/src/main/java/com/sparrowwallet/sparrow/net/TcpTransport.java b/src/main/java/com/sparrowwallet/sparrow/net/TcpTransport.java index 1f7d052f..2ce45f24 100644 --- a/src/main/java/com/sparrowwallet/sparrow/net/TcpTransport.java +++ b/src/main/java/com/sparrowwallet/sparrow/net/TcpTransport.java @@ -56,9 +56,9 @@ public class TcpTransport implements CloseableTransport, TimeoutCounter { private final Condition readingCondition = readLock.newCondition(); private final ReentrantLock clientRequestLock = new ReentrantLock(); - private boolean running = false; + private volatile boolean running = false; private volatile boolean reading = true; - private boolean closed = false; + private volatile boolean closed = false; private boolean firstRead = true; private int readTimeoutIndex; private int requestIdCount = 1; @@ -164,7 +164,7 @@ public class TcpTransport implements CloseableTransport, TimeoutCounter { firstRead = false; } - while(reading) { + while(reading && running) { try { readingCondition.await(); } catch(InterruptedException e) { @@ -178,6 +178,10 @@ public class TcpTransport implements CloseableTransport, TimeoutCounter { throw new IOException("Error reading response: " + lastException.getMessage(), lastException); } + if(!running) { + throw new IOException("Transport closed"); + } + reading = true; readingCondition.signal(); @@ -188,56 +192,80 @@ public class TcpTransport implements CloseableTransport, TimeoutCounter { } public void readInputLoop() throws ServerException { - readLock.lock(); - readReadySignal.countDown(); - + BufferedReader in; try { - try { - //Don't start reading until first RPC request is sent - readingCondition.await(); - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - } - - BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); - - while(running) { - try { - String received = readInputStream(in); - wireLog.info("< " + received); - if(isNotification(received)) { - jsonRpcServer.handle(received, subscriptionService); - } else { - response = received; - reading = false; - readingCondition.signal(); - readingCondition.await(); - } - } catch(InterruptedException e) { - //Restore interrupt status and continue - Thread.currentThread().interrupt(); - } catch(Exception e) { - log.trace("Connection error while reading", e); - if(running) { - lastException = e; - reading = false; - readingCondition.signal(); - //Allow this thread to terminate as we will need to reconnect with a new transport anyway - running = false; - } - } - } + in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); } catch(IOException e) { if(!closed) { log.error("Error opening socket inputstream", e); } if(running) { - lastException = e; - reading = false; - readingCondition.signal(); - //Allow this thread to terminate as we will need to reconnect with a new transport anyway + signalException(e); running = false; } + return; + } + + //Wait for first RPC request before starting to read. The lock must be acquired before + //signaling readiness so readResponse() blocks until we reach the atomic await/unlock. + readLock.lock(); + try { + readReadySignal.countDown(); + if(running) { + readingCondition.await(); + } + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } finally { + readLock.unlock(); + } + + while(running) { + try { + String received = readInputStream(in); + wireLog.info("< " + received); + if(isNotification(received)) { + jsonRpcServer.handle(received, subscriptionService); + } else { + deliverResponse(received); + } + } catch(InterruptedException e) { + //Restore interrupt status and continue + Thread.currentThread().interrupt(); + } catch(Exception e) { + if(!closed) { + log.trace("Connection error while reading", e); + } + if(running) { + signalException(e); + //Allow this thread to terminate as we will need to reconnect with a new transport anyway + running = false; + } + } + } + } + + private void deliverResponse(String received) throws InterruptedException { + readLock.lock(); + try { + response = received; + reading = false; + readingCondition.signal(); + while(!reading && running) { + readingCondition.await(); + } + } finally { + readLock.unlock(); + } + } + + private void signalException(Exception e) { + readLock.lock(); + try { + lastException = e; + reading = false; + readingCondition.signal(); } finally { readLock.unlock(); } @@ -303,10 +331,19 @@ public class TcpTransport implements CloseableTransport, TimeoutCounter { @Override public void close() throws IOException { + running = false; + closed = true; + + readLock.lock(); + try { + readingCondition.signalAll(); + } finally { + readLock.unlock(); + } + if(socket != null) { socket.close(); } - closed = true; } @Override