throttle TCP writes to electrum servers (#485)
This commit is contained in:
parent
2a879f6b0c
commit
438d8e3d4b
@ -3,7 +3,7 @@ package fr.acinq.eclair.blockchain.electrum
|
||||
import java.io.InputStream
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, Stash, Terminated}
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Stash, Terminated}
|
||||
import akka.io.{IO, Tcp}
|
||||
import akka.util.ByteString
|
||||
import fr.acinq.bitcoin._
|
||||
@ -83,7 +83,7 @@ class ElectrumClient(serverAddresses: Seq[InetSocketAddress]) extends Actor with
|
||||
}) ~ ("id" -> request.id) ~ ("jsonrpc" -> request.jsonrpc)
|
||||
val serialized = compact(render(json))
|
||||
val bytes = (serialized + newline).getBytes
|
||||
connection ! Tcp.Write(ByteString.fromArray(bytes))
|
||||
connection ! ByteString.fromArray(bytes)
|
||||
}
|
||||
|
||||
private def nextPeer() = {
|
||||
@ -119,8 +119,8 @@ class ElectrumClient(serverAddresses: Seq[InetSocketAddress]) extends Actor with
|
||||
case Tcp.Connected(remote, _) =>
|
||||
log.info(s"connected to $remote")
|
||||
connectionFailures.clear()
|
||||
val connection = sender()
|
||||
connection ! Tcp.Register(self)
|
||||
sender ! Tcp.Register(self)
|
||||
val connection = context.actorOf(Props(new WriteAckSender(sender())), name = "electrum-sender")
|
||||
val request = version
|
||||
send(connection, makeRequest(request, "" + reqId))
|
||||
reqId = reqId + 1
|
||||
|
||||
@ -0,0 +1,53 @@
|
||||
package fr.acinq.eclair.blockchain.electrum
|
||||
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Terminated}
|
||||
import akka.io.Tcp
|
||||
import akka.util.ByteString
|
||||
|
||||
|
||||
/**
|
||||
* Simple ACK-based throttling mechanism for sending messages to a TCP connection
|
||||
* See https://doc.akka.io/docs/akka/snapshot/scala/io-tcp.html#throttling-reads-and-writes
|
||||
*/
|
||||
class WriteAckSender(connection: ActorRef) extends Actor with ActorLogging {
|
||||
|
||||
// this actor will kill itself if connection dies
|
||||
context watch connection
|
||||
|
||||
case object Ack extends Tcp.Event
|
||||
|
||||
override def receive = idle
|
||||
|
||||
def idle: Receive = {
|
||||
case data: ByteString =>
|
||||
connection ! Tcp.Write(data, Ack)
|
||||
context become buffering(Vector.empty[ByteString])
|
||||
}
|
||||
|
||||
def buffering(buffer: Vector[ByteString]): Receive = {
|
||||
case _: ByteString if buffer.size > MAX_BUFFERED =>
|
||||
log.warning(s"buffer overrun, closing connection")
|
||||
connection ! PoisonPill
|
||||
case data: ByteString =>
|
||||
log.debug("buffering write {}", data)
|
||||
context become buffering(buffer :+ data)
|
||||
case Ack =>
|
||||
buffer.headOption match {
|
||||
case Some(data) =>
|
||||
connection ! Tcp.Write(data, Ack)
|
||||
context become buffering(buffer.drop(1))
|
||||
case None =>
|
||||
log.debug(s"got last ack, back to idle")
|
||||
context become idle
|
||||
}
|
||||
}
|
||||
|
||||
override def unhandled(message: Any): Unit = message match {
|
||||
case _: Tcp.ConnectionClosed => context stop self
|
||||
case _: Terminated => context stop self
|
||||
case _ => log.warning(s"unhandled message $message")
|
||||
}
|
||||
|
||||
val MAX_BUFFERED = 100000L
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user