release electrum transport read lock during socket reads to avoid client request starvation

This commit is contained in:
Craig Raw 2026-05-15 11:00:06 +02:00
parent 97383a4e35
commit 273d2aacf3

View File

@ -56,9 +56,9 @@ public class TcpTransport implements CloseableTransport, TimeoutCounter {
private final Condition readingCondition = readLock.newCondition();
private final ReentrantLock clientRequestLock = new ReentrantLock();
private boolean running = false;
private volatile boolean running = false;
private volatile boolean reading = true;
private boolean closed = false;
private volatile boolean closed = false;
private boolean firstRead = true;
private int readTimeoutIndex;
private int requestIdCount = 1;
@ -164,7 +164,7 @@ public class TcpTransport implements CloseableTransport, TimeoutCounter {
firstRead = false;
}
while(reading) {
while(reading && running) {
try {
readingCondition.await();
} catch(InterruptedException e) {
@ -178,6 +178,10 @@ public class TcpTransport implements CloseableTransport, TimeoutCounter {
throw new IOException("Error reading response: " + lastException.getMessage(), lastException);
}
if(!running) {
throw new IOException("Transport closed");
}
reading = true;
readingCondition.signal();
@ -188,56 +192,80 @@ public class TcpTransport implements CloseableTransport, TimeoutCounter {
}
public void readInputLoop() throws ServerException {
readLock.lock();
readReadySignal.countDown();
BufferedReader in;
try {
try {
//Don't start reading until first RPC request is sent
readingCondition.await();
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
while(running) {
try {
String received = readInputStream(in);
wireLog.info("< " + received);
if(isNotification(received)) {
jsonRpcServer.handle(received, subscriptionService);
} else {
response = received;
reading = false;
readingCondition.signal();
readingCondition.await();
}
} catch(InterruptedException e) {
//Restore interrupt status and continue
Thread.currentThread().interrupt();
} catch(Exception e) {
log.trace("Connection error while reading", e);
if(running) {
lastException = e;
reading = false;
readingCondition.signal();
//Allow this thread to terminate as we will need to reconnect with a new transport anyway
running = false;
}
}
}
in = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
} catch(IOException e) {
if(!closed) {
log.error("Error opening socket inputstream", e);
}
if(running) {
lastException = e;
reading = false;
readingCondition.signal();
//Allow this thread to terminate as we will need to reconnect with a new transport anyway
signalException(e);
running = false;
}
return;
}
//Wait for first RPC request before starting to read. The lock must be acquired before
//signaling readiness so readResponse() blocks until we reach the atomic await/unlock.
readLock.lock();
try {
readReadySignal.countDown();
if(running) {
readingCondition.await();
}
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
return;
} finally {
readLock.unlock();
}
while(running) {
try {
String received = readInputStream(in);
wireLog.info("< " + received);
if(isNotification(received)) {
jsonRpcServer.handle(received, subscriptionService);
} else {
deliverResponse(received);
}
} catch(InterruptedException e) {
//Restore interrupt status and continue
Thread.currentThread().interrupt();
} catch(Exception e) {
if(!closed) {
log.trace("Connection error while reading", e);
}
if(running) {
signalException(e);
//Allow this thread to terminate as we will need to reconnect with a new transport anyway
running = false;
}
}
}
}
private void deliverResponse(String received) throws InterruptedException {
readLock.lock();
try {
response = received;
reading = false;
readingCondition.signal();
while(!reading && running) {
readingCondition.await();
}
} finally {
readLock.unlock();
}
}
private void signalException(Exception e) {
readLock.lock();
try {
lastException = e;
reading = false;
readingCondition.signal();
} finally {
readLock.unlock();
}
@ -303,10 +331,19 @@ public class TcpTransport implements CloseableTransport, TimeoutCounter {
@Override
public void close() throws IOException {
running = false;
closed = true;
readLock.lock();
try {
readingCondition.signalAll();
} finally {
readLock.unlock();
}
if(socket != null) {
socket.close();
}
closed = true;
}
@Override