throttle TCP writes to electrum servers (#485)

This commit is contained in:
Pierre-Marie Padiou 2018-03-21 10:17:05 +01:00 committed by Fabrice Drouin
parent 2a879f6b0c
commit 438d8e3d4b
2 changed files with 57 additions and 4 deletions

View File

@ -3,7 +3,7 @@ package fr.acinq.eclair.blockchain.electrum
import java.io.InputStream
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorLogging, ActorRef, Stash, Terminated}
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Stash, Terminated}
import akka.io.{IO, Tcp}
import akka.util.ByteString
import fr.acinq.bitcoin._
@ -83,7 +83,7 @@ class ElectrumClient(serverAddresses: Seq[InetSocketAddress]) extends Actor with
}) ~ ("id" -> request.id) ~ ("jsonrpc" -> request.jsonrpc)
val serialized = compact(render(json))
val bytes = (serialized + newline).getBytes
connection ! Tcp.Write(ByteString.fromArray(bytes))
connection ! ByteString.fromArray(bytes)
}
private def nextPeer() = {
@ -119,8 +119,8 @@ class ElectrumClient(serverAddresses: Seq[InetSocketAddress]) extends Actor with
case Tcp.Connected(remote, _) =>
log.info(s"connected to $remote")
connectionFailures.clear()
val connection = sender()
connection ! Tcp.Register(self)
sender ! Tcp.Register(self)
val connection = context.actorOf(Props(new WriteAckSender(sender())), name = "electrum-sender")
val request = version
send(connection, makeRequest(request, "" + reqId))
reqId = reqId + 1

View File

@ -0,0 +1,53 @@
package fr.acinq.eclair.blockchain.electrum
import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Terminated}
import akka.io.Tcp
import akka.util.ByteString
/**
* Simple ACK-based throttling mechanism for sending messages to a TCP connection
* See https://doc.akka.io/docs/akka/snapshot/scala/io-tcp.html#throttling-reads-and-writes
*/
class WriteAckSender(connection: ActorRef) extends Actor with ActorLogging {
// this actor will kill itself if connection dies
context watch connection
case object Ack extends Tcp.Event
override def receive = idle
def idle: Receive = {
case data: ByteString =>
connection ! Tcp.Write(data, Ack)
context become buffering(Vector.empty[ByteString])
}
def buffering(buffer: Vector[ByteString]): Receive = {
case _: ByteString if buffer.size > MAX_BUFFERED =>
log.warning(s"buffer overrun, closing connection")
connection ! PoisonPill
case data: ByteString =>
log.debug("buffering write {}", data)
context become buffering(buffer :+ data)
case Ack =>
buffer.headOption match {
case Some(data) =>
connection ! Tcp.Write(data, Ack)
context become buffering(buffer.drop(1))
case None =>
log.debug(s"got last ack, back to idle")
context become idle
}
}
override def unhandled(message: Any): Unit = message match {
case _: Tcp.ConnectionClosed => context stop self
case _: Terminated => context stop self
case _ => log.warning(s"unhandled message $message")
}
val MAX_BUFFERED = 100000L
}