This updates the required minimum protocol version to the maximum set by dcrwallet 1.8.0. Outbound connections will require the current protocol version.
4317 lines
139 KiB
Go
4317 lines
139 KiB
Go
// Copyright (c) 2013-2016 The btcsuite developers
|
|
// Copyright (c) 2015-2023 The Decred developers
|
|
// Use of this source code is governed by an ISC
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"crypto/elliptic"
|
|
"crypto/rand"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"net"
|
|
"os"
|
|
"path"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/decred/dcrd/addrmgr/v2"
|
|
"github.com/decred/dcrd/blockchain/stake/v5"
|
|
"github.com/decred/dcrd/blockchain/standalone/v2"
|
|
"github.com/decred/dcrd/certgen"
|
|
"github.com/decred/dcrd/chaincfg/chainhash"
|
|
"github.com/decred/dcrd/chaincfg/v3"
|
|
"github.com/decred/dcrd/connmgr/v3"
|
|
"github.com/decred/dcrd/container/apbf"
|
|
"github.com/decred/dcrd/database/v3"
|
|
"github.com/decred/dcrd/dcrutil/v4"
|
|
"github.com/decred/dcrd/internal/blockchain"
|
|
"github.com/decred/dcrd/internal/blockchain/indexers"
|
|
"github.com/decred/dcrd/internal/fees"
|
|
"github.com/decred/dcrd/internal/mempool"
|
|
"github.com/decred/dcrd/internal/mining"
|
|
"github.com/decred/dcrd/internal/mining/cpuminer"
|
|
"github.com/decred/dcrd/internal/netsync"
|
|
"github.com/decred/dcrd/internal/rpcserver"
|
|
"github.com/decred/dcrd/internal/version"
|
|
"github.com/decred/dcrd/math/uint256"
|
|
"github.com/decred/dcrd/peer/v3"
|
|
"github.com/decred/dcrd/txscript/v4"
|
|
"github.com/decred/dcrd/wire"
|
|
"github.com/syndtr/goleveldb/leveldb"
|
|
)
|
|
|
|
const (
|
|
// defaultServices describes the default services that are supported by
|
|
// the server.
|
|
defaultServices = wire.SFNodeNetwork
|
|
|
|
// defaultRequiredServices describes the default services that are
|
|
// required to be supported by outbound peers.
|
|
defaultRequiredServices = wire.SFNodeNetwork
|
|
|
|
// defaultTargetOutbound is the default number of outbound peers to
|
|
// target.
|
|
defaultTargetOutbound = 8
|
|
|
|
// defaultMaximumVoteAge is the threshold of blocks before the tip
|
|
// that can be voted on.
|
|
defaultMaximumVoteAge = 1440
|
|
|
|
// connectionRetryInterval is the base amount of time to wait in between
|
|
// retries when connecting to persistent peers. It is adjusted by the
|
|
// number of retries such that there is a retry backoff.
|
|
connectionRetryInterval = time.Second * 5
|
|
|
|
// maxProtocolVersion is the max protocol version the server supports.
|
|
maxProtocolVersion = wire.RemoveRejectVersion
|
|
|
|
// These fields are used to track known addresses on a per-peer basis.
|
|
//
|
|
// maxKnownAddrsPerPeer is the maximum number of items to track.
|
|
//
|
|
// knownAddrsFPRate is the false positive rate for the APBF used to track
|
|
// them. It is set to a rate of 1 per 1000 since addresses are not very
|
|
// large and they only need to be filtered once per connection, so an extra
|
|
// 10 of them being sent (on average) again even though they technically
|
|
// wouldn't need to is a good tradeoff.
|
|
//
|
|
// These values result in about 40 KiB memory usage including overhead.
|
|
maxKnownAddrsPerPeer = 10000
|
|
knownAddrsFPRate = 0.001
|
|
|
|
// maxCachedNaSubmissions is the maximum number of network address
|
|
// submissions cached.
|
|
maxCachedNaSubmissions = 20
|
|
|
|
// These constants control the maximum number of simultaneous pending
|
|
// getdata messages and the individual data item requests they make without
|
|
// being disconnected.
|
|
//
|
|
// Since each getdata message is comprised of several individual data item
|
|
// requests, the limiting is applied on both dimensions to offer more
|
|
// flexibility while still keeping memory usage bounded to reasonable
|
|
// limits.
|
|
//
|
|
// maxConcurrentGetDataReqs is the maximum number of simultaneous pending
|
|
// getdata message requests.
|
|
//
|
|
// maxPendingGetDataItemReqs is the maximum number of overall total
|
|
// simultaneous pending individual data item requests.
|
|
//
|
|
// In other words, when combined, a peer may mix and match simultaneous
|
|
// getdata requests for varying amounts of data items so long as it does not
|
|
// exceed the maximum specified number of simultaneous pending getdata
|
|
// messages or the maximum number of total overall pending individual data
|
|
// item requests.
|
|
maxConcurrentGetDataReqs = 1000
|
|
maxPendingGetDataItemReqs = 2 * wire.MaxInvPerMsg
|
|
|
|
// maxReorgDepthNotify specifies the maximum reorganization depth for which
|
|
// winning ticket notifications will be sent over RPC. The reorg depth is
|
|
// the number of blocks that would be reorganized out of the current best
|
|
// chain if a side chain being considered for notifications were to
|
|
// ultimately be extended to be longer than the current one.
|
|
//
|
|
// In effect, this helps to prevent large reorgs by refusing to send the
|
|
// winning ticket information to RPC clients, such as voting wallets, which
|
|
// depend on it to cast votes.
|
|
//
|
|
// This check also doubles to help reduce exhaustion attacks that could
|
|
// otherwise arise from sending old orphan blocks and forcing nodes to do
|
|
// expensive lottery data calculations for them.
|
|
maxReorgDepthNotify = 6
|
|
|
|
// These fields are used to track recently confirmed transactions.
|
|
//
|
|
// maxRecentlyConfirmedTxns specifies the maximum number to track and is set
|
|
// to target tracking the maximum number transactions of the minimum
|
|
// realistic size (~206 bytes) in approximately one hour of blocks on the
|
|
// main network.
|
|
//
|
|
// recentlyConfirmedTxnsFPRate is the false positive rate for the APBF used
|
|
// to track them and is set to a rate of 1 per 1 million which supports up
|
|
// to ~11.5 transactions/s before a single false positive would be seen on
|
|
// average and thus allows for plenty of future growth.
|
|
//
|
|
// These values result in about 183 KiB memory usage including overhead.
|
|
maxRecentlyConfirmedTxns = 23000
|
|
recentlyConfirmedTxnsFPRate = 0.000001
|
|
)
|
|
|
|
var (
|
|
// userAgentName is the user agent name and is used to help identify
|
|
// ourselves to other Decred peers.
|
|
userAgentName = "dcrd"
|
|
|
|
// userAgentVersion is the user agent version and is used to help
|
|
// identify ourselves to other peers.
|
|
userAgentVersion = fmt.Sprintf("%d.%d.%d", version.Major, version.Minor,
|
|
version.Patch)
|
|
)
|
|
|
|
// simpleAddr implements the net.Addr interface with two struct fields
|
|
type simpleAddr struct {
|
|
net, addr string
|
|
}
|
|
|
|
// String returns the address.
|
|
//
|
|
// This is part of the net.Addr interface.
|
|
func (a simpleAddr) String() string {
|
|
return a.addr
|
|
}
|
|
|
|
// Network returns the network.
|
|
//
|
|
// This is part of the net.Addr interface.
|
|
func (a simpleAddr) Network() string {
|
|
return a.net
|
|
}
|
|
|
|
// Ensure simpleAddr implements the net.Addr interface.
|
|
var _ net.Addr = simpleAddr{}
|
|
|
|
// broadcastMsg provides the ability to house a Decred message to be broadcast
|
|
// to all connected peers except specified excluded peers.
|
|
type broadcastMsg struct {
|
|
message wire.Message
|
|
excludePeers []*serverPeer
|
|
}
|
|
|
|
// broadcastInventoryAdd is a type used to declare that the InvVect it contains
|
|
// needs to be added to the rebroadcast map
|
|
type broadcastInventoryAdd relayMsg
|
|
|
|
// broadcastInventoryDel is a type used to declare that the InvVect it contains
|
|
// needs to be removed from the rebroadcast map
|
|
type broadcastInventoryDel *wire.InvVect
|
|
|
|
// broadcastPruneInventory is a type used to declare that rebroadcast
|
|
// inventory entries need to be filtered and removed where necessary
|
|
type broadcastPruneInventory struct{}
|
|
|
|
// relayMsg packages an inventory vector along with the newly discovered
|
|
// inventory and a flag that determines if the relay should happen immediately
|
|
// (it will be put into a trickle queue if false) so the relay has access to
|
|
// that information.
|
|
type relayMsg struct {
|
|
invVect *wire.InvVect
|
|
data interface{}
|
|
immediate bool
|
|
reqServices wire.ServiceFlag
|
|
}
|
|
|
|
// naSubmission represents a network address submission from an outbound peer.
|
|
type naSubmission struct {
|
|
na *wire.NetAddress
|
|
netType addrmgr.NetAddressType
|
|
reach addrmgr.NetAddressReach
|
|
score uint32
|
|
lastAccessed int64
|
|
}
|
|
|
|
// naSubmissionCache represents a bounded map for network address submisions.
|
|
type naSubmissionCache struct {
|
|
cache map[string]*naSubmission
|
|
limit int
|
|
mtx sync.Mutex
|
|
}
|
|
|
|
// add caches the provided address submission.
|
|
func (sc *naSubmissionCache) add(sub *naSubmission) error {
|
|
if sub == nil {
|
|
return fmt.Errorf("submission cannot be nil")
|
|
}
|
|
|
|
key := sub.na.IP.String()
|
|
if key == "" {
|
|
return fmt.Errorf("submission key cannot be an empty string")
|
|
}
|
|
|
|
sc.mtx.Lock()
|
|
defer sc.mtx.Unlock()
|
|
|
|
// Remove the oldest submission if cache limit has been reached.
|
|
if len(sc.cache) == sc.limit {
|
|
var oldestSub *naSubmission
|
|
for _, sub := range sc.cache {
|
|
if oldestSub == nil {
|
|
oldestSub = sub
|
|
continue
|
|
}
|
|
|
|
if sub.lastAccessed < oldestSub.lastAccessed {
|
|
oldestSub = sub
|
|
}
|
|
}
|
|
|
|
if oldestSub != nil {
|
|
delete(sc.cache, oldestSub.na.IP.String())
|
|
}
|
|
}
|
|
|
|
sub.score = 1
|
|
sub.lastAccessed = time.Now().Unix()
|
|
sc.cache[key] = sub
|
|
return nil
|
|
}
|
|
|
|
// exists returns true if the provided key exist in the submissions cache.
|
|
func (sc *naSubmissionCache) exists(key string) bool {
|
|
if key == "" {
|
|
return false
|
|
}
|
|
|
|
sc.mtx.Lock()
|
|
_, ok := sc.cache[key]
|
|
sc.mtx.Unlock()
|
|
return ok
|
|
}
|
|
|
|
// incrementScore increases the score of address submission referenced by
|
|
// the provided key by one.
|
|
func (sc *naSubmissionCache) incrementScore(key string) error {
|
|
if key == "" {
|
|
return fmt.Errorf("submission key cannot be an empty string")
|
|
}
|
|
|
|
sc.mtx.Lock()
|
|
defer sc.mtx.Unlock()
|
|
|
|
sub, ok := sc.cache[key]
|
|
if !ok {
|
|
return fmt.Errorf("submission key not found: %s", key)
|
|
}
|
|
|
|
sub.score++
|
|
sub.lastAccessed = time.Now().Unix()
|
|
sc.cache[key] = sub
|
|
return nil
|
|
}
|
|
|
|
// bestSubmission fetches the best scoring submission of the provided
|
|
// network interface.
|
|
func (sc *naSubmissionCache) bestSubmission(net addrmgr.NetAddressType) *naSubmission {
|
|
sc.mtx.Lock()
|
|
defer sc.mtx.Unlock()
|
|
|
|
var best *naSubmission
|
|
for _, sub := range sc.cache {
|
|
if sub.netType != net {
|
|
continue
|
|
}
|
|
|
|
if best == nil {
|
|
best = sub
|
|
continue
|
|
}
|
|
|
|
if sub.score > best.score {
|
|
best = sub
|
|
}
|
|
}
|
|
|
|
return best
|
|
}
|
|
|
|
// peerState maintains state of inbound, persistent, outbound peers as well
|
|
// as banned peers and outbound groups.
|
|
type peerState struct {
|
|
inboundPeers map[int32]*serverPeer
|
|
outboundPeers map[int32]*serverPeer
|
|
persistentPeers map[int32]*serverPeer
|
|
banned map[string]time.Time
|
|
outboundGroups map[string]int
|
|
subCache *naSubmissionCache
|
|
}
|
|
|
|
// ConnectionsWithIP returns the number of connections with the given IP.
|
|
func (ps *peerState) ConnectionsWithIP(ip net.IP) int {
|
|
var total int
|
|
for _, p := range ps.inboundPeers {
|
|
if ip.Equal(p.NA().IP) {
|
|
total++
|
|
}
|
|
}
|
|
for _, p := range ps.outboundPeers {
|
|
if ip.Equal(p.NA().IP) {
|
|
total++
|
|
}
|
|
}
|
|
for _, p := range ps.persistentPeers {
|
|
if ip.Equal(p.NA().IP) {
|
|
total++
|
|
}
|
|
}
|
|
return total
|
|
}
|
|
|
|
// Count returns the count of all known peers.
|
|
func (ps *peerState) Count() int {
|
|
return len(ps.inboundPeers) + len(ps.outboundPeers) +
|
|
len(ps.persistentPeers)
|
|
}
|
|
|
|
// forAllOutboundPeers is a helper function that runs closure on all outbound
|
|
// peers known to peerState.
|
|
func (ps *peerState) forAllOutboundPeers(closure func(sp *serverPeer)) {
|
|
for _, e := range ps.outboundPeers {
|
|
closure(e)
|
|
}
|
|
for _, e := range ps.persistentPeers {
|
|
closure(e)
|
|
}
|
|
}
|
|
|
|
// forAllPeers is a helper function that runs closure on all peers known to
|
|
// peerState.
|
|
func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) {
|
|
for _, e := range ps.inboundPeers {
|
|
closure(e)
|
|
}
|
|
ps.forAllOutboundPeers(closure)
|
|
}
|
|
|
|
// ResolveLocalAddress picks the best suggested network address from available
|
|
// options, per the network interface key provided. The best suggestion, if
|
|
// found, is added as a local address.
|
|
func (ps *peerState) ResolveLocalAddress(netType addrmgr.NetAddressType, addrMgr *addrmgr.AddrManager, services wire.ServiceFlag) {
|
|
best := ps.subCache.bestSubmission(netType)
|
|
if best == nil {
|
|
return
|
|
}
|
|
|
|
targetOutbound := defaultTargetOutbound
|
|
if cfg.MaxPeers < targetOutbound {
|
|
targetOutbound = cfg.MaxPeers
|
|
}
|
|
|
|
// A valid best address suggestion must have a majority
|
|
// (60 percent majority) of outbound peers concluding on
|
|
// the same result.
|
|
if best.score < uint32(math.Ceil(float64(targetOutbound)*0.6)) {
|
|
return
|
|
}
|
|
|
|
addLocalAddress := func(bestSuggestion string, port uint16, services wire.ServiceFlag) {
|
|
na, err := addrMgr.HostToNetAddress(bestSuggestion, port, services)
|
|
if err != nil {
|
|
amgrLog.Errorf("unable to generate network address using host %v: "+
|
|
"%v", bestSuggestion, err)
|
|
return
|
|
}
|
|
|
|
if !addrMgr.HasLocalAddress(na) {
|
|
err := addrMgr.AddLocalAddress(na, addrmgr.ManualPrio)
|
|
if err != nil {
|
|
amgrLog.Errorf("unable to add local address: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
stripIPv6Zone := func(ip string) string {
|
|
// Strip IPv6 zone id if present.
|
|
zoneIndex := strings.LastIndex(ip, "%")
|
|
if zoneIndex > 0 {
|
|
return ip[:zoneIndex]
|
|
}
|
|
|
|
return ip
|
|
}
|
|
|
|
for _, listener := range cfg.Listeners {
|
|
host, portStr, err := net.SplitHostPort(listener)
|
|
if err != nil {
|
|
amgrLog.Errorf("unable to split network address: %v", err)
|
|
return
|
|
}
|
|
|
|
port, err := strconv.ParseUint(portStr, 10, 16)
|
|
if err != nil {
|
|
amgrLog.Errorf("unable to parse port: %v", err)
|
|
return
|
|
}
|
|
|
|
host = stripIPv6Zone(host)
|
|
|
|
// Add a local address if the best suggestion is referenced by a
|
|
// listener.
|
|
if best.na.IP.String() == host {
|
|
addLocalAddress(best.na.IP.String(), uint16(port), services)
|
|
continue
|
|
}
|
|
|
|
// Add a local address if the listener is generic (applies
|
|
// for both IPv4 and IPv6).
|
|
if host == "" || (host == "*" && runtime.GOOS == "plan9") {
|
|
addLocalAddress(best.na.IP.String(), uint16(port), services)
|
|
continue
|
|
}
|
|
|
|
listenerIP := net.ParseIP(host)
|
|
if listenerIP == nil {
|
|
amgrLog.Errorf("unable to parse listener: %v", host)
|
|
return
|
|
}
|
|
|
|
// Add a local address if the network address is a probable external
|
|
// endpoint of the listener.
|
|
lNa := wire.NewNetAddressIPPort(listenerIP, uint16(port), services)
|
|
lNet := addrmgr.IPv4Address
|
|
if lNa.IP.To4() == nil {
|
|
lNet = addrmgr.IPv6Address
|
|
}
|
|
|
|
validExternal := (lNet == addrmgr.IPv4Address &&
|
|
best.reach == addrmgr.Ipv4) || lNet == addrmgr.IPv6Address &&
|
|
(best.reach == addrmgr.Ipv6Weak || best.reach == addrmgr.Ipv6Strong ||
|
|
best.reach == addrmgr.Teredo)
|
|
|
|
if validExternal {
|
|
addLocalAddress(best.na.IP.String(), uint16(port), services)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
// server provides a Decred server for handling communications to and from
|
|
// Decred peers.
|
|
type server struct {
|
|
bytesReceived atomic.Uint64 // Total bytes received from all peers since start.
|
|
bytesSent atomic.Uint64 // Total bytes sent by all peers since start.
|
|
shutdown atomic.Bool
|
|
|
|
// minKnownWork houses the minimum known work from the associated network
|
|
// params converted to a uint256 so the conversion only needs to be
|
|
// performed once when the server is initialized. Ideally, the chain params
|
|
// should be updated to use the new type, but that will be a major version
|
|
// bump, so a one-time conversion is a good tradeoff in the mean time.
|
|
minKnownWork uint256.Uint256
|
|
|
|
chainParams *chaincfg.Params
|
|
addrManager *addrmgr.AddrManager
|
|
connManager *connmgr.ConnManager
|
|
sigCache *txscript.SigCache
|
|
subsidyCache *standalone.SubsidyCache
|
|
rpcServer *rpcserver.Server
|
|
syncManager *netsync.SyncManager
|
|
bg *mining.BgBlkTmplGenerator
|
|
chain *blockchain.BlockChain
|
|
txMemPool *mempool.TxPool
|
|
feeEstimator *fees.Estimator
|
|
cpuMiner *cpuminer.CPUMiner
|
|
modifyRebroadcastInv chan interface{}
|
|
newPeers chan *serverPeer
|
|
donePeers chan *serverPeer
|
|
banPeers chan *serverPeer
|
|
query chan interface{}
|
|
relayInv chan relayMsg
|
|
broadcast chan broadcastMsg
|
|
nat *upnpNAT
|
|
db database.DB
|
|
timeSource blockchain.MedianTimeSource
|
|
services wire.ServiceFlag
|
|
quit chan struct{}
|
|
|
|
// The following fields are used for optional indexes. They will be nil
|
|
// if the associated index is not enabled. These fields are set during
|
|
// initial creation of the server and never changed afterwards, so they
|
|
// do not need to be protected for concurrent access.
|
|
indexSubscriber *indexers.IndexSubscriber
|
|
txIndex *indexers.TxIndex
|
|
existsAddrIndex *indexers.ExistsAddrIndex
|
|
|
|
// These following fields are used to filter duplicate block lottery data
|
|
// anouncements.
|
|
lotteryDataBroadcastMtx sync.RWMutex
|
|
lotteryDataBroadcast map[chainhash.Hash]struct{}
|
|
|
|
// recentlyConfirmedTxns tracks transactions that have been confirmed in the
|
|
// most recent blocks.
|
|
recentlyConfirmedTxns *apbf.Filter
|
|
}
|
|
|
|
// serverPeer extends the peer to maintain state shared by the server.
|
|
type serverPeer struct {
|
|
*peer.Peer
|
|
|
|
connReq *connmgr.ConnReq
|
|
server *server
|
|
persistent bool
|
|
continueHash atomic.Pointer[chainhash.Hash]
|
|
relayMtx sync.Mutex
|
|
disableRelayTx bool
|
|
isWhitelisted bool
|
|
knownAddresses *apbf.Filter
|
|
banScore connmgr.DynamicBanScore
|
|
quit chan struct{}
|
|
|
|
// syncMgrPeer houses the network sync manager peer instance that wraps the
|
|
// underlying peer similar to the way this server peer itself wraps it.
|
|
syncMgrPeer *netsync.Peer
|
|
|
|
// addrsSent, getMiningStateSent and initState all track whether or not
|
|
// the peer has already sent the respective request. It is used to
|
|
// prevent more than one response per connection.
|
|
addrsSent bool
|
|
getMiningStateSent bool
|
|
initStateSent bool
|
|
|
|
// The following fields are used to synchronize the net sync manager and
|
|
// server.
|
|
txProcessed chan struct{}
|
|
blockProcessed chan struct{}
|
|
|
|
// peerNa is network address of the peer connected to.
|
|
peerNa *wire.NetAddress
|
|
peerNaMtx sync.Mutex
|
|
|
|
// announcedBlock tracks the most recent block announced to this peer and is
|
|
// used to filter duplicates.
|
|
announcedBlock *chainhash.Hash
|
|
|
|
// The following fields are used to serve getdata requests asynchronously as
|
|
// opposed to directly in the peer input handler.
|
|
//
|
|
// getDataQueue is a buffered channel for queueing up concurrent getdata
|
|
// requests.
|
|
//
|
|
// numPendingGetDataItemReqs tracks the total number of pending individual
|
|
// data item requests that still need to be served.
|
|
getDataQueue chan []*wire.InvVect
|
|
numPendingGetDataItemReqs atomic.Uint32
|
|
}
|
|
|
|
// newServerPeer returns a new serverPeer instance. The peer needs to be set by
|
|
// the caller.
|
|
func newServerPeer(s *server, isPersistent bool) *serverPeer {
|
|
return &serverPeer{
|
|
server: s,
|
|
persistent: isPersistent,
|
|
knownAddresses: apbf.NewFilter(maxKnownAddrsPerPeer, knownAddrsFPRate),
|
|
quit: make(chan struct{}),
|
|
txProcessed: make(chan struct{}, 1),
|
|
blockProcessed: make(chan struct{}, 1),
|
|
getDataQueue: make(chan []*wire.InvVect, maxConcurrentGetDataReqs),
|
|
}
|
|
}
|
|
|
|
// handleServeGetData is the primary logic for servicing queued getdata
|
|
// requests.
|
|
//
|
|
// It makes use of the given send done channel and semaphore to provide
|
|
// a little pipelining of database loads while keeping the memory usage bounded
|
|
// to reasonable limits.
|
|
//
|
|
// It is invoked from the serveGetData goroutine.
|
|
func (sp *serverPeer) handleServeGetData(invVects []*wire.InvVect,
|
|
sendDoneChan chan struct{}, semaphore chan struct{}) {
|
|
|
|
var notFoundMsg *wire.MsgNotFound
|
|
for _, iv := range invVects {
|
|
var sendInv bool
|
|
var dataMsg wire.Message
|
|
switch iv.Type {
|
|
case wire.InvTypeTx:
|
|
// Attempt to fetch the requested transaction from the pool. A call
|
|
// could be made to check for existence first, but simply trying to
|
|
// fetch a missing transaction results in the same behavior. Do not
|
|
// allow peers to request transactions already in a block but are
|
|
// unconfirmed, as they may be expensive. Restrict that to the
|
|
// authenticated RPC only.
|
|
txHash := &iv.Hash
|
|
tx, err := sp.server.txMemPool.FetchTransaction(txHash)
|
|
if err != nil {
|
|
peerLog.Tracef("Unable to fetch tx %v from transaction pool: %v",
|
|
txHash, err)
|
|
break
|
|
}
|
|
dataMsg = tx.MsgTx()
|
|
|
|
case wire.InvTypeBlock:
|
|
blockHash := &iv.Hash
|
|
block, err := sp.server.chain.BlockByHash(blockHash)
|
|
if err != nil {
|
|
peerLog.Tracef("Unable to fetch requested block hash %v: %v",
|
|
blockHash, err)
|
|
break
|
|
}
|
|
dataMsg = block.MsgBlock()
|
|
|
|
// When the peer requests the final block that was advertised in
|
|
// response to a getblocks message which requested more blocks than
|
|
// would fit into a single message, it requires a new inventory
|
|
// message to trigger it to issue another getblocks message for the
|
|
// next batch of inventory.
|
|
//
|
|
// However, that inventory message should not be sent until after
|
|
// the block itself is sent, so keep a flag for later use.
|
|
//
|
|
// Note that this is to support the legacy syncing model that is no
|
|
// longer used in dcrd which is now based on a much more robust
|
|
// headers-based syncing model. Nevertheless, this behavior is
|
|
// still a required part of the getblocks protocol semantics. It
|
|
// can be removed if a future protocol upgrade also removes the
|
|
// getblocks message.
|
|
continueHash := sp.continueHash.Load()
|
|
sendInv = continueHash != nil && *continueHash == *blockHash
|
|
|
|
default:
|
|
peerLog.Warnf("Unknown type '%d' in inventory request from %s",
|
|
iv.Type, sp)
|
|
continue
|
|
}
|
|
if dataMsg == nil {
|
|
// Keep track of all items that were not found in order to send a
|
|
// consolidated messsage once the entire batch is processed.
|
|
//
|
|
// The error when adding the inventory vector is ignored because the
|
|
// only way it could fail would be by exceeding the max allowed
|
|
// number of items which is impossible given the getdata message is
|
|
// enforced to not exceed that same maximum limit.
|
|
if notFoundMsg == nil {
|
|
notFoundMsg = wire.NewMsgNotFound()
|
|
}
|
|
notFoundMsg.AddInvVect(iv)
|
|
|
|
// There is no need to wait for the semaphore below when there is
|
|
// not any data to send.
|
|
sp.numPendingGetDataItemReqs.Add(^uint32(0))
|
|
continue
|
|
}
|
|
|
|
// Limit the number of items that can be queued to prevent wasting a
|
|
// bunch of memory by queuing far more data than can be sent in a
|
|
// reasonable time. The waiting occurs after the database fetch for the
|
|
// next one to provide a little pipelining.
|
|
//
|
|
// This also monitors the channel that is notified when queued messages
|
|
// are sent in order to release the semaphore without needing a separate
|
|
// monitoring goroutine.
|
|
for semAcquired := false; !semAcquired; {
|
|
select {
|
|
case <-sp.quit:
|
|
return
|
|
|
|
case semaphore <- struct{}{}:
|
|
semAcquired = true
|
|
|
|
case <-sendDoneChan:
|
|
// Release semaphore.
|
|
<-semaphore
|
|
}
|
|
}
|
|
|
|
// Decrement the pending data item requests accordingly and queue the
|
|
// data to be sent to the peer.
|
|
sp.numPendingGetDataItemReqs.Add(^uint32(0))
|
|
sp.QueueMessage(dataMsg, sendDoneChan)
|
|
|
|
// Send a new inventory message to trigger the peer to issue another
|
|
// getblocks message for the next batch of inventory if needed.
|
|
if sendInv {
|
|
best := sp.server.chain.BestSnapshot()
|
|
invMsg := wire.NewMsgInvSizeHint(1)
|
|
iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash)
|
|
invMsg.AddInvVect(iv)
|
|
sp.QueueMessage(invMsg, nil)
|
|
sp.continueHash.Store(nil)
|
|
}
|
|
}
|
|
if notFoundMsg != nil {
|
|
sp.QueueMessage(notFoundMsg, nil)
|
|
}
|
|
}
|
|
|
|
// serveGetData provides an asynchronous queue that services all data requested
|
|
// via getdata requests such that the peer may mix and match simultaneous
|
|
// getdata requests for varying amounts of data items so long as it does not
|
|
// exceed the maximum number of simultaneous pending getdata messages or the
|
|
// maximum number of total overall pending data item requests.
|
|
//
|
|
// It must be run in a goroutine.
|
|
func (sp *serverPeer) serveGetData() {
|
|
// Allow a max number of items to be loaded from the database/mempool and
|
|
// queued for send.
|
|
const maxPendingSend = 3
|
|
sendDoneChan := make(chan struct{}, maxPendingSend+1)
|
|
semaphore := make(chan struct{}, maxPendingSend)
|
|
|
|
for {
|
|
select {
|
|
case <-sp.quit:
|
|
return
|
|
|
|
case invVects := <-sp.getDataQueue:
|
|
sp.handleServeGetData(invVects, sendDoneChan, semaphore)
|
|
|
|
// Release the semaphore as queued messages are sent.
|
|
case <-sendDoneChan:
|
|
<-semaphore
|
|
}
|
|
}
|
|
}
|
|
|
|
// Run starts additional async processing for the peer and blocks until the peer
|
|
// disconnects at which point it notifies the server and net sync manager that
|
|
// the peer has disconnected and performs other associated cleanup such as
|
|
// evicting any remaining orphans sent by the peer and shutting down all
|
|
// goroutines.
|
|
func (sp *serverPeer) Run() {
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
sp.serveGetData()
|
|
wg.Done()
|
|
}()
|
|
|
|
// Wait for the peer to disconnect and notify the net sync manager and
|
|
// server accordingly.
|
|
sp.WaitForDisconnect()
|
|
srvr := sp.server
|
|
srvr.DonePeer(sp)
|
|
srvr.syncManager.PeerDisconnected(sp.syncMgrPeer)
|
|
|
|
if sp.VersionKnown() {
|
|
// Evict any remaining orphans that were sent by the peer.
|
|
numEvicted := srvr.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID()))
|
|
if numEvicted > 0 {
|
|
srvrLog.Debugf("Evicted %d %s from peer %v (id %d)", numEvicted,
|
|
pickNoun(numEvicted, "orphan", "orphans"), sp, sp.ID())
|
|
}
|
|
}
|
|
|
|
// Shutdown remaining peer goroutines.
|
|
close(sp.quit)
|
|
wg.Wait()
|
|
}
|
|
|
|
// newestBlock returns the current best block hash and height using the format
|
|
// required by the configuration for the peer package.
|
|
func (sp *serverPeer) newestBlock() (*chainhash.Hash, int64, error) {
|
|
best := sp.server.chain.BestSnapshot()
|
|
return &best.Hash, best.Height, nil
|
|
}
|
|
|
|
// addKnownAddress adds the given address to the set of known addresses to
|
|
// the peer to prevent sending duplicate addresses.
|
|
func (sp *serverPeer) addKnownAddress(na *addrmgr.NetAddress) {
|
|
sp.knownAddresses.Add([]byte(na.Key()))
|
|
}
|
|
|
|
// addKnownAddresses adds the given addresses to the set of known addresses to
|
|
// the peer to prevent sending duplicate addresses.
|
|
func (sp *serverPeer) addKnownAddresses(addresses []*addrmgr.NetAddress) {
|
|
for _, na := range addresses {
|
|
sp.addKnownAddress(na)
|
|
}
|
|
}
|
|
|
|
// addressKnown true if the given address is already known to the peer.
|
|
func (sp *serverPeer) addressKnown(na *addrmgr.NetAddress) bool {
|
|
return sp.knownAddresses.Contains([]byte(na.Key()))
|
|
}
|
|
|
|
// setDisableRelayTx toggles relaying of transactions for the given peer.
|
|
// It is safe for concurrent access.
|
|
func (sp *serverPeer) setDisableRelayTx(disable bool) {
|
|
sp.relayMtx.Lock()
|
|
sp.disableRelayTx = disable
|
|
sp.relayMtx.Unlock()
|
|
}
|
|
|
|
// relayTxDisabled returns whether or not relaying of transactions for the given
|
|
// peer is disabled.
|
|
// It is safe for concurrent access.
|
|
func (sp *serverPeer) relayTxDisabled() bool {
|
|
sp.relayMtx.Lock()
|
|
isDisabled := sp.disableRelayTx
|
|
sp.relayMtx.Unlock()
|
|
|
|
return isDisabled
|
|
}
|
|
|
|
// wireToAddrmgrNetAddress converts a wire NetAddress to an address manager
|
|
// NetAddress.
|
|
func wireToAddrmgrNetAddress(netAddr *wire.NetAddress) *addrmgr.NetAddress {
|
|
newNetAddr := addrmgr.NewNetAddressIPPort(netAddr.IP, netAddr.Port, netAddr.Services)
|
|
newNetAddr.Timestamp = netAddr.Timestamp
|
|
return newNetAddr
|
|
}
|
|
|
|
// wireToAddrmgrNetAddresses converts a collection of wire net addresses to a
|
|
// collection of address manager net addresses.
|
|
func wireToAddrmgrNetAddresses(netAddr []*wire.NetAddress) []*addrmgr.NetAddress {
|
|
addrs := make([]*addrmgr.NetAddress, len(netAddr))
|
|
for i, wireAddr := range netAddr {
|
|
addrs[i] = wireToAddrmgrNetAddress(wireAddr)
|
|
}
|
|
return addrs
|
|
}
|
|
|
|
// addrmgrToWireNetAddress converts an address manager net address to a wire net
|
|
// address.
|
|
func addrmgrToWireNetAddress(netAddr *addrmgr.NetAddress) *wire.NetAddress {
|
|
return wire.NewNetAddressTimestamp(netAddr.Timestamp, netAddr.Services,
|
|
netAddr.IP, netAddr.Port)
|
|
}
|
|
|
|
// pushAddrMsg sends an addr message to the connected peer using the provided
|
|
// addresses.
|
|
func (sp *serverPeer) pushAddrMsg(addresses []*addrmgr.NetAddress) {
|
|
// Filter addresses already known to the peer.
|
|
addrs := make([]*wire.NetAddress, 0, len(addresses))
|
|
for _, addr := range addresses {
|
|
if !sp.addressKnown(addr) {
|
|
wireNetAddr := addrmgrToWireNetAddress(addr)
|
|
addrs = append(addrs, wireNetAddr)
|
|
}
|
|
}
|
|
known, err := sp.PushAddrMsg(addrs)
|
|
if err != nil {
|
|
peerLog.Errorf("Can't push address message to %s: %v", sp, err)
|
|
sp.Disconnect()
|
|
return
|
|
}
|
|
|
|
knownNetAddrs := wireToAddrmgrNetAddresses(known)
|
|
sp.addKnownAddresses(knownNetAddrs)
|
|
}
|
|
|
|
// addBanScore increases the persistent and decaying ban score fields by the
|
|
// values passed as parameters. If the resulting score exceeds half of the ban
|
|
// threshold, a warning is logged including the reason provided. Further, if
|
|
// the score is above the ban threshold, the peer will be banned and
|
|
// disconnected.
|
|
func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) bool {
|
|
// No warning is logged and no score is calculated if banning is disabled.
|
|
if cfg.DisableBanning {
|
|
return false
|
|
}
|
|
if sp.isWhitelisted {
|
|
peerLog.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason)
|
|
return false
|
|
}
|
|
|
|
warnThreshold := cfg.BanThreshold >> 1
|
|
if transient == 0 && persistent == 0 {
|
|
// The score is not being increased, but a warning message is still
|
|
// logged if the score is above the warn threshold.
|
|
score := sp.banScore.Int()
|
|
if score > warnThreshold {
|
|
peerLog.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+
|
|
"it was not increased this time", sp, reason, score)
|
|
}
|
|
return false
|
|
}
|
|
score := sp.banScore.Increase(persistent, transient)
|
|
if score > warnThreshold {
|
|
peerLog.Warnf("Misbehaving peer %s: %s -- ban score increased to %d",
|
|
sp, reason, score)
|
|
if score > cfg.BanThreshold {
|
|
peerLog.Warnf("Misbehaving peer %s -- banning and disconnecting",
|
|
sp)
|
|
sp.server.BanPeer(sp)
|
|
sp.Disconnect()
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// hasServices returns whether or not the provided advertised service flags have
|
|
// all of the provided desired service flags set.
|
|
func hasServices(advertised, desired wire.ServiceFlag) bool {
|
|
return advertised&desired == desired
|
|
}
|
|
|
|
// OnVersion is invoked when a peer receives a version wire message and is used
|
|
// to negotiate the protocol version details as well as kick start the
|
|
// communications.
|
|
func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) {
|
|
// Update the address manager with the advertised services for outbound
|
|
// connections in case they have changed. This is not done for inbound
|
|
// connections to help prevent malicious behavior and is skipped when
|
|
// running on the simulation and regression test networks since they are
|
|
// only intended to connect to specified peers and actively avoid
|
|
// advertising and connecting to discovered peers.
|
|
//
|
|
// NOTE: This is done before rejecting peers that are too old to ensure
|
|
// it is updated regardless in the case a new minimum protocol version is
|
|
// enforced and the remote node has not upgraded yet.
|
|
isInbound := sp.Inbound()
|
|
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
|
|
addrManager := sp.server.addrManager
|
|
if !cfg.SimNet && !cfg.RegNet && !isInbound {
|
|
err := addrManager.SetServices(remoteAddr, msg.Services)
|
|
if err != nil {
|
|
srvrLog.Errorf("Setting services for address failed: %v", err)
|
|
}
|
|
}
|
|
|
|
// Enforce the minimum protocol limit on outbound connections.
|
|
if !isInbound && msg.ProtocolVersion < int32(wire.RemoveRejectVersion) {
|
|
srvrLog.Debugf("Rejecting outbound peer %s with protocol version %d prior to "+
|
|
"the required version %d", sp, msg.ProtocolVersion,
|
|
wire.RemoveRejectVersion)
|
|
sp.Disconnect()
|
|
return
|
|
}
|
|
|
|
// Reject peers that have a protocol version that is too old.
|
|
// This is the maximum protocol version negotiated by dcrwallet 1.8.0.
|
|
if msg.ProtocolVersion < int32(wire.InitStateVersion) {
|
|
srvrLog.Debugf("Rejecting peer %s with protocol version %d prior to "+
|
|
"the required version %d", sp, msg.ProtocolVersion,
|
|
wire.InitStateVersion)
|
|
sp.Disconnect()
|
|
return
|
|
}
|
|
|
|
// Reject outbound peers that are not full nodes.
|
|
wantServices := wire.SFNodeNetwork
|
|
if !isInbound && !hasServices(msg.Services, wantServices) {
|
|
missingServices := wantServices & ^msg.Services
|
|
srvrLog.Debugf("Rejecting peer %s with services %v due to not "+
|
|
"providing desired services %v", sp, msg.Services, missingServices)
|
|
sp.Disconnect()
|
|
return
|
|
}
|
|
|
|
// Update the address manager and request known addresses from the
|
|
// remote peer for outbound connections. This is skipped when running
|
|
// on the simulation and regression test networks since they are only
|
|
// intended to connect to specified peers and actively avoid advertising
|
|
// and connecting to discovered peers.
|
|
if !cfg.SimNet && !cfg.RegNet && !isInbound {
|
|
// Advertise the local address when the server accepts incoming
|
|
// connections and it believes itself to be close to the best
|
|
// known tip.
|
|
if !cfg.DisableListen && sp.server.syncManager.IsCurrent() {
|
|
// Get address that best matches.
|
|
lna := addrManager.GetBestLocalAddress(remoteAddr)
|
|
if lna.IsRoutable() {
|
|
// Filter addresses the peer already knows about.
|
|
addresses := []*addrmgr.NetAddress{lna}
|
|
sp.pushAddrMsg(addresses)
|
|
}
|
|
}
|
|
|
|
// Request known addresses if the server address manager needs
|
|
// more.
|
|
if addrManager.NeedMoreAddresses() {
|
|
sp.QueueMessage(wire.NewMsgGetAddr(), nil)
|
|
}
|
|
|
|
// Mark the address as a known good address.
|
|
err := addrManager.Good(remoteAddr)
|
|
if err != nil {
|
|
srvrLog.Errorf("Marking address as good failed: %v", err)
|
|
}
|
|
}
|
|
|
|
sp.peerNaMtx.Lock()
|
|
sp.peerNa = &msg.AddrYou
|
|
sp.peerNaMtx.Unlock()
|
|
|
|
// Choose whether or not to relay transactions.
|
|
sp.setDisableRelayTx(msg.DisableRelayTx)
|
|
|
|
// Add the remote peer time as a sample for creating an offset against
|
|
// the local clock to keep the network time in sync.
|
|
sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp)
|
|
|
|
// Add valid peer to the server.
|
|
sp.server.AddPeer(sp)
|
|
}
|
|
|
|
// OnVerAck is invoked when a peer receives a verack wire message. It creates
|
|
// and sends a sendheaders message to request all block annoucements are made
|
|
// via full headers instead of the inv message.
|
|
func (sp *serverPeer) OnVerAck(_ *peer.Peer, msg *wire.MsgVerAck) {
|
|
sp.QueueMessage(wire.NewMsgSendHeaders(), nil)
|
|
}
|
|
|
|
// OnMemPool is invoked when a peer receives a mempool wire message. It creates
|
|
// and sends an inventory message with the contents of the memory pool up to the
|
|
// maximum inventory allowed per message.
|
|
func (sp *serverPeer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) {
|
|
// A decaying ban score increase is applied to prevent flooding.
|
|
// The ban score accumulates and passes the ban threshold if a burst of
|
|
// mempool messages comes from a peer. The score decays each minute to
|
|
// half of its value.
|
|
if sp.addBanScore(0, 33, "mempool") {
|
|
return
|
|
}
|
|
|
|
// Generate inventory message with the available transactions in the
|
|
// transaction memory pool. Limit it to the max allowed inventory
|
|
// per message. The NewMsgInvSizeHint function automatically limits
|
|
// the passed hint to the maximum allowed, so it's safe to pass it
|
|
// without double checking it here.
|
|
txMemPool := sp.server.txMemPool
|
|
txDescs := txMemPool.TxDescs()
|
|
|
|
// Send the inventory message if there is anything to send.
|
|
for _, txDesc := range txDescs {
|
|
iv := wire.NewInvVect(wire.InvTypeTx, txDesc.Tx.Hash())
|
|
sp.QueueInventory(iv)
|
|
}
|
|
}
|
|
|
|
// pushMiningStateMsg pushes a mining state message to the queue for a
|
|
// requesting peer.
|
|
func (sp *serverPeer) pushMiningStateMsg(height uint32, blockHashes []chainhash.Hash, voteHashes []chainhash.Hash) error {
|
|
// Nothing to send, abort.
|
|
if len(blockHashes) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Construct the mining state request and queue it to be sent.
|
|
msg := wire.NewMsgMiningState()
|
|
msg.Height = height
|
|
for i := range blockHashes {
|
|
err := msg.AddBlockHash(&blockHashes[i])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
for i := range voteHashes {
|
|
err := msg.AddVoteHash(&voteHashes[i])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if i+1 >= wire.MaxMSBlocksAtHeadPerMsg {
|
|
break
|
|
}
|
|
}
|
|
|
|
sp.QueueMessage(msg, nil)
|
|
|
|
return nil
|
|
}
|
|
|
|
// OnGetMiningState is invoked when a peer receives a getminings wire message.
|
|
// It constructs a list of the current best blocks and votes that should be
|
|
// mined on and pushes a miningstate wire message back to the requesting peer.
|
|
func (sp *serverPeer) OnGetMiningState(_ *peer.Peer, msg *wire.MsgGetMiningState) {
|
|
if sp.getMiningStateSent {
|
|
peerLog.Tracef("Ignoring getminingstate from %v - already sent", sp)
|
|
return
|
|
}
|
|
sp.getMiningStateSent = true
|
|
|
|
// Send out blank mining states if it's early in the blockchain.
|
|
best := sp.server.chain.BestSnapshot()
|
|
if best.Height < sp.server.chainParams.StakeValidationHeight-1 {
|
|
err := sp.pushMiningStateMsg(0, nil, nil)
|
|
if err != nil {
|
|
peerLog.Warnf("unexpected error while pushing data for "+
|
|
"mining state request: %v", err.Error())
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Obtain the entire generation of blocks stemming from the parent of the
|
|
// current tip.
|
|
children := sp.server.chain.TipGeneration()
|
|
|
|
// Get the list of blocks that are eligible to build on and limit the
|
|
// list to the maximum number of allowed eligible block hashes per
|
|
// mining state message. There is nothing to send when there are no
|
|
// eligible blocks.
|
|
mp := sp.server.txMemPool
|
|
blockHashes := mining.SortParentsByVotes(mp, best.Hash, children,
|
|
sp.server.chainParams)
|
|
numBlocks := len(blockHashes)
|
|
if numBlocks == 0 {
|
|
return
|
|
}
|
|
if numBlocks > wire.MaxMSBlocksAtHeadPerMsg {
|
|
blockHashes = blockHashes[:wire.MaxMSBlocksAtHeadPerMsg]
|
|
}
|
|
|
|
// Construct the set of votes to send.
|
|
voteHashes := make([]chainhash.Hash, 0, wire.MaxMSVotesAtHeadPerMsg)
|
|
for i := range blockHashes {
|
|
// Fetch the vote hashes themselves and append them.
|
|
bh := &blockHashes[i]
|
|
vhsForBlock := mp.VoteHashesForBlock(bh)
|
|
if len(vhsForBlock) == 0 {
|
|
peerLog.Warnf("unexpected error while fetching vote hashes "+
|
|
"for block %v for a mining state request: no vote "+
|
|
"metadata for block", bh)
|
|
return
|
|
}
|
|
voteHashes = append(voteHashes, vhsForBlock...)
|
|
}
|
|
|
|
err := sp.pushMiningStateMsg(uint32(best.Height), blockHashes, voteHashes)
|
|
if err != nil {
|
|
peerLog.Warnf("unexpected error while pushing data for "+
|
|
"mining state request: %v", err.Error())
|
|
}
|
|
}
|
|
|
|
// OnMiningState is invoked when a peer receives a miningstate wire message. It
|
|
// requests the data advertised in the message from the peer.
|
|
func (sp *serverPeer) OnMiningState(_ *peer.Peer, msg *wire.MsgMiningState) {
|
|
var blockHashes, voteHashes []chainhash.Hash
|
|
if len(msg.BlockHashes) > 0 {
|
|
blockHashes = make([]chainhash.Hash, 0, len(msg.BlockHashes))
|
|
for _, hash := range msg.BlockHashes {
|
|
blockHashes = append(blockHashes, *hash)
|
|
}
|
|
}
|
|
if len(msg.VoteHashes) > 0 {
|
|
voteHashes = make([]chainhash.Hash, 0, len(msg.VoteHashes))
|
|
for _, hash := range msg.VoteHashes {
|
|
voteHashes = append(voteHashes, *hash)
|
|
}
|
|
}
|
|
|
|
err := sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer, blockHashes,
|
|
voteHashes, nil)
|
|
if err != nil {
|
|
peerLog.Warnf("couldn't handle mining state message: %v",
|
|
err.Error())
|
|
}
|
|
}
|
|
|
|
// OnGetInitState is invoked when a peer receives a getinitstate wire message.
|
|
// It sends the available requested info to the remote peer.
|
|
func (sp *serverPeer) OnGetInitState(_ *peer.Peer, msg *wire.MsgGetInitState) {
|
|
if sp.initStateSent {
|
|
peerLog.Tracef("Ignoring getinitstate from %v - already sent", sp)
|
|
return
|
|
}
|
|
sp.initStateSent = true
|
|
|
|
// Send out blank mining states if it's early in the blockchain.
|
|
best := sp.server.chain.BestSnapshot()
|
|
if best.Height < sp.server.chainParams.StakeValidationHeight-1 {
|
|
sp.QueueMessage(wire.NewMsgInitState(), nil)
|
|
return
|
|
}
|
|
|
|
// Response data.
|
|
var blockHashes, voteHashes, tspendHashes []chainhash.Hash
|
|
|
|
// Map from the types slice into a map for easier checking.
|
|
types := make(map[string]struct{}, len(msg.Types))
|
|
for _, typ := range msg.Types {
|
|
types[typ] = struct{}{}
|
|
}
|
|
_, wantBlocks := types[wire.InitStateHeadBlocks]
|
|
_, wantVotes := types[wire.InitStateHeadBlockVotes]
|
|
_, wantTSpends := types[wire.InitStateTSpends]
|
|
|
|
// Fetch head block hashes if we need to send either them or their
|
|
// votes.
|
|
mp := sp.server.txMemPool
|
|
if wantBlocks || wantVotes {
|
|
// Obtain the entire generation of blocks stemming from the parent of
|
|
// the current tip.
|
|
children := sp.server.chain.TipGeneration()
|
|
|
|
// Get the list of blocks that are eligible to build on and
|
|
// limit the list to the maximum number of allowed eligible
|
|
// block hashes per init state message. There is nothing to
|
|
// send when there are no eligible blocks.
|
|
blockHashes = mining.SortParentsByVotes(mp, best.Hash, children,
|
|
sp.server.chainParams)
|
|
if len(blockHashes) > wire.MaxISBlocksAtHeadPerMsg {
|
|
blockHashes = blockHashes[:wire.MaxISBlocksAtHeadPerMsg]
|
|
}
|
|
}
|
|
|
|
// Construct the set of votes to send.
|
|
if wantVotes {
|
|
for i := range blockHashes {
|
|
// Fetch the vote hashes themselves and append them.
|
|
bh := &blockHashes[i]
|
|
vhsForBlock := mp.VoteHashesForBlock(bh)
|
|
voteHashes = append(voteHashes, vhsForBlock...)
|
|
}
|
|
}
|
|
|
|
// Construct tspends to send.
|
|
if wantTSpends {
|
|
tspendHashes = mp.TSpendHashes()
|
|
}
|
|
|
|
// Clear out block hashes to be sent if they weren't requested.
|
|
if !wantBlocks {
|
|
blockHashes = nil
|
|
}
|
|
|
|
// Build and push the response.
|
|
initMsg, err := wire.NewMsgInitStateFilled(blockHashes, voteHashes, tspendHashes)
|
|
if err != nil {
|
|
peerLog.Warnf("Unexpected error while building initstate msg: %v", err)
|
|
return
|
|
}
|
|
sp.QueueMessage(initMsg, nil)
|
|
}
|
|
|
|
// OnInitState is invoked when a peer receives a initstate wire message. It
|
|
// requests the data advertised in the message from the peer.
|
|
func (sp *serverPeer) OnInitState(_ *peer.Peer, msg *wire.MsgInitState) {
|
|
err := sp.server.syncManager.RequestFromPeer(sp.syncMgrPeer,
|
|
msg.BlockHashes, msg.VoteHashes, msg.TSpendHashes)
|
|
if err != nil {
|
|
peerLog.Warnf("couldn't handle init state message: %v", err)
|
|
}
|
|
}
|
|
|
|
// OnTx is invoked when a peer receives a tx wire message. It blocks until the
|
|
// transaction has been fully processed. Unlock the block handler this does not
|
|
// serialize all transactions through a single thread transactions don't rely on
|
|
// the previous one in a linear fashion like blocks.
|
|
func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) {
|
|
if cfg.BlocksOnly {
|
|
peerLog.Tracef("Ignoring tx %v from %v - blocksonly enabled",
|
|
msg.TxHash(), sp)
|
|
return
|
|
}
|
|
|
|
// Add the transaction to the known inventory for the peer.
|
|
// Convert the raw MsgTx to a dcrutil.Tx which provides some convenience
|
|
// methods and things such as hash caching.
|
|
tx := dcrutil.NewTx(msg)
|
|
iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
|
|
sp.AddKnownInventory(iv)
|
|
|
|
// Queue the transaction up to be handled by the net sync manager and
|
|
// intentionally block further receives until the transaction is fully
|
|
// processed and known good or bad. This helps prevent a malicious peer
|
|
// from queuing up a bunch of bad transactions before disconnecting (or
|
|
// being disconnected) and wasting memory.
|
|
sp.server.syncManager.QueueTx(tx, sp.syncMgrPeer, sp.txProcessed)
|
|
<-sp.txProcessed
|
|
}
|
|
|
|
// OnBlock is invoked when a peer receives a block wire message. It blocks
|
|
// until the network block has been fully processed.
|
|
func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
|
|
// Convert the raw MsgBlock to a dcrutil.Block which provides some
|
|
// convenience methods and things such as hash caching.
|
|
block := dcrutil.NewBlockFromBlockAndBytes(msg, buf)
|
|
|
|
// Add the block to the known inventory for the peer.
|
|
iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
|
|
sp.AddKnownInventory(iv)
|
|
|
|
// Queue the block up to be handled by the net sync manager and
|
|
// intentionally block further receives until the network block is fully
|
|
// processed and known good or bad. This helps prevent a malicious peer
|
|
// from queuing up a bunch of bad blocks before disconnecting (or being
|
|
// disconnected) and wasting memory. Additionally, this behavior is
|
|
// depended on by at least the block acceptance test tool as the reference
|
|
// implementation processes blocks in the same thread and therefore blocks
|
|
// further messages until the network block has been fully processed.
|
|
sp.server.syncManager.QueueBlock(block, sp.syncMgrPeer, sp.blockProcessed)
|
|
<-sp.blockProcessed
|
|
}
|
|
|
|
// OnInv is invoked when a peer receives an inv wire message and is used to
|
|
// examine the inventory being advertised by the remote peer and react
|
|
// accordingly. We pass the message down to the net sync manager which will
|
|
// call QueueMessage with any appropriate responses.
|
|
func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
|
|
// Ban peers sending empty inventory announcements.
|
|
if len(msg.InvList) == 0 {
|
|
sp.server.BanPeer(sp)
|
|
return
|
|
}
|
|
|
|
if !cfg.BlocksOnly {
|
|
sp.server.syncManager.QueueInv(msg, sp.syncMgrPeer)
|
|
return
|
|
}
|
|
|
|
newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList)))
|
|
for _, invVect := range msg.InvList {
|
|
if invVect.Type == wire.InvTypeTx {
|
|
peerLog.Infof("Peer %v is announcing transactions -- disconnecting",
|
|
sp)
|
|
sp.Disconnect()
|
|
return
|
|
}
|
|
err := newInv.AddInvVect(invVect)
|
|
if err != nil {
|
|
peerLog.Errorf("Failed to add inventory vector: %v", err)
|
|
break
|
|
}
|
|
}
|
|
|
|
sp.server.syncManager.QueueInv(newInv, sp.syncMgrPeer)
|
|
}
|
|
|
|
// OnHeaders is invoked when a peer receives a headers wire message. The
|
|
// message is passed down to the net sync manager.
|
|
func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
|
|
sp.server.syncManager.QueueHeaders(msg, sp.syncMgrPeer)
|
|
}
|
|
|
|
// OnGetData is invoked when a peer receives a getdata wire message and is used
|
|
// to deliver block and transaction information.
|
|
func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
|
|
// Ban peers sending empty getdata requests.
|
|
if len(msg.InvList) == 0 {
|
|
sp.server.BanPeer(sp)
|
|
return
|
|
}
|
|
|
|
// A decaying ban score increase is applied to prevent exhausting resources
|
|
// with unusually large inventory queries.
|
|
//
|
|
// Requesting more than the maximum inventory vector length within a short
|
|
// period of time yields a score above the default ban threshold. Sustained
|
|
// bursts of small requests are not penalized as that would potentially ban
|
|
// peers performing the inintial chain sync.
|
|
//
|
|
// This incremental score decays each minute to half of its value.
|
|
numNewReqs := uint32(len(msg.InvList))
|
|
if sp.addBanScore(0, numNewReqs*99/wire.MaxInvPerMsg, "getdata") {
|
|
return
|
|
}
|
|
|
|
// Prevent too many outstanding requests while still allowing the
|
|
// flexibility to send multiple simultaneous getdata requests that are
|
|
// served asynchronously.
|
|
numPendingGetDataReqs := len(sp.getDataQueue)
|
|
if numPendingGetDataReqs+1 > maxConcurrentGetDataReqs {
|
|
peerLog.Debugf("%s exceeded max allowed concurrent pending getdata "+
|
|
"requests (max %d) -- disconnecting", sp, maxConcurrentGetDataReqs)
|
|
sp.Disconnect()
|
|
return
|
|
}
|
|
numPendingDataItemReqs := sp.numPendingGetDataItemReqs.Load()
|
|
if numPendingDataItemReqs+numNewReqs > maxPendingGetDataItemReqs {
|
|
peerLog.Debugf("%s exceeded max allowed pending data item requests "+
|
|
"(new %d, pending %d, max %d) -- disconnecting", sp, numNewReqs,
|
|
numPendingDataItemReqs, maxPendingGetDataItemReqs)
|
|
sp.Disconnect()
|
|
return
|
|
}
|
|
|
|
// Queue the data requests to be served asynchronously. Note that this will
|
|
// not block due to the use of a buffered channel and the checks above that
|
|
// disconnect the peer when the new request would otherwise exceed the
|
|
// capacity.
|
|
sp.numPendingGetDataItemReqs.Add(numNewReqs)
|
|
select {
|
|
case <-sp.quit:
|
|
case sp.getDataQueue <- msg.InvList:
|
|
}
|
|
}
|
|
|
|
// OnGetBlocks is invoked when a peer receives a getblocks wire message.
|
|
func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) {
|
|
// Find the most recent known block in the best chain based on the block
|
|
// locator and fetch all of the block hashes after it until either
|
|
// wire.MaxBlocksPerMsg have been fetched or the provided stop hash is
|
|
// encountered.
|
|
//
|
|
// Use the block after the genesis block if no other blocks in the
|
|
// provided locator are known. This does mean the client will start
|
|
// over with the genesis block if unknown block locators are provided.
|
|
chain := sp.server.chain
|
|
hashList := chain.LocateBlocks(msg.BlockLocatorHashes, &msg.HashStop,
|
|
wire.MaxBlocksPerMsg)
|
|
|
|
// Generate inventory message.
|
|
invMsg := wire.NewMsgInv()
|
|
for i := range hashList {
|
|
iv := wire.NewInvVect(wire.InvTypeBlock, &hashList[i])
|
|
if sp.IsKnownInventory(iv) {
|
|
// TODO: Increase ban score
|
|
continue
|
|
}
|
|
invMsg.AddInvVect(iv)
|
|
}
|
|
|
|
// Send the inventory message if there is anything to send.
|
|
if len(invMsg.InvList) > 0 {
|
|
invListLen := len(invMsg.InvList)
|
|
if invListLen == wire.MaxBlocksPerMsg {
|
|
// Intentionally use a copy of the final hash so there
|
|
// is not a reference into the inventory slice which
|
|
// would prevent the entire slice from being eligible
|
|
// for GC as soon as it's sent.
|
|
continueHash := invMsg.InvList[invListLen-1].Hash
|
|
sp.continueHash.Store(&continueHash)
|
|
}
|
|
sp.QueueMessage(invMsg, nil)
|
|
}
|
|
}
|
|
|
|
// OnGetHeaders is invoked when a peer receives a getheaders wire message.
|
|
func (sp *serverPeer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) {
|
|
// Send an empty headers message in the case the local best known chain does
|
|
// not have the minimum cumulative work value already known to have been
|
|
// achieved on the network. This signals to the remote peer that there are
|
|
// no interesting headers available without appearing unresponsive.
|
|
chain := sp.server.chain
|
|
tipHash := chain.BestSnapshot().Hash
|
|
workSum, err := chain.ChainWork(&tipHash)
|
|
if err == nil && workSum.Lt(&sp.server.minKnownWork) {
|
|
srvrLog.Debugf("Sending empty headers to peer %s in response to "+
|
|
"getheaders due to local best known tip having too little work", sp)
|
|
sp.QueueMessage(&wire.MsgHeaders{}, nil)
|
|
return
|
|
}
|
|
|
|
// Find the most recent known block in the best chain based on the block
|
|
// locator and fetch all of the headers after it until either
|
|
// wire.MaxBlockHeadersPerMsg have been fetched or the provided stop
|
|
// hash is encountered.
|
|
//
|
|
// Use the block after the genesis block if no other blocks in the
|
|
// provided locator are known. This does mean the client will start
|
|
// over with the genesis block if unknown block locators are provided.
|
|
locatorHashes := msg.BlockLocatorHashes
|
|
headers := chain.LocateHeaders(locatorHashes, &msg.HashStop)
|
|
|
|
// Send found headers to the requesting peer.
|
|
blockHeaders := make([]*wire.BlockHeader, len(headers))
|
|
for i := range headers {
|
|
blockHeaders[i] = &headers[i]
|
|
}
|
|
sp.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil)
|
|
}
|
|
|
|
// enforceNodeCFFlag bans the peer if it has negotiated to a protocol version
|
|
// that is high enough to observe the committed filter service support bit since
|
|
// it is intentionally violating the protocol by requesting one from when the
|
|
// server does not advertise support for it.
|
|
//
|
|
// It disconnects the peer when it has negotiated to a protocol version prior to
|
|
// being able to understand the service bit.
|
|
func (sp *serverPeer) enforceNodeCFFlag(cmd string) {
|
|
// Ban the peer if the protocol version is high enough that the peer is
|
|
// knowingly violating the protocol and banning is enabled.
|
|
//
|
|
// NOTE: Even though the addBanScore function already examines whether
|
|
// or not banning is enabled, it is checked here as well to ensure the
|
|
// violation is logged and the peer is disconnected regardless.
|
|
if sp.ProtocolVersion() >= wire.NodeCFVersion && !cfg.DisableBanning {
|
|
// Disconnect the peer regardless of whether it was banned.
|
|
sp.addBanScore(100, 0, cmd)
|
|
sp.Disconnect()
|
|
return
|
|
}
|
|
|
|
// Disconnect the peer regardless of protocol version or banning state.
|
|
peerLog.Debugf("%s sent an unsupported %s request -- disconnecting", sp,
|
|
cmd)
|
|
sp.Disconnect()
|
|
}
|
|
|
|
// OnGetCFilter is invoked when a peer receives a getcfilter wire message.
|
|
func (sp *serverPeer) OnGetCFilter(_ *peer.Peer, msg *wire.MsgGetCFilter) {
|
|
// Disconnect and/or ban depending on the node cf services flag and
|
|
// negotiated protocol version.
|
|
sp.enforceNodeCFFlag(msg.Command())
|
|
}
|
|
|
|
// OnGetCFilterV2 is invoked when a peer receives a getcfilterv2 wire message.
|
|
func (sp *serverPeer) OnGetCFilterV2(_ *peer.Peer, msg *wire.MsgGetCFilterV2) {
|
|
// Attempt to obtain the requested filter.
|
|
//
|
|
// Ignore request for unknown block or otherwise missing filters.
|
|
chain := sp.server.chain
|
|
filter, proof, err := chain.FilterByBlockHash(&msg.BlockHash)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
filterMsg := wire.NewMsgCFilterV2(&msg.BlockHash, filter.Bytes(),
|
|
proof.ProofIndex, proof.ProofHashes)
|
|
sp.QueueMessage(filterMsg, nil)
|
|
}
|
|
|
|
// OnGetCFHeaders is invoked when a peer receives a getcfheader wire message.
|
|
func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
|
|
// Disconnect and/or ban depending on the node cf services flag and
|
|
// negotiated protocol version.
|
|
sp.enforceNodeCFFlag(msg.Command())
|
|
}
|
|
|
|
// OnGetCFTypes is invoked when a peer receives a getcftypes wire message.
|
|
func (sp *serverPeer) OnGetCFTypes(_ *peer.Peer, msg *wire.MsgGetCFTypes) {
|
|
// Disconnect and/or ban depending on the node cf services flag and
|
|
// negotiated protocol version.
|
|
sp.enforceNodeCFFlag(msg.Command())
|
|
}
|
|
|
|
// OnGetAddr is invoked when a peer receives a getaddr wire message and is used
|
|
// to provide the peer with known addresses from the address manager.
|
|
func (sp *serverPeer) OnGetAddr(_ *peer.Peer, msg *wire.MsgGetAddr) {
|
|
// Don't return any addresses when running on the simulation and regression
|
|
// test networks. This helps prevent the networks from becoming another
|
|
// public test network since they will not be able to learn about other
|
|
// peers that have not specifically been provided.
|
|
if cfg.SimNet || cfg.RegNet {
|
|
return
|
|
}
|
|
|
|
// Do not accept getaddr requests from outbound peers. This reduces
|
|
// fingerprinting attacks.
|
|
if !sp.Inbound() {
|
|
return
|
|
}
|
|
|
|
// Only respond with addresses once per connection. This helps reduce
|
|
// traffic and further reduces fingerprinting attacks.
|
|
if sp.addrsSent {
|
|
peerLog.Tracef("Ignoring getaddr from %v - already sent", sp)
|
|
return
|
|
}
|
|
sp.addrsSent = true
|
|
|
|
// Get the current known addresses from the address manager.
|
|
addrCache := sp.server.addrManager.AddressCache()
|
|
|
|
// Push the addresses.
|
|
sp.pushAddrMsg(addrCache)
|
|
}
|
|
|
|
// OnAddr is invoked when a peer receives an addr wire message and is used to
|
|
// notify the server about advertised addresses.
|
|
func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) {
|
|
// Ignore addresses when running on the simulation and regression test
|
|
// networks. This helps prevent the networks from becoming another public
|
|
// test network since they will not be able to learn about other peers that
|
|
// have not specifically been provided.
|
|
if cfg.SimNet || cfg.RegNet {
|
|
return
|
|
}
|
|
|
|
// A message that has no addresses is invalid.
|
|
if len(msg.AddrList) == 0 {
|
|
peerLog.Errorf("Command [%s] from %s does not contain any addresses",
|
|
msg.Command(), sp)
|
|
|
|
// Ban peers sending empty address requests.
|
|
sp.server.BanPeer(sp)
|
|
return
|
|
}
|
|
|
|
now := time.Now()
|
|
addrList := wireToAddrmgrNetAddresses(msg.AddrList)
|
|
for _, na := range addrList {
|
|
// Don't add more address if we're disconnecting.
|
|
if !sp.Connected() {
|
|
return
|
|
}
|
|
|
|
// Set the timestamp to 5 days ago if it's more than 24 hours
|
|
// in the future so this address is one of the first to be
|
|
// removed when space is needed.
|
|
if na.Timestamp.After(now.Add(time.Minute * 10)) {
|
|
na.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
|
|
}
|
|
|
|
// Add address to known addresses for this peer.
|
|
sp.addKnownAddress(na)
|
|
}
|
|
|
|
// Add addresses to server address manager. The address manager handles
|
|
// the details of things such as preventing duplicate addresses, max
|
|
// addresses, and last seen updates.
|
|
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
|
|
sp.server.addrManager.AddAddresses(addrList, remoteAddr)
|
|
}
|
|
|
|
// OnRead is invoked when a peer receives a message and it is used to update
|
|
// the bytes received by the server.
|
|
func (sp *serverPeer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err error) {
|
|
// Ban peers sending messages that do not conform to the wire protocol.
|
|
var errCode wire.ErrorCode
|
|
if errors.As(err, &errCode) {
|
|
peerLog.Errorf("Unable to read wire message from %s: %v", sp, err)
|
|
sp.server.BanPeer(sp)
|
|
}
|
|
|
|
sp.server.AddBytesReceived(uint64(bytesRead))
|
|
}
|
|
|
|
// OnWrite is invoked when a peer sends a message and it is used to update
|
|
// the bytes sent by the server.
|
|
func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, msg wire.Message, err error) {
|
|
sp.server.AddBytesSent(uint64(bytesWritten))
|
|
}
|
|
|
|
// OnNotFound is invoked when a peer sends a notfound message.
|
|
func (sp *serverPeer) OnNotFound(_ *peer.Peer, msg *wire.MsgNotFound) {
|
|
if !sp.Connected() {
|
|
return
|
|
}
|
|
|
|
var numBlocks, numTxns uint32
|
|
for _, inv := range msg.InvList {
|
|
switch inv.Type {
|
|
case wire.InvTypeBlock:
|
|
numBlocks++
|
|
case wire.InvTypeTx:
|
|
numTxns++
|
|
default:
|
|
peerLog.Debugf("Invalid inv type '%d' in notfound message from %s",
|
|
inv.Type, sp)
|
|
sp.Disconnect()
|
|
return
|
|
}
|
|
}
|
|
if numBlocks > 0 {
|
|
blockStr := pickNoun(uint64(numBlocks), "block", "blocks")
|
|
reason := fmt.Sprintf("%d %v not found", numBlocks, blockStr)
|
|
if sp.addBanScore(20*numBlocks, 0, reason) {
|
|
return
|
|
}
|
|
}
|
|
if numTxns > 0 {
|
|
txStr := pickNoun(uint64(numTxns), "transaction", "transactions")
|
|
reason := fmt.Sprintf("%d %v not found", numTxns, txStr)
|
|
if sp.addBanScore(0, 10*numTxns, reason) {
|
|
return
|
|
}
|
|
}
|
|
sp.server.syncManager.QueueNotFound(msg, sp.syncMgrPeer)
|
|
}
|
|
|
|
// randomUint16Number returns a random uint16 in a specified input range. Note
|
|
// that the range is in zeroth ordering; if you pass it 1800, you will get
|
|
// values from 0 to 1800.
|
|
func randomUint16Number(max uint16) uint16 {
|
|
// In order to avoid modulo bias and ensure every possible outcome in
|
|
// [0, max) has equal probability, the random number must be sampled
|
|
// from a random source that has a range limited to a multiple of the
|
|
// modulus.
|
|
var randomNumber uint16
|
|
var limitRange = (math.MaxUint16 / max) * max
|
|
for {
|
|
binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
|
|
if randomNumber < limitRange {
|
|
return (randomNumber % max)
|
|
}
|
|
}
|
|
}
|
|
|
|
// attemptDcrdDial is a wrapper function around dcrdDial which adds and marks
|
|
// the remote peer as attempted in the address manager.
|
|
func (s *server) attemptDcrdDial(ctx context.Context, network, addr string) (net.Conn, error) {
|
|
if !cfg.SimNet && !cfg.RegNet {
|
|
host, portStr, err := net.SplitHostPort(addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
port, err := strconv.ParseUint(portStr, 10, 16)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
remoteAddr, err := s.addrManager.HostToNetAddress(host, uint16(port), 0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Be sure the address exists in the address manager.
|
|
s.addrManager.AddAddresses([]*addrmgr.NetAddress{remoteAddr},
|
|
remoteAddr)
|
|
|
|
err = s.addrManager.Attempt(remoteAddr)
|
|
if err != nil {
|
|
srvrLog.Errorf("Marking address as attempted failed: %v", err)
|
|
}
|
|
}
|
|
|
|
return dcrdDial(ctx, network, addr)
|
|
}
|
|
|
|
// AddRebroadcastInventory adds 'iv' to the list of inventories to be
|
|
// rebroadcasted at random intervals until they show up in a block.
|
|
func (s *server) AddRebroadcastInventory(iv *wire.InvVect, data interface{}) {
|
|
select {
|
|
case <-s.quit:
|
|
case s.modifyRebroadcastInv <- broadcastInventoryAdd{invVect: iv, data: data}:
|
|
}
|
|
}
|
|
|
|
// RemoveRebroadcastInventory removes 'iv' from the list of items to be
|
|
// rebroadcasted if present.
|
|
func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) {
|
|
select {
|
|
case <-s.quit:
|
|
case s.modifyRebroadcastInv <- broadcastInventoryDel(iv):
|
|
}
|
|
}
|
|
|
|
// PruneRebroadcastInventory filters and removes rebroadcast inventory entries
|
|
// where necessary.
|
|
func (s *server) PruneRebroadcastInventory() {
|
|
select {
|
|
case <-s.quit:
|
|
case s.modifyRebroadcastInv <- broadcastPruneInventory{}:
|
|
}
|
|
}
|
|
|
|
// relayTransactions generates and relays inventory vectors for all of the
|
|
// passed transactions to all connected peers.
|
|
func (s *server) relayTransactions(txns []*dcrutil.Tx) {
|
|
for _, tx := range txns {
|
|
iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
|
|
s.RelayInventory(iv, tx, false)
|
|
}
|
|
}
|
|
|
|
// AnnounceNewTransactions generates and relays inventory vectors and notifies
|
|
// websocket clients of the passed transactions. This function should be
|
|
// called whenever new transactions are added to the mempool.
|
|
func (s *server) AnnounceNewTransactions(txns []*dcrutil.Tx) {
|
|
// Generate and relay inventory vectors for all newly accepted
|
|
// transactions.
|
|
s.relayTransactions(txns)
|
|
|
|
// Notify websocket clients of all newly accepted transactions.
|
|
if s.rpcServer != nil {
|
|
s.rpcServer.NotifyNewTransactions(txns)
|
|
}
|
|
}
|
|
|
|
// TransactionConfirmed marks the provided single confirmation transaction as
|
|
// no longer needing rebroadcasting and keeps track of it for use when avoiding
|
|
// requests for recently confirmed transactions.
|
|
func (s *server) TransactionConfirmed(tx *dcrutil.Tx) {
|
|
txHash := tx.Hash()
|
|
s.recentlyConfirmedTxns.Add(txHash[:])
|
|
|
|
// Rebroadcasting is only necessary when the RPC server is active.
|
|
if s.rpcServer != nil {
|
|
iv := wire.NewInvVect(wire.InvTypeTx, txHash)
|
|
s.RemoveRebroadcastInventory(iv)
|
|
}
|
|
}
|
|
|
|
// handleAddPeerMsg deals with adding new peers. It is invoked from the
|
|
// peerHandler goroutine.
|
|
func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
|
|
if sp == nil {
|
|
return false
|
|
}
|
|
|
|
// Ignore new peers if we're shutting down.
|
|
if s.shutdown.Load() {
|
|
srvrLog.Infof("New peer %s ignored - server is shutting down", sp)
|
|
sp.Disconnect()
|
|
return false
|
|
}
|
|
|
|
// Disconnect banned peers.
|
|
host, _, err := net.SplitHostPort(sp.Addr())
|
|
if err != nil {
|
|
srvrLog.Debugf("can't split hostport %v", err)
|
|
sp.Disconnect()
|
|
return false
|
|
}
|
|
if banEnd, ok := state.banned[host]; ok {
|
|
if time.Now().Before(banEnd) {
|
|
srvrLog.Debugf("Peer %s is banned for another %v - disconnecting",
|
|
host, time.Until(banEnd))
|
|
sp.Disconnect()
|
|
return false
|
|
}
|
|
|
|
srvrLog.Infof("Peer %s is no longer banned", host)
|
|
delete(state.banned, host)
|
|
}
|
|
|
|
// Limit max number of connections from a single IP. However, allow
|
|
// whitelisted inbound peers and localhost connections regardless.
|
|
isInboundWhitelisted := sp.isWhitelisted && sp.Inbound()
|
|
peerIP := sp.NA().IP
|
|
if cfg.MaxSameIP > 0 && !isInboundWhitelisted && !peerIP.IsLoopback() &&
|
|
state.ConnectionsWithIP(peerIP)+1 > cfg.MaxSameIP {
|
|
srvrLog.Infof("Max connections with %s reached [%d] - "+
|
|
"disconnecting peer", sp, cfg.MaxSameIP)
|
|
sp.Disconnect()
|
|
return false
|
|
}
|
|
|
|
// Limit max number of total peers. However, allow whitelisted inbound
|
|
// peers regardless.
|
|
if state.Count()+1 > cfg.MaxPeers && !isInboundWhitelisted {
|
|
srvrLog.Infof("Max peers reached [%d] - disconnecting peer %s",
|
|
cfg.MaxPeers, sp)
|
|
sp.Disconnect()
|
|
// TODO: how to handle permanent peers here?
|
|
// they should be rescheduled.
|
|
return false
|
|
}
|
|
|
|
sp.peerNaMtx.Lock()
|
|
na := sp.peerNa
|
|
sp.peerNaMtx.Unlock()
|
|
|
|
// Add the new peer and start it.
|
|
srvrLog.Debugf("New peer %s", sp)
|
|
if sp.Inbound() {
|
|
state.inboundPeers[sp.ID()] = sp
|
|
|
|
if na != nil {
|
|
id := na.IP.String()
|
|
|
|
// Inbound peers can only corroborate existing address submissions.
|
|
if state.subCache.exists(id) {
|
|
err := state.subCache.incrementScore(id)
|
|
if err != nil {
|
|
srvrLog.Errorf("unable to increment submission score: %v", err)
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
|
|
state.outboundGroups[remoteAddr.GroupKey()]++
|
|
if sp.persistent {
|
|
state.persistentPeers[sp.ID()] = sp
|
|
} else {
|
|
state.outboundPeers[sp.ID()] = sp
|
|
}
|
|
|
|
// Fetch the suggested public ip from the outbound peer if
|
|
// there are no prevailing conditions to disable automatic
|
|
// network address discovery.
|
|
//
|
|
// The conditions to disable automatic network address
|
|
// discovery are:
|
|
// - If there is a proxy set (--proxy, --onion).
|
|
// - If automatic network address discovery is explicitly
|
|
// disabled (--nodiscoverip).
|
|
// - If there is an external ip explicitly set (--externalip).
|
|
// - If listening has been disabled (--nolisten, listen
|
|
// disabled because of --connect, etc).
|
|
// - If Universal Plug and Play is enabled (--upnp).
|
|
// - If the active network is simnet or regnet.
|
|
if (cfg.Proxy != "" || cfg.OnionProxy != "") ||
|
|
cfg.NoDiscoverIP || len(cfg.ExternalIPs) > 0 ||
|
|
(cfg.DisableListen || len(cfg.Listeners) == 0) || cfg.Upnp ||
|
|
s.chainParams.Name == simNetParams.Name ||
|
|
s.chainParams.Name == regNetParams.Name {
|
|
return true
|
|
}
|
|
|
|
if na != nil {
|
|
net := addrmgr.IPv4Address
|
|
if na.IP.To4() == nil {
|
|
net = addrmgr.IPv6Address
|
|
}
|
|
|
|
localAddr := wireToAddrmgrNetAddress(na)
|
|
valid, reach := s.addrManager.ValidatePeerNa(localAddr, remoteAddr)
|
|
if !valid {
|
|
return true
|
|
}
|
|
|
|
id := na.IP.String()
|
|
if state.subCache.exists(id) {
|
|
// Increment the submission score if it already exists.
|
|
err := state.subCache.incrementScore(id)
|
|
if err != nil {
|
|
srvrLog.Errorf("unable to increment submission score: %v", err)
|
|
return true
|
|
}
|
|
} else {
|
|
// Create a cache entry for a new submission.
|
|
sub := &naSubmission{
|
|
na: na,
|
|
netType: net,
|
|
reach: reach,
|
|
}
|
|
|
|
err := state.subCache.add(sub)
|
|
if err != nil {
|
|
srvrLog.Errorf("unable to add submission: %v", err)
|
|
return true
|
|
}
|
|
}
|
|
|
|
// Pick the local address for the provided network based on
|
|
// submission scores.
|
|
state.ResolveLocalAddress(net, s.addrManager, s.services)
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// handleDonePeerMsg deals with peers that have signalled they are done. It is
|
|
// invoked from the peerHandler goroutine.
|
|
func (s *server) handleDonePeerMsg(state *peerState, sp *serverPeer) {
|
|
var list map[int32]*serverPeer
|
|
if sp.persistent {
|
|
list = state.persistentPeers
|
|
} else if sp.Inbound() {
|
|
list = state.inboundPeers
|
|
} else {
|
|
list = state.outboundPeers
|
|
}
|
|
if _, ok := list[sp.ID()]; ok {
|
|
if !sp.Inbound() && sp.VersionKnown() {
|
|
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
|
|
state.outboundGroups[remoteAddr.GroupKey()]--
|
|
}
|
|
if !sp.Inbound() && sp.connReq != nil {
|
|
s.connManager.Disconnect(sp.connReq.ID())
|
|
}
|
|
delete(list, sp.ID())
|
|
srvrLog.Debugf("Removed peer %s", sp)
|
|
return
|
|
}
|
|
|
|
if sp.connReq != nil {
|
|
s.connManager.Disconnect(sp.connReq.ID())
|
|
}
|
|
|
|
// Update the address' last seen time if the peer has acknowledged
|
|
// our version and has sent us its version as well.
|
|
if sp.VerAckReceived() && sp.VersionKnown() && sp.NA() != nil {
|
|
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
|
|
err := s.addrManager.Connected(remoteAddr)
|
|
if err != nil {
|
|
srvrLog.Errorf("Marking address as connected failed: %v", err)
|
|
}
|
|
}
|
|
|
|
// If we get here it means that either we didn't know about the peer
|
|
// or we purposefully deleted it.
|
|
}
|
|
|
|
// handleBanPeerMsg deals with banning peers. It is invoked from the
|
|
// peerHandler goroutine.
|
|
func (s *server) handleBanPeerMsg(state *peerState, sp *serverPeer) {
|
|
host, _, err := net.SplitHostPort(sp.Addr())
|
|
if err != nil {
|
|
srvrLog.Debugf("can't split ban peer %s %v", sp.Addr(), err)
|
|
return
|
|
}
|
|
direction := directionString(sp.Inbound())
|
|
srvrLog.Infof("Banned peer %s (%s) for %v", host, direction,
|
|
cfg.BanDuration)
|
|
state.banned[host] = time.Now().Add(cfg.BanDuration)
|
|
}
|
|
|
|
// handleRelayInvMsg deals with relaying inventory to peers that are not already
|
|
// known to have it. It is invoked from the peerHandler goroutine.
|
|
func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) {
|
|
state.forAllPeers(func(sp *serverPeer) {
|
|
if !sp.Connected() {
|
|
return
|
|
}
|
|
|
|
// Ignore peers that do not have the required service flags.
|
|
if !hasServices(sp.Services(), msg.reqServices) {
|
|
return
|
|
}
|
|
|
|
// Filter duplicate block announcements.
|
|
iv := msg.invVect
|
|
isBlockAnnouncement := iv.Type == wire.InvTypeBlock
|
|
if isBlockAnnouncement {
|
|
if sp.announcedBlock != nil && *sp.announcedBlock == iv.Hash {
|
|
sp.announcedBlock = nil
|
|
return
|
|
}
|
|
sp.announcedBlock = &iv.Hash
|
|
}
|
|
|
|
// Generate and send a headers message instead of an inventory message
|
|
// for block announcements when the peer prefers headers.
|
|
if isBlockAnnouncement && sp.WantsHeaders() {
|
|
blockHeader, ok := msg.data.(wire.BlockHeader)
|
|
if !ok {
|
|
peerLog.Warnf("Underlying data for headers" +
|
|
" is not a block header")
|
|
return
|
|
}
|
|
msgHeaders := wire.NewMsgHeaders()
|
|
if err := msgHeaders.AddBlockHeader(&blockHeader); err != nil {
|
|
peerLog.Errorf("Failed to add block"+
|
|
" header: %v", err)
|
|
return
|
|
}
|
|
sp.QueueMessage(msgHeaders, nil)
|
|
return
|
|
}
|
|
|
|
if iv.Type == wire.InvTypeTx {
|
|
// Don't relay the transaction to the peer when it has
|
|
// transaction relaying disabled.
|
|
if sp.relayTxDisabled() {
|
|
return
|
|
}
|
|
}
|
|
|
|
// Either queue the inventory to be relayed immediately or with
|
|
// the next batch depending on the immediate flag.
|
|
//
|
|
// It will be ignored in either case if the peer is already
|
|
// known to have the inventory.
|
|
if msg.immediate {
|
|
sp.QueueInventoryImmediate(iv)
|
|
} else {
|
|
sp.QueueInventory(iv)
|
|
}
|
|
})
|
|
}
|
|
|
|
// handleBroadcastMsg deals with broadcasting messages to peers. It is invoked
|
|
// from the peerHandler goroutine.
|
|
func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) {
|
|
state.forAllPeers(func(sp *serverPeer) {
|
|
if !sp.Connected() {
|
|
return
|
|
}
|
|
|
|
for _, ep := range bmsg.excludePeers {
|
|
if sp == ep {
|
|
return
|
|
}
|
|
}
|
|
|
|
sp.QueueMessage(bmsg.message, nil)
|
|
})
|
|
}
|
|
|
|
type getConnCountMsg struct {
|
|
reply chan int32
|
|
}
|
|
|
|
type getPeersMsg struct {
|
|
reply chan []*serverPeer
|
|
}
|
|
|
|
type getOutboundGroup struct {
|
|
key string
|
|
reply chan int
|
|
}
|
|
|
|
type getAddedNodesMsg struct {
|
|
reply chan []*serverPeer
|
|
}
|
|
|
|
type disconnectNodeMsg struct {
|
|
cmp func(*serverPeer) bool
|
|
reply chan error
|
|
}
|
|
|
|
type connectNodeMsg struct {
|
|
addr string
|
|
permanent bool
|
|
reply chan error
|
|
}
|
|
|
|
type removeNodeMsg struct {
|
|
cmp func(*serverPeer) bool
|
|
reply chan error
|
|
}
|
|
|
|
type cancelPendingMsg struct {
|
|
addr string
|
|
reply chan error
|
|
}
|
|
|
|
// handleQuery is the central handler for all queries and commands from other
|
|
// goroutines related to peer state.
|
|
func (s *server) handleQuery(ctx context.Context, state *peerState, querymsg interface{}) {
|
|
switch msg := querymsg.(type) {
|
|
case getConnCountMsg:
|
|
nconnected := int32(0)
|
|
state.forAllPeers(func(sp *serverPeer) {
|
|
if sp.Connected() {
|
|
nconnected++
|
|
}
|
|
})
|
|
msg.reply <- nconnected
|
|
|
|
case getPeersMsg:
|
|
peers := make([]*serverPeer, 0, state.Count())
|
|
state.forAllPeers(func(sp *serverPeer) {
|
|
if !sp.Connected() {
|
|
return
|
|
}
|
|
peers = append(peers, sp)
|
|
})
|
|
msg.reply <- peers
|
|
|
|
case connectNodeMsg:
|
|
// XXX duplicate oneshots?
|
|
// Limit max number of total peers.
|
|
if state.Count() >= cfg.MaxPeers {
|
|
msg.reply <- errors.New("max peers reached")
|
|
return
|
|
}
|
|
err := s.connManager.ForEachConnReq(func(c *connmgr.ConnReq) error {
|
|
if c.Addr != nil && c.Addr.String() == msg.addr {
|
|
if c.Permanent {
|
|
return errors.New("peer exists as a permanent peer")
|
|
}
|
|
|
|
switch c.State() {
|
|
case connmgr.ConnPending:
|
|
return errors.New("peer pending connection")
|
|
case connmgr.ConnEstablished:
|
|
return errors.New("peer already connected")
|
|
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
msg.reply <- err
|
|
return
|
|
}
|
|
|
|
netAddr, err := addrStringToNetAddr(msg.addr)
|
|
if err != nil {
|
|
msg.reply <- err
|
|
return
|
|
}
|
|
|
|
// TODO: if too many, nuke a non-perm peer.
|
|
go s.connManager.Connect(ctx,
|
|
&connmgr.ConnReq{
|
|
Addr: netAddr,
|
|
Permanent: msg.permanent,
|
|
})
|
|
msg.reply <- nil
|
|
|
|
case removeNodeMsg:
|
|
found := disconnectPeer(state.persistentPeers, msg.cmp, func(sp *serverPeer) {
|
|
// Keep group counts ok since we remove from
|
|
// the list now.
|
|
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
|
|
state.outboundGroups[remoteAddr.GroupKey()]--
|
|
|
|
peerLog.Debugf("Removing persistent peer %s (reqid %d)", remoteAddr,
|
|
sp.connReq.ID())
|
|
connReq := sp.connReq
|
|
|
|
// Mark the peer's connReq as nil to prevent it from scheduling a
|
|
// re-connect attempt.
|
|
sp.connReq = nil
|
|
s.connManager.Remove(connReq.ID())
|
|
})
|
|
|
|
if found {
|
|
msg.reply <- nil
|
|
} else {
|
|
msg.reply <- errors.New("peer not found")
|
|
}
|
|
|
|
case cancelPendingMsg:
|
|
netAddr, err := addrStringToNetAddr(msg.addr)
|
|
if err != nil {
|
|
msg.reply <- err
|
|
return
|
|
}
|
|
msg.reply <- s.connManager.CancelPending(netAddr)
|
|
|
|
case getOutboundGroup:
|
|
count, ok := state.outboundGroups[msg.key]
|
|
if ok {
|
|
msg.reply <- count
|
|
} else {
|
|
msg.reply <- 0
|
|
}
|
|
|
|
case getAddedNodesMsg:
|
|
// Respond with a slice of the relevant peers.
|
|
peers := make([]*serverPeer, 0, len(state.persistentPeers))
|
|
for _, sp := range state.persistentPeers {
|
|
peers = append(peers, sp)
|
|
}
|
|
msg.reply <- peers
|
|
|
|
case disconnectNodeMsg:
|
|
// Check inbound peers. We pass a nil callback since we don't
|
|
// require any additional actions on disconnect for inbound peers.
|
|
found := disconnectPeer(state.inboundPeers, msg.cmp, nil)
|
|
if found {
|
|
msg.reply <- nil
|
|
return
|
|
}
|
|
|
|
// Check outbound peers.
|
|
found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
|
|
// Keep group counts ok since we remove from
|
|
// the list now.
|
|
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
|
|
state.outboundGroups[remoteAddr.GroupKey()]--
|
|
})
|
|
if found {
|
|
// If there are multiple outbound connections to the same
|
|
// ip:port, continue disconnecting them all until no such
|
|
// peers are found.
|
|
for found {
|
|
found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
|
|
remoteAddr := wireToAddrmgrNetAddress(sp.NA())
|
|
state.outboundGroups[remoteAddr.GroupKey()]--
|
|
})
|
|
}
|
|
msg.reply <- nil
|
|
return
|
|
}
|
|
|
|
msg.reply <- errors.New("peer not found")
|
|
}
|
|
}
|
|
|
|
// disconnectPeer attempts to drop the connection of a targeted peer in the
|
|
// passed peer list. Targets are identified via usage of the passed
|
|
// `compareFunc`, which should return `true` if the passed peer is the target
|
|
// peer. This function returns true on success and false if the peer is unable
|
|
// to be located. If the peer is found, and the passed callback: `whenFound'
|
|
// isn't nil, we call it with the peer as the argument before it is removed
|
|
// from the peerList, and is disconnected from the server.
|
|
func disconnectPeer(peerList map[int32]*serverPeer, compareFunc func(*serverPeer) bool, whenFound func(*serverPeer)) bool {
|
|
for addr, peer := range peerList {
|
|
if compareFunc(peer) {
|
|
if whenFound != nil {
|
|
whenFound(peer)
|
|
}
|
|
|
|
// This is ok because we are not continuing
|
|
// to iterate so won't corrupt the loop.
|
|
delete(peerList, addr)
|
|
peer.Disconnect()
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// newPeerConfig returns the configuration for the given serverPeer.
|
|
func newPeerConfig(sp *serverPeer) *peer.Config {
|
|
var userAgentComments []string
|
|
if version.PreRelease != "" {
|
|
userAgentComments = append(userAgentComments, version.PreRelease)
|
|
}
|
|
|
|
return &peer.Config{
|
|
Listeners: peer.MessageListeners{
|
|
OnVersion: sp.OnVersion,
|
|
OnVerAck: sp.OnVerAck,
|
|
OnMemPool: sp.OnMemPool,
|
|
OnGetMiningState: sp.OnGetMiningState,
|
|
OnMiningState: sp.OnMiningState,
|
|
OnGetInitState: sp.OnGetInitState,
|
|
OnInitState: sp.OnInitState,
|
|
OnTx: sp.OnTx,
|
|
OnBlock: sp.OnBlock,
|
|
OnInv: sp.OnInv,
|
|
OnHeaders: sp.OnHeaders,
|
|
OnGetData: sp.OnGetData,
|
|
OnGetBlocks: sp.OnGetBlocks,
|
|
OnGetHeaders: sp.OnGetHeaders,
|
|
OnGetCFilter: sp.OnGetCFilter,
|
|
OnGetCFilterV2: sp.OnGetCFilterV2,
|
|
OnGetCFHeaders: sp.OnGetCFHeaders,
|
|
OnGetCFTypes: sp.OnGetCFTypes,
|
|
OnGetAddr: sp.OnGetAddr,
|
|
OnAddr: sp.OnAddr,
|
|
OnRead: sp.OnRead,
|
|
OnWrite: sp.OnWrite,
|
|
OnNotFound: sp.OnNotFound,
|
|
},
|
|
NewestBlock: sp.newestBlock,
|
|
HostToNetAddress: func(host string, port uint16, services wire.ServiceFlag) (*wire.NetAddress, error) {
|
|
address, err := sp.server.addrManager.HostToNetAddress(host, port, services)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return addrmgrToWireNetAddress(address), nil
|
|
},
|
|
Proxy: cfg.Proxy,
|
|
UserAgentName: userAgentName,
|
|
UserAgentVersion: userAgentVersion,
|
|
UserAgentComments: userAgentComments,
|
|
Net: sp.server.chainParams.Net,
|
|
Services: sp.server.services,
|
|
DisableRelayTx: cfg.BlocksOnly,
|
|
ProtocolVersion: maxProtocolVersion,
|
|
IdleTimeout: cfg.PeerIdleTimeout,
|
|
}
|
|
}
|
|
|
|
// inboundPeerConnected is invoked by the connection manager when a new inbound
|
|
// connection is established. It initializes a new inbound server peer
|
|
// instance, associates it with the connection, and starts all additional server
|
|
// peer processing goroutines.
|
|
func (s *server) inboundPeerConnected(conn net.Conn) {
|
|
sp := newServerPeer(s, false)
|
|
sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
|
|
sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
|
|
sp.syncMgrPeer = netsync.NewPeer(sp.Peer)
|
|
sp.AssociateConnection(conn)
|
|
go sp.Run()
|
|
}
|
|
|
|
// outboundPeerConnected is invoked by the connection manager when a new
|
|
// outbound connection is established. It initializes a new outbound server
|
|
// peer instance, associates it with the relevant state such as the connection
|
|
// request instance and the connection itself, and start all additional server
|
|
// peer processing goroutines.
|
|
func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
|
|
sp := newServerPeer(s, c.Permanent)
|
|
p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
|
|
if err != nil {
|
|
srvrLog.Debugf("Cannot create outbound peer %s: %v", c.Addr, err)
|
|
s.connManager.Disconnect(c.ID())
|
|
return
|
|
}
|
|
sp.Peer = p
|
|
sp.syncMgrPeer = netsync.NewPeer(sp.Peer)
|
|
sp.connReq = c
|
|
sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
|
|
sp.AssociateConnection(conn)
|
|
go sp.Run()
|
|
}
|
|
|
|
// peerHandler is used to handle peer operations such as adding and removing
|
|
// peers to and from the server, banning peers, and broadcasting messages to
|
|
// peers. It must be run in a goroutine.
|
|
func (s *server) peerHandler(ctx context.Context) {
|
|
// Start the address manager which is needed by peers. This is done here
|
|
// since its lifecycle is closely tied to this handler and rather than
|
|
// adding more channels to synchronize things, it's easier and slightly
|
|
// faster to simply start and stop it in this handler.
|
|
s.addrManager.Start()
|
|
|
|
srvrLog.Tracef("Starting peer handler")
|
|
|
|
state := &peerState{
|
|
inboundPeers: make(map[int32]*serverPeer),
|
|
persistentPeers: make(map[int32]*serverPeer),
|
|
outboundPeers: make(map[int32]*serverPeer),
|
|
banned: make(map[string]time.Time),
|
|
outboundGroups: make(map[string]int),
|
|
subCache: &naSubmissionCache{
|
|
cache: make(map[string]*naSubmission, maxCachedNaSubmissions),
|
|
limit: maxCachedNaSubmissions,
|
|
},
|
|
}
|
|
|
|
out:
|
|
for {
|
|
select {
|
|
// New peers connected to the server.
|
|
case p := <-s.newPeers:
|
|
s.handleAddPeerMsg(state, p)
|
|
|
|
// Signal the net sync manager this peer is a new sync candidate
|
|
// unless it was disconnected above.
|
|
if p.Connected() {
|
|
s.syncManager.PeerConnected(p.syncMgrPeer)
|
|
}
|
|
|
|
// Disconnected peers.
|
|
case p := <-s.donePeers:
|
|
s.handleDonePeerMsg(state, p)
|
|
|
|
// Peer to ban.
|
|
case p := <-s.banPeers:
|
|
s.handleBanPeerMsg(state, p)
|
|
|
|
// New inventory to potentially be relayed to other peers.
|
|
case invMsg := <-s.relayInv:
|
|
s.handleRelayInvMsg(state, invMsg)
|
|
|
|
// Message to broadcast to all connected peers except those
|
|
// which are excluded by the message.
|
|
case bmsg := <-s.broadcast:
|
|
s.handleBroadcastMsg(state, &bmsg)
|
|
|
|
case qmsg := <-s.query:
|
|
s.handleQuery(ctx, state, qmsg)
|
|
|
|
case <-ctx.Done():
|
|
close(s.quit)
|
|
|
|
// Disconnect all peers on server shutdown.
|
|
state.forAllPeers(func(sp *serverPeer) {
|
|
srvrLog.Tracef("Shutdown peer %s", sp)
|
|
sp.Disconnect()
|
|
})
|
|
break out
|
|
}
|
|
}
|
|
|
|
s.addrManager.Stop()
|
|
srvrLog.Tracef("Peer handler done")
|
|
}
|
|
|
|
// AddPeer adds a new peer that has already been connected to the server.
|
|
func (s *server) AddPeer(sp *serverPeer) {
|
|
select {
|
|
case <-s.quit:
|
|
case s.newPeers <- sp:
|
|
}
|
|
}
|
|
|
|
// DonePeer removes a disconnected peer from the server.
|
|
func (s *server) DonePeer(sp *serverPeer) {
|
|
select {
|
|
case <-s.quit:
|
|
case s.donePeers <- sp:
|
|
}
|
|
}
|
|
|
|
// BanPeer bans a peer that has already been connected to the server by ip
|
|
// unless banning is disabled or the peer has been whitelisted.
|
|
func (s *server) BanPeer(sp *serverPeer) {
|
|
if cfg.DisableBanning || sp.isWhitelisted {
|
|
return
|
|
}
|
|
sp.Disconnect()
|
|
|
|
select {
|
|
case <-s.quit:
|
|
case s.banPeers <- sp:
|
|
}
|
|
}
|
|
|
|
// RelayInventory relays the passed inventory vector to all connected peers
|
|
// that are not already known to have it.
|
|
func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}, immediate bool) {
|
|
select {
|
|
case <-s.quit:
|
|
case s.relayInv <- relayMsg{invVect: invVect, data: data, immediate: immediate}:
|
|
}
|
|
}
|
|
|
|
// RelayBlockAnnouncement creates a block announcement for the passed block and
|
|
// relays that announcement immediately to all connected peers that advertise
|
|
// the given required services and are not already known to have it.
|
|
func (s *server) RelayBlockAnnouncement(block *dcrutil.Block, reqServices wire.ServiceFlag) {
|
|
invVect := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
|
|
select {
|
|
case <-s.quit:
|
|
case s.relayInv <- relayMsg{
|
|
invVect: invVect,
|
|
data: block.MsgBlock().Header,
|
|
immediate: true,
|
|
reqServices: reqServices,
|
|
}:
|
|
}
|
|
}
|
|
|
|
// BroadcastMessage sends msg to all peers currently connected to the server
|
|
// except those in the passed peers to exclude.
|
|
func (s *server) BroadcastMessage(msg wire.Message, exclPeers ...*serverPeer) {
|
|
select {
|
|
case <-s.quit:
|
|
case s.broadcast <- broadcastMsg{message: msg, excludePeers: exclPeers}:
|
|
}
|
|
}
|
|
|
|
// ConnectedCount returns the number of currently connected peers.
|
|
func (s *server) ConnectedCount() int32 {
|
|
replyChan := make(chan int32)
|
|
select {
|
|
case <-s.quit:
|
|
return 0
|
|
case s.query <- getConnCountMsg{reply: replyChan}:
|
|
return <-replyChan
|
|
}
|
|
}
|
|
|
|
// OutboundGroupCount returns the number of peers connected to the given
|
|
// outbound group key.
|
|
func (s *server) OutboundGroupCount(key string) int {
|
|
replyChan := make(chan int)
|
|
select {
|
|
case <-s.quit:
|
|
return 0
|
|
case s.query <- getOutboundGroup{key: key, reply: replyChan}:
|
|
return <-replyChan
|
|
}
|
|
}
|
|
|
|
// AddedNodeInfo returns an array of dcrjson.GetAddedNodeInfoResult structures
|
|
// describing the persistent (added) nodes.
|
|
func (s *server) AddedNodeInfo() []*serverPeer {
|
|
replyChan := make(chan []*serverPeer)
|
|
select {
|
|
case <-s.quit:
|
|
return nil
|
|
case s.query <- getAddedNodesMsg{reply: replyChan}:
|
|
return <-replyChan
|
|
}
|
|
}
|
|
|
|
// AddBytesSent adds the passed number of bytes to the total bytes sent counter
|
|
// for the server. It is safe for concurrent access.
|
|
func (s *server) AddBytesSent(bytesSent uint64) {
|
|
s.bytesSent.Add(bytesSent)
|
|
}
|
|
|
|
// AddBytesReceived adds the passed number of bytes to the total bytes received
|
|
// counter for the server. It is safe for concurrent access.
|
|
func (s *server) AddBytesReceived(bytesReceived uint64) {
|
|
s.bytesReceived.Add(bytesReceived)
|
|
}
|
|
|
|
// NetTotals returns the sum of all bytes received and sent across the network
|
|
// for all peers. It is safe for concurrent access.
|
|
func (s *server) NetTotals() (uint64, uint64) {
|
|
return s.bytesReceived.Load(), s.bytesSent.Load()
|
|
}
|
|
|
|
// notifiedWinningTickets returns whether or not the winning tickets
|
|
// notification for the specified block hash has already been sent.
|
|
func (s *server) notifiedWinningTickets(hash *chainhash.Hash) bool {
|
|
s.lotteryDataBroadcastMtx.Lock()
|
|
_, beenNotified := s.lotteryDataBroadcast[*hash]
|
|
s.lotteryDataBroadcastMtx.Unlock()
|
|
return beenNotified
|
|
}
|
|
|
|
// headerApprovesParent returns whether or not the vote bits in the passed
|
|
// header indicate the regular transaction tree of the parent block should be
|
|
// considered valid.
|
|
func headerApprovesParent(header *wire.BlockHeader) bool {
|
|
return dcrutil.IsFlagSet16(header.VoteBits, dcrutil.BlockValid)
|
|
}
|
|
|
|
// proactivelyEvictSigCacheEntries fetches the block that is
|
|
// txscript.ProactiveEvictionDepth levels deep from bestHeight and passes it to
|
|
// SigCache to evict the entries associated with the transactions in that block.
|
|
func (s *server) proactivelyEvictSigCacheEntries(bestHeight int64) {
|
|
// Nothing to do before the eviction depth is reached.
|
|
if bestHeight <= txscript.ProactiveEvictionDepth {
|
|
return
|
|
}
|
|
|
|
evictHeight := bestHeight - txscript.ProactiveEvictionDepth
|
|
block, err := s.chain.BlockByHeight(evictHeight)
|
|
if err != nil {
|
|
srvrLog.Warnf("Failed to retrieve the block at height %d: %v",
|
|
evictHeight, err)
|
|
return
|
|
}
|
|
|
|
s.sigCache.EvictEntries(block.MsgBlock())
|
|
}
|
|
|
|
// handleBlockchainNotification handles notifications from blockchain. It does
|
|
// things such as request orphan block parents and relay accepted blocks to
|
|
// connected peers.
|
|
func (s *server) handleBlockchainNotification(notification *blockchain.Notification) {
|
|
switch notification.Type {
|
|
// A block that intends to extend the main chain has passed all sanity and
|
|
// contextual checks and the chain is believed to be current. Relay it to
|
|
// other peers.
|
|
case blockchain.NTNewTipBlockChecked:
|
|
// WARNING: The chain lock is not released before sending this
|
|
// notification, so care must be taken to avoid calling chain functions
|
|
// which could result in a deadlock.
|
|
block, ok := notification.Data.(*dcrutil.Block)
|
|
if !ok {
|
|
syncLog.Warnf("New tip block checked notification is not a block.")
|
|
break
|
|
}
|
|
|
|
// Relay the block announcement immediately to full nodes.
|
|
s.RelayBlockAnnouncement(block, wire.SFNodeNetwork)
|
|
|
|
// A block has been accepted into the block chain. Relay it to other peers
|
|
// (will be ignored if already relayed via NTNewTipBlockChecked) and
|
|
// possibly notify RPC clients with the winning tickets.
|
|
case blockchain.NTBlockAccepted:
|
|
// Don't relay or notify RPC clients with winning tickets if we are not
|
|
// current and unsynced mining is not allowed. Other peers that are
|
|
// current should already know about it and clients, such as wallets,
|
|
// shouldn't be voting on old blocks.
|
|
if !cfg.AllowUnsyncedMining && !s.syncManager.IsCurrent() {
|
|
return
|
|
}
|
|
|
|
band, ok := notification.Data.(*blockchain.BlockAcceptedNtfnsData)
|
|
if !ok {
|
|
syncLog.Warnf("Chain accepted notification is not " +
|
|
"BlockAcceptedNtfnsData.")
|
|
break
|
|
}
|
|
block := band.Block
|
|
|
|
// Send a winning tickets notification as needed. The notification will
|
|
// only be sent when the following conditions hold:
|
|
//
|
|
// - The RPC server is running
|
|
// - The block that would build on this one is at or after the height
|
|
// voting begins
|
|
// - The block that would build on this one would not cause a reorg
|
|
// larger than the max reorg notify depth
|
|
// - A notification for this block has not already been sent
|
|
//
|
|
// To help visualize the math here, consider the following two competing
|
|
// branches:
|
|
//
|
|
// 100 -> 101 -> 102 -> 103 -> 104 -> 105 -> 106
|
|
// \-> 101' -> 102'
|
|
//
|
|
// Further, assume that this is a notification for block 103', or in
|
|
// other words, it is extending the shorter side chain. The reorg depth
|
|
// would be 106 - (103 - 3) = 6. This should intuitively make sense,
|
|
// because if the side chain were to be extended enough to become the
|
|
// best chain, it would result in a reorg that would remove 6 blocks,
|
|
// namely blocks 101, 102, 103, 104, 105, and 106.
|
|
//
|
|
// Additionally, a notification will NOT be sent for mainnet once block
|
|
// height 777240 has been reached and the block version is prior to 10.
|
|
// The intent is for future code to perform this type of check more
|
|
// dynamically so it happens for all upgrades after a certain time frame
|
|
// is provided for upgrades to occur, but it is hard coded for now in
|
|
// the interest of time to allow PoS to force PoW to upgrade.
|
|
blockHash := block.Hash()
|
|
bestHeight := band.BestHeight
|
|
blockHeader := &block.MsgBlock().Header
|
|
blockHeight := int64(blockHeader.Height)
|
|
reorgDepth := bestHeight - (blockHeight - band.ForkLen)
|
|
isOldMainnetBlock := s.chainParams.Net == wire.MainNet &&
|
|
blockHeight >= 777240 && blockHeader.Version < 10
|
|
if s.rpcServer != nil &&
|
|
blockHeight >= s.chainParams.StakeValidationHeight-1 &&
|
|
reorgDepth < maxReorgDepthNotify &&
|
|
!isOldMainnetBlock &&
|
|
!s.notifiedWinningTickets(blockHash) {
|
|
|
|
// Obtain the winning tickets for this block. handleNotifyMsg
|
|
// should be safe for concurrent access of things contained within
|
|
// blockchain.
|
|
wt, _, _, err := s.chain.LotteryDataForBlock(blockHash)
|
|
if err != nil {
|
|
syncLog.Errorf("Couldn't calculate winning tickets for "+
|
|
"accepted block %v: %v", blockHash, err.Error())
|
|
} else {
|
|
// Notify registered websocket clients of newly eligible tickets
|
|
// to vote on.
|
|
s.rpcServer.NotifyWinningTickets(&rpcserver.WinningTicketsNtfnData{
|
|
BlockHash: *blockHash,
|
|
BlockHeight: blockHeight,
|
|
Tickets: wt,
|
|
})
|
|
|
|
s.lotteryDataBroadcastMtx.Lock()
|
|
s.lotteryDataBroadcast[*blockHash] = struct{}{}
|
|
s.lotteryDataBroadcastMtx.Unlock()
|
|
}
|
|
}
|
|
|
|
// Relay the block announcement immediately to all peers that were not
|
|
// already notified via NTNewTipBlockChecked.
|
|
const noRequiredServices = 0
|
|
s.RelayBlockAnnouncement(block, noRequiredServices)
|
|
|
|
// Inform the background block template generator about the accepted
|
|
// block.
|
|
if s.bg != nil {
|
|
s.bg.BlockAccepted(block)
|
|
}
|
|
|
|
if !s.feeEstimator.IsEnabled() {
|
|
// fee estimation can only start after we have performed an initial
|
|
// sync, otherwise we'll start adding mempool transactions at the
|
|
// wrong height.
|
|
s.feeEstimator.Enable(block.Height())
|
|
}
|
|
|
|
// A block has been connected to the main block chain.
|
|
case blockchain.NTBlockConnected:
|
|
ntfn, ok := notification.Data.(*blockchain.BlockConnectedNtfnsData)
|
|
if !ok {
|
|
syncLog.Warnf("Block connected notification is not " +
|
|
"BlockConnectedNtfnsData")
|
|
break
|
|
}
|
|
block := ntfn.Block
|
|
parentBlock := ntfn.ParentBlock
|
|
|
|
// Determine active agendas based on flags.
|
|
isTreasuryEnabled := ntfn.CheckTxFlags.IsTreasuryEnabled()
|
|
|
|
// Account for transactions mined in the newly connected block for fee
|
|
// estimation. This must be done before attempting to remove
|
|
// transactions from the mempool because the mempool will alert the
|
|
// estimator of the txs that are leaving
|
|
s.feeEstimator.ProcessBlock(block)
|
|
|
|
// TODO: In the case the new tip disapproves the previous block, any
|
|
// transactions the previous block contains in its regular tree which
|
|
// double spend the same inputs as transactions in either tree of the
|
|
// current tip should ideally be tracked in the pool as eligible for
|
|
// inclusion in an alternative tip (side chain block) in case the
|
|
// current tip block does not get enough votes. However, the
|
|
// transaction pool currently does not provide any way to distinguish
|
|
// this condition and thus only provides tracking based on the current
|
|
// tip. In order to handle this condition, the pool would have to
|
|
// provide a way to track and independently query which txns are
|
|
// eligible based on the current tip both approving and disapproving the
|
|
// previous block as well as the previous block itself.
|
|
|
|
// Remove all of the regular and stake transactions in the connected
|
|
// block from the transaction pool. Also, remove any transactions which
|
|
// are now double spends as a result of these new transactions.
|
|
// Finally, remove any transaction that is no longer an orphan.
|
|
// Transactions which depend on a confirmed transaction are NOT removed
|
|
// recursively because they are still valid. Also, the coinbase of the
|
|
// regular tx tree is skipped because the transaction pool doesn't (and
|
|
// can't) have regular tree coinbase transactions in it.
|
|
//
|
|
// Also, in the case the RPC server is enabled, stop rebroadcasting any
|
|
// transactions in the block that were setup to be rebroadcast.
|
|
txMemPool := s.txMemPool
|
|
handleConnectedBlockTxns := func(txns []*dcrutil.Tx) {
|
|
for _, tx := range txns {
|
|
txMemPool.RemoveTransaction(tx, false)
|
|
txMemPool.MaybeAcceptDependents(tx, isTreasuryEnabled)
|
|
txMemPool.RemoveDoubleSpends(tx)
|
|
txMemPool.RemoveOrphan(tx)
|
|
acceptedTxs := txMemPool.ProcessOrphans(tx, ntfn.CheckTxFlags)
|
|
s.AnnounceNewTransactions(acceptedTxs)
|
|
|
|
// Now that this block is in the blockchain, mark the
|
|
// transaction (except the coinbase) as no longer needing
|
|
// rebroadcasting and keep track of it for use when avoiding
|
|
// requests for recently confirmed transactions.
|
|
s.TransactionConfirmed(tx)
|
|
}
|
|
}
|
|
|
|
// Add regular transactions back to the mempool, excluding the coinbase
|
|
// since it does not belong in the mempool.
|
|
handleConnectedBlockTxns(block.Transactions()[1:])
|
|
if isTreasuryEnabled {
|
|
// Skip treasurybase
|
|
handleConnectedBlockTxns(block.STransactions()[1:])
|
|
} else {
|
|
handleConnectedBlockTxns(block.STransactions())
|
|
}
|
|
|
|
// In the case the regular tree of the previous block was disapproved,
|
|
// add all of the its transactions, with the exception of the coinbase,
|
|
// back to the transaction pool to be mined in a future block.
|
|
//
|
|
// Notice that some of those transactions might have been included in
|
|
// the current block and others might also be spending some of the same
|
|
// outputs that transactions in the previous originally block spent.
|
|
// This is the expected behavior because disapproval of the regular tree
|
|
// of the previous block essentially makes it as if those transactions
|
|
// never happened.
|
|
//
|
|
// Finally, if transactions fail to add to the pool for some reason
|
|
// other than the pool already having it (a duplicate) or now being a
|
|
// double spend, remove all transactions that depend on it as well.
|
|
// The dependents are not removed for double spends because the only
|
|
// way a transaction which was not a double spend in the previous block
|
|
// to now be one is due to some transaction in the current block
|
|
// (probably the same one) also spending those outputs, and, in that
|
|
// case, anything that happens to be in the pool which depends on the
|
|
// transaction is still valid.
|
|
if !headerApprovesParent(&block.MsgBlock().Header) {
|
|
txns := parentBlock.Transactions()[1:]
|
|
txMemPool.MaybeAcceptTransactions(txns)
|
|
}
|
|
if r := s.rpcServer; r != nil {
|
|
// Filter and update the rebroadcast inventory.
|
|
s.PruneRebroadcastInventory()
|
|
|
|
// Notify registered websocket clients of incoming block.
|
|
r.NotifyBlockConnected(block)
|
|
}
|
|
|
|
if s.bg != nil {
|
|
s.bg.BlockConnected(block)
|
|
}
|
|
|
|
// Notify subscribed indexes of connected block.
|
|
if s.indexSubscriber != nil {
|
|
s.indexSubscriber.Notify(&indexers.IndexNtfn{
|
|
NtfnType: indexers.ConnectNtfn,
|
|
Block: block,
|
|
Parent: parentBlock,
|
|
IsTreasuryEnabled: isTreasuryEnabled,
|
|
})
|
|
}
|
|
|
|
// Proactively evict signature cache entries that are virtually
|
|
// guaranteed to no longer be useful.
|
|
s.proactivelyEvictSigCacheEntries(block.Height())
|
|
|
|
// Stake tickets are matured from the most recently connected block.
|
|
case blockchain.NTNewTickets:
|
|
// WARNING: The chain lock is not released before sending this
|
|
// notification, so care must be taken to avoid calling chain functions
|
|
// which could result in a deadlock.
|
|
tnd, ok := notification.Data.(*blockchain.TicketNotificationsData)
|
|
if !ok {
|
|
syncLog.Warnf("Tickets connected notification is not " +
|
|
"TicketNotificationsData")
|
|
break
|
|
}
|
|
|
|
if r := s.rpcServer; r != nil {
|
|
r.NotifyNewTickets(tnd)
|
|
}
|
|
|
|
// A block has been disconnected from the main block chain.
|
|
case blockchain.NTBlockDisconnected:
|
|
// NOTE: The chain lock is released for this notification.
|
|
ntfn, ok := notification.Data.(*blockchain.BlockDisconnectedNtfnsData)
|
|
if !ok {
|
|
syncLog.Warnf("Block disconnected notification is not " +
|
|
"BlockDisconnectedNtfnsData.")
|
|
break
|
|
}
|
|
block := ntfn.Block
|
|
parentBlock := ntfn.ParentBlock
|
|
|
|
// Determine active agendas based on flags.
|
|
isTreasuryEnabled := ntfn.CheckTxFlags.IsTreasuryEnabled()
|
|
|
|
// In the case the regular tree of the previous block was disapproved,
|
|
// disconnecting the current block makes all of those transactions valid
|
|
// again. Thus, with the exception of the coinbase, remove all of those
|
|
// transactions and any that are now double spends from the transaction
|
|
// pool. Transactions which depend on a confirmed transaction are NOT
|
|
// removed recursively because they are still valid.
|
|
txMemPool := s.txMemPool
|
|
if !headerApprovesParent(&block.MsgBlock().Header) {
|
|
for _, tx := range parentBlock.Transactions()[1:] {
|
|
txMemPool.RemoveTransaction(tx, false)
|
|
txMemPool.MaybeAcceptDependents(tx, isTreasuryEnabled)
|
|
txMemPool.RemoveDoubleSpends(tx)
|
|
txMemPool.RemoveOrphan(tx)
|
|
txMemPool.ProcessOrphans(tx, ntfn.CheckTxFlags)
|
|
}
|
|
}
|
|
|
|
// Add all of the regular and stake transactions in the disconnected
|
|
// block, with the exception of the regular tree coinbase, back to the
|
|
// transaction pool to be mined in a future block.
|
|
//
|
|
// Notice that, in the case the previous block was disapproved, some of
|
|
// the transactions in the block being disconnected might have been
|
|
// included in the previous block and others might also have been
|
|
// spending some of the same outputs. This is the expected behavior
|
|
// because disapproval of the regular tree of the previous block
|
|
// essentially makes it as if those transactions never happened, so
|
|
// disconnecting the block that disapproved those transactions
|
|
// effectively revives them.
|
|
//
|
|
// Finally, if transactions fail to add to the pool for some reason
|
|
// other than the pool already having it (a duplicate) or now being a
|
|
// double spend, remove all transactions that depend on it as well.
|
|
// The dependents are not removed for double spends because the only
|
|
// way a transaction which was not a double spend in the block being
|
|
// disconnected to now be one is due to some transaction in the previous
|
|
// block (probably the same one), which was disapproved, also spending
|
|
// those outputs, and, in that case, anything that happens to be in the
|
|
// pool which depends on the transaction is still valid.
|
|
handleDisconnectedBlockTxns := func(txns []*dcrutil.Tx) {
|
|
txMemPool.MaybeAcceptTransactions(txns)
|
|
}
|
|
handleDisconnectedBlockTxns(block.Transactions()[1:])
|
|
|
|
if isTreasuryEnabled {
|
|
// Skip treasurybase
|
|
handleDisconnectedBlockTxns(block.STransactions()[1:])
|
|
} else {
|
|
handleDisconnectedBlockTxns(block.STransactions())
|
|
}
|
|
|
|
if s.bg != nil {
|
|
s.bg.BlockDisconnected(block)
|
|
}
|
|
|
|
// Notify subscribed indexes of disconnected block.
|
|
if s.indexSubscriber != nil {
|
|
s.indexSubscriber.Notify(&indexers.IndexNtfn{
|
|
NtfnType: indexers.DisconnectNtfn,
|
|
Block: block,
|
|
Parent: parentBlock,
|
|
IsTreasuryEnabled: isTreasuryEnabled,
|
|
})
|
|
}
|
|
|
|
// Notify registered websocket clients.
|
|
if r := s.rpcServer; r != nil {
|
|
// Filter and update the rebroadcast inventory.
|
|
s.PruneRebroadcastInventory()
|
|
|
|
// Notify registered websocket clients.
|
|
r.NotifyBlockDisconnected(block)
|
|
}
|
|
|
|
// Chain reorganization has commenced.
|
|
case blockchain.NTChainReorgStarted:
|
|
// WARNING: The chain lock is not released before sending this
|
|
// notification, so care must be taken to avoid calling chain functions
|
|
// which could result in a deadlock.
|
|
if s.bg != nil {
|
|
s.bg.ChainReorgStarted()
|
|
}
|
|
|
|
// Chain reorganization has concluded.
|
|
case blockchain.NTChainReorgDone:
|
|
// WARNING: The chain lock is not released before sending this
|
|
// notification, so care must be taken to avoid calling chain functions
|
|
// which could result in a deadlock.
|
|
if s.bg != nil {
|
|
s.bg.ChainReorgDone()
|
|
}
|
|
|
|
// The blockchain is reorganizing.
|
|
case blockchain.NTReorganization:
|
|
// WARNING: The chain lock is not released before sending this
|
|
// notification, so care must be taken to avoid calling chain functions
|
|
// which could result in a deadlock.
|
|
rd, ok := notification.Data.(*blockchain.ReorganizationNtfnsData)
|
|
if !ok {
|
|
syncLog.Warnf("Chain reorganization notification is malformed")
|
|
break
|
|
}
|
|
|
|
// Notify registered websocket clients.
|
|
if r := s.rpcServer; r != nil {
|
|
r.NotifyReorganization(rd)
|
|
}
|
|
}
|
|
}
|
|
|
|
// rebroadcastHandler keeps track of user submitted inventories that we have
|
|
// sent out but have not yet made it into a block. We periodically rebroadcast
|
|
// them in case our peers restarted or otherwise lost track of them.
|
|
func (s *server) rebroadcastHandler(ctx context.Context) {
|
|
// Wait 5 min before first tx rebroadcast.
|
|
timer := time.NewTimer(5 * time.Minute)
|
|
pendingInvs := make(map[wire.InvVect]interface{})
|
|
|
|
for {
|
|
select {
|
|
case riv := <-s.modifyRebroadcastInv:
|
|
switch msg := riv.(type) {
|
|
|
|
// Incoming InvVects are added to our map of RPC txs.
|
|
case broadcastInventoryAdd:
|
|
pendingInvs[*msg.invVect] = msg.data
|
|
|
|
// When an InvVect has been added to a block, we can
|
|
// now remove it, if it was present.
|
|
case broadcastInventoryDel:
|
|
delete(pendingInvs, *msg)
|
|
|
|
case broadcastPruneInventory:
|
|
best := s.chain.BestSnapshot()
|
|
for iv, data := range pendingInvs {
|
|
tx, ok := data.(*dcrutil.Tx)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
txType := stake.DetermineTxType(tx.MsgTx())
|
|
|
|
// Remove the ticket rebroadcast if the amount not equal to
|
|
// the current stake difficulty.
|
|
if txType == stake.TxTypeSStx &&
|
|
tx.MsgTx().TxOut[0].Value != best.NextStakeDiff {
|
|
delete(pendingInvs, iv)
|
|
srvrLog.Debugf("Pending ticket purchase broadcast "+
|
|
"inventory for tx %v removed. Ticket value not "+
|
|
"equal to stake difficulty.", tx.Hash())
|
|
continue
|
|
}
|
|
|
|
// Remove the ticket rebroadcast if it has already expired.
|
|
if txType == stake.TxTypeSStx &&
|
|
blockchain.IsExpired(tx, best.Height) {
|
|
delete(pendingInvs, iv)
|
|
srvrLog.Debugf("Pending ticket purchase broadcast "+
|
|
"inventory for tx %v removed. Transaction "+
|
|
"expired.", tx.Hash())
|
|
continue
|
|
}
|
|
|
|
// Remove the revocation rebroadcast if the associated
|
|
// ticket has been revived.
|
|
if txType == stake.TxTypeSSRtx {
|
|
refSStxHash := tx.MsgTx().TxIn[0].PreviousOutPoint.Hash
|
|
if !s.chain.CheckLiveTicket(refSStxHash) {
|
|
delete(pendingInvs, iv)
|
|
srvrLog.Debugf("Pending revocation broadcast "+
|
|
"inventory for tx %v removed. "+
|
|
"Associated ticket was revived.", tx.Hash())
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
case <-timer.C:
|
|
// Any inventory we have has not made it into a block
|
|
// yet. We periodically resubmit them until they have.
|
|
for iv, data := range pendingInvs {
|
|
ivCopy := iv
|
|
s.RelayInventory(&ivCopy, data, false)
|
|
}
|
|
|
|
// Process at a random time up to 30mins (in seconds)
|
|
// in the future.
|
|
timer.Reset(time.Second *
|
|
time.Duration(randomUint16Number(1800)))
|
|
|
|
case <-ctx.Done():
|
|
timer.Stop()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// querySeeders queries the configured seeders to discover peers that supported
|
|
// the required services and adds the discovered peers to the address manager.
|
|
// Each seeder is contacted in a separate goroutine.
|
|
func (s *server) querySeeders(ctx context.Context) {
|
|
// Add peers discovered through DNS to the address manager.
|
|
seeders := s.chainParams.Seeders()
|
|
for _, seeder := range seeders {
|
|
go func(seeder string) {
|
|
ctx, cancel := context.WithTimeout(ctx, time.Minute)
|
|
defer cancel()
|
|
|
|
addrs, err := connmgr.SeedAddrs(ctx, seeder, dcrdDial,
|
|
connmgr.SeedFilterServices(defaultRequiredServices))
|
|
if err != nil {
|
|
srvrLog.Infof("seeder '%s' error: %v", seeder, err)
|
|
return
|
|
}
|
|
|
|
// Nothing to do if the seeder didn't return any addresses.
|
|
if len(addrs) == 0 {
|
|
return
|
|
}
|
|
|
|
// Lookup the IP of the https seeder to use as the source of the
|
|
// seeded addresses. In the incredibly rare event that the lookup
|
|
// fails after it just succeeded, fall back to using the first
|
|
// returned address as the source.
|
|
srcAddr := wireToAddrmgrNetAddress(addrs[0])
|
|
srcIPs, err := dcrdLookup(seeder)
|
|
if err == nil && len(srcIPs) > 0 {
|
|
const httpsPort = 443
|
|
srcAddr = addrmgr.NewNetAddressIPPort(srcIPs[0], httpsPort, 0)
|
|
}
|
|
addresses := wireToAddrmgrNetAddresses(addrs)
|
|
s.addrManager.AddAddresses(addresses, srcAddr)
|
|
}(seeder)
|
|
}
|
|
}
|
|
|
|
// Run starts the server and blocks until the provided context is cancelled.
|
|
// This entails accepting connections from peers.
|
|
func (s *server) Run(ctx context.Context) {
|
|
srvrLog.Trace("Starting server")
|
|
|
|
// Start the peer handler which in turn starts the address manager.
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
s.peerHandler(ctx)
|
|
wg.Done()
|
|
}()
|
|
|
|
// Start the sync manager.
|
|
wg.Add(1)
|
|
go func() {
|
|
s.syncManager.Run(ctx)
|
|
wg.Done()
|
|
}()
|
|
|
|
// Query the seeders and start the connection manager.
|
|
wg.Add(1)
|
|
go func() {
|
|
if !cfg.DisableSeeders {
|
|
s.querySeeders(ctx)
|
|
}
|
|
s.connManager.Run(ctx)
|
|
wg.Done()
|
|
}()
|
|
|
|
if s.nat != nil {
|
|
wg.Add(1)
|
|
go func() {
|
|
s.upnpUpdateThread(ctx)
|
|
wg.Done()
|
|
}()
|
|
}
|
|
|
|
if !cfg.DisableRPC {
|
|
// Start the RPC server and rebroadcast handler which ensures
|
|
// transactions submitted to the RPC server are rebroadcast until being
|
|
// included in a block.
|
|
wg.Add(2)
|
|
go func() {
|
|
s.rebroadcastHandler(ctx)
|
|
wg.Done()
|
|
}()
|
|
go func() {
|
|
s.rpcServer.Run(ctx)
|
|
wg.Done()
|
|
}()
|
|
}
|
|
|
|
// Start the background block template generator and CPU miner if the config
|
|
// provides a mining address.
|
|
if len(cfg.miningAddrs) > 0 {
|
|
wg.Add(2)
|
|
go func() {
|
|
s.bg.Run(ctx)
|
|
wg.Done()
|
|
}()
|
|
go func() {
|
|
s.cpuMiner.Run(ctx)
|
|
wg.Done()
|
|
}()
|
|
|
|
// The CPU miner is started without any workers which means it is idle.
|
|
// Start mining by setting the default number of workers when requested.
|
|
if cfg.Generate {
|
|
s.cpuMiner.SetNumWorkers(-1)
|
|
}
|
|
}
|
|
|
|
// Start the chain's index subscriber.
|
|
wg.Add(1)
|
|
go func() {
|
|
s.indexSubscriber.Run(ctx)
|
|
wg.Done()
|
|
}()
|
|
|
|
// Shutdown the server when the context is cancelled.
|
|
<-ctx.Done()
|
|
s.shutdown.Store(true)
|
|
|
|
srvrLog.Warnf("Server shutting down")
|
|
s.feeEstimator.Close()
|
|
s.chain.ShutdownUtxoCache()
|
|
wg.Wait()
|
|
srvrLog.Trace("Server stopped")
|
|
}
|
|
|
|
// parseListeners determines whether each listen address is IPv4 and IPv6 and
|
|
// returns a slice of appropriate net.Addrs to listen on with TCP. It also
|
|
// properly detects addresses which apply to "all interfaces" and adds the
|
|
// address as both IPv4 and IPv6.
|
|
func parseListeners(addrs []string) ([]net.Addr, error) {
|
|
netAddrs := make([]net.Addr, 0, len(addrs)*2)
|
|
for _, addr := range addrs {
|
|
host, _, err := net.SplitHostPort(addr)
|
|
if err != nil {
|
|
// Shouldn't happen due to already being normalized.
|
|
return nil, err
|
|
}
|
|
|
|
// Empty host or host of * on plan9 is both IPv4 and IPv6.
|
|
if host == "" || (host == "*" && runtime.GOOS == "plan9") {
|
|
netAddrs = append(netAddrs, simpleAddr{net: "tcp4", addr: addr})
|
|
netAddrs = append(netAddrs, simpleAddr{net: "tcp6", addr: addr})
|
|
continue
|
|
}
|
|
|
|
// Strip IPv6 zone id if present since net.ParseIP does not
|
|
// handle it.
|
|
zoneIndex := strings.LastIndex(host, "%")
|
|
if zoneIndex > 0 {
|
|
host = host[:zoneIndex]
|
|
}
|
|
|
|
// Parse the IP.
|
|
ip := net.ParseIP(host)
|
|
if ip == nil {
|
|
return nil, fmt.Errorf("'%s' is not a valid IP address", host)
|
|
}
|
|
|
|
// To4 returns nil when the IP is not an IPv4 address, so use
|
|
// this determine the address type.
|
|
if ip.To4() == nil {
|
|
netAddrs = append(netAddrs, simpleAddr{net: "tcp6", addr: addr})
|
|
} else {
|
|
netAddrs = append(netAddrs, simpleAddr{net: "tcp4", addr: addr})
|
|
}
|
|
}
|
|
return netAddrs, nil
|
|
}
|
|
|
|
func (s *server) upnpUpdateThread(ctx context.Context) {
|
|
// Go off immediately to prevent code duplication, thereafter we renew
|
|
// lease every 15 minutes.
|
|
timer := time.NewTimer(0 * time.Second)
|
|
lport, _ := strconv.ParseInt(s.chainParams.DefaultPort, 10, 16)
|
|
|
|
first := true
|
|
out:
|
|
for {
|
|
select {
|
|
case <-timer.C:
|
|
// TODO: pick external port more cleverly
|
|
// TODO: know which ports we are listening to on an external net.
|
|
// TODO: if specific listen port doesn't work then ask for wildcard
|
|
// listen port?
|
|
// XXX this assumes timeout is in seconds.
|
|
listenPort, err := s.nat.AddPortMapping("tcp", int(lport), int(lport),
|
|
"dcrd listen port", 20*60)
|
|
if err != nil {
|
|
srvrLog.Warnf("can't add UPnP port mapping: %v", err)
|
|
}
|
|
if first && err == nil {
|
|
// TODO: look this up periodically to see if upnp domain changed
|
|
// and so did ip.
|
|
externalip, err := s.nat.GetExternalAddress()
|
|
if err != nil {
|
|
srvrLog.Warnf("UPnP can't get external address: %v", err)
|
|
continue out
|
|
}
|
|
localAddr := addrmgr.NewNetAddressIPPort(externalip,
|
|
uint16(listenPort), s.services)
|
|
err = s.addrManager.AddLocalAddress(localAddr, addrmgr.UpnpPrio)
|
|
if err != nil {
|
|
srvrLog.Warnf("Failed to add UPnP local address %s: %v",
|
|
localAddr, err)
|
|
} else {
|
|
srvrLog.Warnf("Successfully bound via UPnP to %s",
|
|
localAddr)
|
|
first = false
|
|
}
|
|
}
|
|
timer.Reset(time.Minute * 15)
|
|
|
|
case <-ctx.Done():
|
|
break out
|
|
}
|
|
}
|
|
|
|
timer.Stop()
|
|
|
|
err := s.nat.DeletePortMapping("tcp", int(lport), int(lport))
|
|
if err != nil {
|
|
srvrLog.Warnf("unable to remove UPnP port mapping: %v", err)
|
|
} else {
|
|
srvrLog.Debugf("successfully disestablished UPnP port mapping")
|
|
}
|
|
}
|
|
|
|
// standardScriptVerifyFlags returns the script flags that should be used when
|
|
// executing transaction scripts to enforce additional checks which are required
|
|
// for the script to be considered standard. Note these flags are different
|
|
// than what is required for the consensus rules in that they are more strict.
|
|
func standardScriptVerifyFlags(chain *blockchain.BlockChain) (txscript.ScriptFlags, error) {
|
|
scriptFlags := mempool.BaseStandardVerifyFlags
|
|
|
|
// Enable validation of OP_SHA256 when the associated agenda is active.
|
|
tipHash := &chain.BestSnapshot().Hash
|
|
isActive, err := chain.IsLNFeaturesAgendaActive(tipHash)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if isActive {
|
|
scriptFlags |= txscript.ScriptVerifySHA256
|
|
}
|
|
|
|
// Enable validation of treasury-related opcodes when the associated agenda
|
|
// is active.
|
|
isActive, err = chain.IsTreasuryAgendaActive(tipHash)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if isActive {
|
|
scriptFlags |= txscript.ScriptVerifyTreasury
|
|
}
|
|
|
|
return scriptFlags, nil
|
|
}
|
|
|
|
// genCertPair generates a key/cert pair to the paths provided.
|
|
func genCertPair(certFile, keyFile string, altDNSNames []string, tlsCurve elliptic.Curve) error {
|
|
rpcsLog.Infof("Generating TLS certificates...")
|
|
|
|
org := "dcrd autogenerated cert"
|
|
validUntil := time.Now().Add(10 * 365 * 24 * time.Hour)
|
|
cert, key, err := certgen.NewTLSCertPair(tlsCurve, org,
|
|
validUntil, altDNSNames)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Write cert and key files.
|
|
if err = os.WriteFile(certFile, cert, 0644); err != nil {
|
|
return err
|
|
}
|
|
if err = os.WriteFile(keyFile, key, 0600); err != nil {
|
|
os.Remove(certFile)
|
|
return err
|
|
}
|
|
|
|
rpcsLog.Infof("Done generating TLS certificates")
|
|
return nil
|
|
}
|
|
|
|
// watchedFile houses details about a file that is being watched for updates.
|
|
type watchedFile struct {
|
|
path string
|
|
curTime time.Time
|
|
curSize int64
|
|
}
|
|
|
|
// updated returns whether or not the file has been updated since the last time
|
|
// it was checked and updates the file info details used to make that
|
|
// determination accordingly.
|
|
//
|
|
// It returns true for files that no longer exist.
|
|
//
|
|
// It returns false when any unexpected errors are encountered while attempting
|
|
// to get the file details or the provided watched file does not have a path
|
|
// associated with it.
|
|
func (f *watchedFile) updated() bool {
|
|
// Ignore watched files that don't have a path associated with them.
|
|
if f.path == "" {
|
|
return false
|
|
}
|
|
|
|
// Attempt to get file info about the watched file. Note that errors aside
|
|
// from files that no longer exist are intentionally ignored here so
|
|
// unexpected errors do not result in the file being reported as changed
|
|
// when it very likely was not.
|
|
fi, err := os.Stat(f.path)
|
|
if err != nil {
|
|
return os.IsNotExist(err)
|
|
}
|
|
changed := fi.Size() != f.curSize || fi.ModTime() != f.curTime
|
|
if changed {
|
|
f.curSize = fi.Size()
|
|
f.curTime = fi.ModTime()
|
|
}
|
|
return changed
|
|
}
|
|
|
|
// reloadableTLSConfig houses information for a TLS configuration that will
|
|
// dynamically reload the server certificate, server key, and client CAs when
|
|
// the associated files are updated.
|
|
type reloadableTLSConfig struct {
|
|
mtx sync.Mutex
|
|
minReloadCheckDelay time.Duration
|
|
nextReloadCheck time.Time
|
|
cert watchedFile
|
|
key watchedFile
|
|
clientCAs watchedFile
|
|
cachedConfig *tls.Config
|
|
prevAttemptErr error
|
|
}
|
|
|
|
// needsReload determines whether or the not the watched certificate files (and
|
|
// hence the TLS config that houses them) need to be reloaded.
|
|
//
|
|
// The conditions for reload are as follows:
|
|
// - Enough time has passed since the last time the files were checked
|
|
// - Either the modified time or file of any of the watched cert files have
|
|
// changed.
|
|
//
|
|
// This function MUST be called with the embedded mutex locked (for writes).
|
|
func (c *reloadableTLSConfig) needsReload() bool {
|
|
// Avoid checking for cert updates when not enough time has passed.
|
|
now := time.Now()
|
|
if now.Before(c.nextReloadCheck) {
|
|
return false
|
|
}
|
|
c.nextReloadCheck = now.Add(c.minReloadCheckDelay)
|
|
|
|
return c.cert.updated() || c.key.updated() || c.clientCAs.updated()
|
|
}
|
|
|
|
// newTLSConfig loads the provided server certificate and key pair along with
|
|
// the provided client CAs and returns a new tls.Config instance populated with
|
|
// the parsed values.
|
|
//
|
|
// The clientCAsPath may be an empty string when client authentication is not
|
|
// required.
|
|
func newTLSConfig(certPath, keyPath, clientCAsPath string, minVersion uint16) (*tls.Config, error) {
|
|
serverCert, err := tls.LoadX509KeyPair(certPath, keyPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tlsConfig := tls.Config{
|
|
Certificates: []tls.Certificate{serverCert},
|
|
MinVersion: minVersion,
|
|
}
|
|
if clientCAsPath != "" {
|
|
clientCAs, err := os.ReadFile(clientCAsPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
|
|
tlsConfig.ClientCAs = x509.NewCertPool()
|
|
if !tlsConfig.ClientCAs.AppendCertsFromPEM(clientCAs) {
|
|
return nil, fmt.Errorf("no certificates found in %q", clientCAsPath)
|
|
}
|
|
}
|
|
return &tlsConfig, nil
|
|
}
|
|
|
|
// configFileClient is intended to be set as the GetConfigForClient callback in
|
|
// the initial TLS configuration passed to the listener in order to enable
|
|
// automatically detecting and reloading certificate changes.
|
|
//
|
|
// This function is safe for concurrent access.
|
|
func (c *reloadableTLSConfig) configFileClient(_ *tls.ClientHelloInfo) (*tls.Config, error) {
|
|
defer c.mtx.Unlock()
|
|
c.mtx.Lock()
|
|
|
|
if !c.needsReload() {
|
|
return c.cachedConfig, nil
|
|
}
|
|
|
|
// Attempt to reload the certs and generate a new TLS config for them.
|
|
//
|
|
// Only update the cached config when there was no error in order to
|
|
// preserve the current working config.
|
|
tlsConfig, err := newTLSConfig(c.cert.path, c.key.path, c.clientCAs.path,
|
|
c.cachedConfig.MinVersion)
|
|
if err != nil {
|
|
if c.prevAttemptErr == nil || err.Error() != c.prevAttemptErr.Error() {
|
|
rpcsLog.Warnf("RPC certificates modification detected, but existing "+
|
|
"configuration preserved because the certificates failed to "+
|
|
"reload: %v\n", err)
|
|
}
|
|
c.prevAttemptErr = err
|
|
return c.cachedConfig, nil
|
|
}
|
|
c.prevAttemptErr = nil
|
|
|
|
rpcsLog.Info("Reloaded modified RPC certificates")
|
|
c.cachedConfig = tlsConfig
|
|
return c.cachedConfig, nil
|
|
}
|
|
|
|
// makeReloadableTLSConfig returns a TLS configuration that will dynamically
|
|
// reload the server certificate, server key, and client CAs from the configured
|
|
// paths when the files are updated.
|
|
//
|
|
// The client CAs path may be an empty string when client authentication is not
|
|
// required.
|
|
//
|
|
// This works by hooking up the GetConfigForClient callback which is invoked
|
|
// when a client connects. It makes use of caching and lazy loading (as opposed
|
|
// to polling) for better efficiency.
|
|
//
|
|
// An overview of the behavior is as follows:
|
|
//
|
|
// - All connections used a cached TLS config
|
|
// - When an underlying file is updated, as determined by its modification
|
|
// time being newer or its size changing, the certificates are reloaded and
|
|
// cached
|
|
// - Files are not checked for updates more than once every several seconds
|
|
// - Files are only checked for updates when a connection is made and are not
|
|
// checked more than once every several seconds
|
|
// - The existing cached config will be retained if any errors that would
|
|
// result in an invalid config are encountered (for example, removing the
|
|
// files, replacing the files with malformed or empty data, or replacing the
|
|
// key with one that is not valid for the cert)
|
|
func makeReloadableTLSConfig(certPath, keyPath, clientCAsPath string) (*tls.Config, error) {
|
|
const minVer = tls.VersionTLS12
|
|
cachedConfig, err := newTLSConfig(certPath, keyPath, clientCAsPath, minVer)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
minReloadCheckDelay := 5 * time.Second
|
|
c := &reloadableTLSConfig{
|
|
minReloadCheckDelay: minReloadCheckDelay,
|
|
nextReloadCheck: time.Now().Add(minReloadCheckDelay),
|
|
cert: watchedFile{path: certPath},
|
|
key: watchedFile{path: keyPath},
|
|
clientCAs: watchedFile{path: clientCAsPath},
|
|
cachedConfig: cachedConfig,
|
|
}
|
|
|
|
// Populate the initial file info for all watched files.
|
|
c.cert.updated()
|
|
c.key.updated()
|
|
c.clientCAs.updated()
|
|
|
|
return &tls.Config{
|
|
GetConfigForClient: c.configFileClient,
|
|
MinVersion: minVer,
|
|
}, nil
|
|
}
|
|
|
|
// setupRPCListeners returns a slice of listeners that are configured for use
|
|
// with the RPC server depending on the configuration settings for listen
|
|
// addresses and TLS.
|
|
func setupRPCListeners() ([]net.Listener, error) {
|
|
var notifyAddrServer boundAddrEventServer
|
|
if cfg.BoundAddrEvents {
|
|
notifyAddrServer = newBoundAddrEventServer(outgoingPipeMessages)
|
|
}
|
|
|
|
// Setup TLS if not disabled.
|
|
listenFunc := net.Listen
|
|
if !cfg.DisableRPC && !cfg.DisableTLS {
|
|
// Generate the TLS cert and key file if both don't already exist.
|
|
keyFileExists := fileExists(cfg.RPCKey)
|
|
certFileExists := fileExists(cfg.RPCCert)
|
|
if len(cfg.AltDNSNames) != 0 && (keyFileExists || certFileExists) {
|
|
rpcsLog.Warn("Additional DNS names specified when TLS " +
|
|
"certificates already exist will NOT be included:")
|
|
rpcsLog.Warnf("- In order to create TLS certs that include the "+
|
|
"additional DNS names, delete %q and %q and restart the server",
|
|
cfg.RPCKey, cfg.RPCCert)
|
|
}
|
|
if !keyFileExists && !certFileExists {
|
|
curve, err := tlsCurve(cfg.TLSCurve)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = genCertPair(cfg.RPCCert, cfg.RPCKey, cfg.AltDNSNames, curve)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
var clientCACerts string
|
|
if cfg.RPCAuthType == authTypeClientCert {
|
|
clientCACerts = cfg.RPCClientCAs
|
|
}
|
|
tlsConfig, err := makeReloadableTLSConfig(cfg.RPCCert, cfg.RPCKey,
|
|
clientCACerts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Change the standard net.Listen function to the tls one.
|
|
listenFunc = func(net string, laddr string) (net.Listener, error) {
|
|
return tls.Listen(net, laddr, tlsConfig)
|
|
}
|
|
}
|
|
|
|
netAddrs, err := parseListeners(cfg.RPCListeners)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
listeners := make([]net.Listener, 0, len(netAddrs))
|
|
for _, addr := range netAddrs {
|
|
listener, err := listenFunc(addr.Network(), addr.String())
|
|
if err != nil {
|
|
rpcsLog.Warnf("Can't listen on %s: %v", addr, err)
|
|
continue
|
|
}
|
|
listeners = append(listeners, listener)
|
|
notifyAddrServer.notifyRPCAddress(listener.Addr().String())
|
|
}
|
|
|
|
return listeners, nil
|
|
}
|
|
|
|
// newServer returns a new dcrd server configured to listen on addr for the
|
|
// decred network type specified by chainParams. Use start to begin accepting
|
|
// connections from peers.
|
|
func newServer(ctx context.Context, listenAddrs []string, db database.DB,
|
|
utxoDb *leveldb.DB, chainParams *chaincfg.Params,
|
|
dataDir string) (*server, error) {
|
|
|
|
amgr := addrmgr.New(cfg.DataDir, dcrdLookup)
|
|
services := defaultServices
|
|
|
|
var listeners []net.Listener
|
|
var nat *upnpNAT
|
|
if !cfg.DisableListen {
|
|
var err error
|
|
listeners, nat, err = initListeners(ctx, chainParams, amgr, listenAddrs,
|
|
services)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(listeners) == 0 {
|
|
return nil, errors.New("no valid listen address")
|
|
}
|
|
}
|
|
|
|
// Create a SigCache instance.
|
|
sigCache, err := txscript.NewSigCache(cfg.SigCacheMaxSize)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s := server{
|
|
chainParams: chainParams,
|
|
addrManager: amgr,
|
|
newPeers: make(chan *serverPeer, cfg.MaxPeers),
|
|
donePeers: make(chan *serverPeer, cfg.MaxPeers),
|
|
banPeers: make(chan *serverPeer, cfg.MaxPeers),
|
|
query: make(chan interface{}),
|
|
relayInv: make(chan relayMsg, cfg.MaxPeers),
|
|
broadcast: make(chan broadcastMsg, cfg.MaxPeers),
|
|
modifyRebroadcastInv: make(chan interface{}),
|
|
nat: nat,
|
|
db: db,
|
|
timeSource: blockchain.NewMedianTime(),
|
|
services: services,
|
|
sigCache: sigCache,
|
|
subsidyCache: standalone.NewSubsidyCache(chainParams),
|
|
lotteryDataBroadcast: make(map[chainhash.Hash]struct{}),
|
|
recentlyConfirmedTxns: apbf.NewFilter(maxRecentlyConfirmedTxns,
|
|
recentlyConfirmedTxnsFPRate),
|
|
indexSubscriber: indexers.NewIndexSubscriber(ctx),
|
|
quit: make(chan struct{}),
|
|
}
|
|
|
|
// Convert the minimum known work to a uint256 when it exists. Ideally, the
|
|
// chain params should be updated to use the new type, but that will be a
|
|
// major version bump, so a one-time conversion is a good tradeoff in the
|
|
// mean time.
|
|
minKnownWorkBig := chainParams.MinKnownChainWork
|
|
if minKnownWorkBig != nil {
|
|
s.minKnownWork.SetBig(minKnownWorkBig)
|
|
}
|
|
|
|
feC := fees.EstimatorConfig{
|
|
MinBucketFee: cfg.minRelayTxFee,
|
|
MaxBucketFee: dcrutil.Amount(fees.DefaultMaxBucketFeeMultiplier) * cfg.minRelayTxFee,
|
|
MaxConfirms: fees.DefaultMaxConfirmations,
|
|
FeeRateStep: fees.DefaultFeeRateStep,
|
|
DatabaseFile: path.Join(dataDir, "feesdb"),
|
|
|
|
// 1e5 is the previous (up to 1.1.0) mempool.DefaultMinRelayTxFee that
|
|
// un-upgraded wallets will be using, so track this particular rate
|
|
// explicitly. Note that bumping this value will cause the existing fees
|
|
// database to become invalid and will force nodes to explicitly delete
|
|
// it.
|
|
ExtraBucketFee: 1e5,
|
|
}
|
|
fe, err := fees.NewEstimator(&feC)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.feeEstimator = fe
|
|
|
|
if cfg.AllowOldForks {
|
|
srvrLog.Info("Processing forks deep in history is enabled")
|
|
}
|
|
|
|
// Set assume valid when enabled.
|
|
var assumeValid chainhash.Hash
|
|
if cfg.AssumeValid != "0" {
|
|
// Default assume valid to the value specified by chain params.
|
|
assumeValid = s.chainParams.AssumeValid
|
|
|
|
// Override assume valid if specified by the config option.
|
|
if cfg.AssumeValid != "" {
|
|
hash, err := chainhash.NewHashFromStr(cfg.AssumeValid)
|
|
if err != nil {
|
|
err = fmt.Errorf("invalid hex for --assumevalid: %w", err)
|
|
return nil, err
|
|
}
|
|
assumeValid = *hash
|
|
srvrLog.Infof("Assume valid set to %v", assumeValid)
|
|
}
|
|
} else {
|
|
srvrLog.Info("Assume valid is disabled")
|
|
}
|
|
|
|
// Create a new block chain instance with the appropriate configuration.
|
|
utxoBackend := blockchain.NewLevelDbUtxoBackend(utxoDb)
|
|
utxoCache := blockchain.NewUtxoCache(&blockchain.UtxoCacheConfig{
|
|
Backend: utxoBackend,
|
|
FlushBlockDB: s.db.Flush,
|
|
MaxSize: uint64(cfg.UtxoCacheMaxSize) * 1024 * 1024,
|
|
})
|
|
s.chain, err = blockchain.New(ctx,
|
|
&blockchain.Config{
|
|
DB: s.db,
|
|
UtxoBackend: utxoBackend,
|
|
ChainParams: s.chainParams,
|
|
AssumeValid: assumeValid,
|
|
TimeSource: s.timeSource,
|
|
Notifications: s.handleBlockchainNotification,
|
|
SigCache: s.sigCache,
|
|
SubsidyCache: s.subsidyCache,
|
|
IndexSubscriber: s.indexSubscriber,
|
|
UtxoCache: utxoCache,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
queryer := &blockchain.ChainQueryerAdapter{BlockChain: s.chain}
|
|
if cfg.TxIndex {
|
|
indxLog.Info("Transaction index is enabled")
|
|
s.txIndex, err = indexers.NewTxIndex(s.indexSubscriber, db, queryer)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if !cfg.NoExistsAddrIndex {
|
|
indxLog.Info("Exists address index is enabled")
|
|
s.existsAddrIndex, err = indexers.NewExistsAddrIndex(s.indexSubscriber,
|
|
db, queryer)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
err = s.indexSubscriber.CatchUp(ctx, s.db, queryer)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
txC := mempool.Config{
|
|
Policy: mempool.Policy{
|
|
EnableAncestorTracking: len(cfg.miningAddrs) > 0,
|
|
AcceptNonStd: cfg.AcceptNonStd,
|
|
MaxOrphanTxs: cfg.MaxOrphanTxs,
|
|
MaxOrphanTxSize: mempool.MaxStandardTxSize,
|
|
MaxSigOpsPerTx: blockchain.MaxSigOpsPerBlock / 5,
|
|
MinRelayTxFee: cfg.minRelayTxFee,
|
|
AllowOldVotes: cfg.AllowOldVotes,
|
|
MaxVoteAge: func() uint16 {
|
|
switch chainParams.Net {
|
|
case wire.MainNet, wire.SimNet, wire.RegNet:
|
|
return chainParams.CoinbaseMaturity
|
|
|
|
case wire.TestNet3:
|
|
return defaultMaximumVoteAge
|
|
|
|
default:
|
|
return chainParams.CoinbaseMaturity
|
|
}
|
|
}(),
|
|
StandardVerifyFlags: func() (txscript.ScriptFlags, error) {
|
|
return standardScriptVerifyFlags(s.chain)
|
|
},
|
|
},
|
|
ChainParams: chainParams,
|
|
NextStakeDifficulty: func() (int64, error) {
|
|
return s.chain.BestSnapshot().NextStakeDiff, nil
|
|
},
|
|
FetchUtxoView: s.chain.FetchUtxoView,
|
|
BlockByHash: s.chain.BlockByHash,
|
|
BestHash: func() *chainhash.Hash { return &s.chain.BestSnapshot().Hash },
|
|
BestHeight: func() int64 { return s.chain.BestSnapshot().Height },
|
|
HeaderByHash: s.chain.HeaderByHash,
|
|
CalcSequenceLock: s.chain.CalcSequenceLock,
|
|
SubsidyCache: s.subsidyCache,
|
|
SigCache: s.sigCache,
|
|
PastMedianTime: func() time.Time {
|
|
return s.chain.BestSnapshot().MedianTime
|
|
},
|
|
ExistsAddrIndex: s.existsAddrIndex,
|
|
AddTxToFeeEstimation: s.feeEstimator.AddMemPoolTransaction,
|
|
RemoveTxFromFeeEstimation: s.feeEstimator.RemoveMemPoolTransaction,
|
|
OnVoteReceived: func(voteTx *dcrutil.Tx) {
|
|
if s.bg != nil {
|
|
s.bg.VoteReceived(voteTx)
|
|
}
|
|
},
|
|
OnTSpendReceived: func(tx *dcrutil.Tx) {
|
|
if s.rpcServer != nil {
|
|
s.rpcServer.NotifyTSpend(tx)
|
|
}
|
|
},
|
|
IsTreasuryAgendaActive: func() (bool, error) {
|
|
tipHash := &s.chain.BestSnapshot().Hash
|
|
return s.chain.IsTreasuryAgendaActive(tipHash)
|
|
},
|
|
IsAutoRevocationsAgendaActive: func() (bool, error) {
|
|
tipHash := &s.chain.BestSnapshot().Hash
|
|
return s.chain.IsAutoRevocationsAgendaActive(tipHash)
|
|
},
|
|
IsSubsidySplitAgendaActive: func() (bool, error) {
|
|
tipHash := &s.chain.BestSnapshot().Hash
|
|
return s.chain.IsSubsidySplitAgendaActive(tipHash)
|
|
},
|
|
IsSubsidySplitR2AgendaActive: func() (bool, error) {
|
|
tipHash := &s.chain.BestSnapshot().Hash
|
|
return s.chain.IsSubsidySplitR2AgendaActive(tipHash)
|
|
},
|
|
TSpendMinedOnAncestor: func(tspend chainhash.Hash) error {
|
|
tipHash := s.chain.BestSnapshot().Hash
|
|
return s.chain.CheckTSpendExists(tipHash, tspend)
|
|
},
|
|
}
|
|
s.txMemPool = mempool.New(&txC)
|
|
|
|
s.syncManager = netsync.New(&netsync.Config{
|
|
PeerNotifier: &s,
|
|
Chain: s.chain,
|
|
ChainParams: s.chainParams,
|
|
TimeSource: s.timeSource,
|
|
TxMemPool: s.txMemPool,
|
|
NoMiningStateSync: cfg.NoMiningStateSync,
|
|
MaxPeers: cfg.MaxPeers,
|
|
MaxOrphanTxs: cfg.MaxOrphanTxs,
|
|
RecentlyConfirmedTxns: s.recentlyConfirmedTxns,
|
|
})
|
|
|
|
// Dump the blockchain and quit if requested.
|
|
if cfg.DumpBlockchain != "" {
|
|
err := dumpBlockChain(s.chainParams, s.chain)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return nil, fmt.Errorf("closing after dumping blockchain")
|
|
}
|
|
|
|
// Create the background block template generator and CPU miner if the
|
|
// config has a mining address.
|
|
if len(cfg.miningAddrs) > 0 {
|
|
// Create the mining policy and block template generator based on the
|
|
// configuration options.
|
|
//
|
|
// NOTE: The CPU miner relies on the mempool, so the mempool has to be
|
|
// created before calling the function to create the CPU miner.
|
|
policy := mining.Policy{
|
|
BlockMaxSize: cfg.BlockMaxSize,
|
|
TxMinFreeFee: cfg.minRelayTxFee,
|
|
AggressiveMining: !cfg.NonAggressive,
|
|
StandardVerifyFlags: func() (txscript.ScriptFlags, error) {
|
|
return standardScriptVerifyFlags(s.chain)
|
|
},
|
|
}
|
|
tg := mining.NewBlkTmplGenerator(&mining.Config{
|
|
Policy: &policy,
|
|
TxSource: s.txMemPool,
|
|
TimeSource: s.timeSource,
|
|
SubsidyCache: s.subsidyCache,
|
|
ChainParams: s.chainParams,
|
|
MiningTimeOffset: cfg.MiningTimeOffset,
|
|
BestSnapshot: s.chain.BestSnapshot,
|
|
BlockByHash: s.chain.BlockByHash,
|
|
CalcNextRequiredDifficulty: s.chain.CalcNextRequiredDifficulty,
|
|
CalcStakeVersionByHash: s.chain.CalcStakeVersionByHash,
|
|
CheckConnectBlockTemplate: s.chain.CheckConnectBlockTemplate,
|
|
CheckTicketExhaustion: s.chain.CheckTicketExhaustion,
|
|
CheckTransactionInputs: func(tx *dcrutil.Tx, txHeight int64,
|
|
view *blockchain.UtxoViewpoint, checkFraudProof bool,
|
|
prevHeader *wire.BlockHeader, isTreasuryEnabled,
|
|
isAutoRevocationsEnabled bool,
|
|
subsidySplitVariant standalone.SubsidySplitVariant) (int64, error) {
|
|
|
|
return blockchain.CheckTransactionInputs(s.subsidyCache, tx, txHeight,
|
|
view, checkFraudProof, s.chainParams, prevHeader, isTreasuryEnabled,
|
|
isAutoRevocationsEnabled, subsidySplitVariant)
|
|
},
|
|
CheckTSpendHasVotes: s.chain.CheckTSpendHasVotes,
|
|
CountSigOps: blockchain.CountSigOps,
|
|
FetchUtxoEntry: s.chain.FetchUtxoEntry,
|
|
FetchUtxoView: s.chain.FetchUtxoView,
|
|
FetchUtxoViewParentTemplate: s.chain.FetchUtxoViewParentTemplate,
|
|
ForceHeadReorganization: s.chain.ForceHeadReorganization,
|
|
HeaderByHash: s.chain.HeaderByHash,
|
|
IsFinalizedTransaction: blockchain.IsFinalizedTransaction,
|
|
IsHeaderCommitmentsAgendaActive: s.chain.IsHeaderCommitmentsAgendaActive,
|
|
IsTreasuryAgendaActive: s.chain.IsTreasuryAgendaActive,
|
|
IsAutoRevocationsAgendaActive: s.chain.IsAutoRevocationsAgendaActive,
|
|
IsSubsidySplitAgendaActive: s.chain.IsSubsidySplitAgendaActive,
|
|
IsSubsidySplitR2AgendaActive: s.chain.IsSubsidySplitR2AgendaActive,
|
|
MaxTreasuryExpenditure: s.chain.MaxTreasuryExpenditure,
|
|
NewUtxoViewpoint: func() *blockchain.UtxoViewpoint {
|
|
return blockchain.NewUtxoViewpoint(utxoCache)
|
|
},
|
|
TipGeneration: s.chain.TipGeneration,
|
|
ValidateTransactionScripts: func(tx *dcrutil.Tx,
|
|
utxoView *blockchain.UtxoViewpoint, flags txscript.ScriptFlags,
|
|
isAutoRevocationsEnabled bool) error {
|
|
|
|
return blockchain.ValidateTransactionScripts(tx, utxoView, flags,
|
|
s.sigCache, isAutoRevocationsEnabled)
|
|
},
|
|
})
|
|
|
|
s.bg = mining.NewBgBlkTmplGenerator(&mining.BgBlkTmplConfig{
|
|
TemplateGenerator: tg,
|
|
MiningAddrs: cfg.miningAddrs,
|
|
AllowUnsyncedMining: cfg.AllowUnsyncedMining,
|
|
IsCurrent: s.syncManager.IsCurrent,
|
|
})
|
|
|
|
s.cpuMiner = cpuminer.New(&cpuminer.Config{
|
|
ChainParams: s.chainParams,
|
|
PermitConnectionlessMining: cfg.SimNet || cfg.RegNet,
|
|
BgBlkTmplGenerator: s.bg,
|
|
ProcessBlock: s.syncManager.ProcessBlock,
|
|
ConnectedCount: s.ConnectedCount,
|
|
IsCurrent: s.syncManager.IsCurrent,
|
|
IsKnownInvalidBlock: s.chain.IsKnownInvalidBlock,
|
|
IsBlake3PowAgendaActive: s.chain.IsBlake3PowAgendaActive,
|
|
})
|
|
}
|
|
|
|
// Only setup a function to return new addresses to connect to when
|
|
// not running in connect-only mode. The simulation and regression networks
|
|
// are always in connect-only mode since they are only intended to connect
|
|
// to specified peers and actively avoid advertising and connecting to
|
|
// discovered peers in order to prevent it from becoming a public test
|
|
// network.
|
|
var newAddressFunc func() (net.Addr, error)
|
|
if !cfg.SimNet && !cfg.RegNet && len(cfg.ConnectPeers) == 0 {
|
|
newAddressFunc = func() (net.Addr, error) {
|
|
for tries := 0; tries < 100; tries++ {
|
|
addr := s.addrManager.GetAddress()
|
|
if addr == nil {
|
|
break
|
|
}
|
|
|
|
// Address will not be invalid, local or unroutable
|
|
// because addrmanager rejects those on addition.
|
|
// Just check that we don't already have an address
|
|
// in the same group so that we are not connecting
|
|
// to the same network segment at the expense of
|
|
// others.
|
|
netAddr := addr.NetAddress()
|
|
if s.OutboundGroupCount(netAddr.GroupKey()) != 0 {
|
|
continue
|
|
}
|
|
|
|
// only allow recent nodes (10mins) after we failed 30
|
|
// times
|
|
if tries < 30 && time.Since(addr.LastAttempt()) < 10*time.Minute {
|
|
continue
|
|
}
|
|
|
|
// allow nondefault ports after 50 failed tries.
|
|
if fmt.Sprintf("%d", netAddr.Port) !=
|
|
s.chainParams.DefaultPort && tries < 50 {
|
|
continue
|
|
}
|
|
|
|
return addrStringToNetAddr(netAddr.Key())
|
|
}
|
|
|
|
return nil, errors.New("no valid connect address")
|
|
}
|
|
}
|
|
|
|
// Create a connection manager.
|
|
targetOutbound := defaultTargetOutbound
|
|
if cfg.MaxPeers < targetOutbound {
|
|
targetOutbound = cfg.MaxPeers
|
|
}
|
|
cmgr, err := connmgr.New(&connmgr.Config{
|
|
Listeners: listeners,
|
|
OnAccept: s.inboundPeerConnected,
|
|
RetryDuration: connectionRetryInterval,
|
|
TargetOutbound: uint32(targetOutbound),
|
|
Dial: s.attemptDcrdDial,
|
|
Timeout: cfg.DialTimeout,
|
|
OnConnection: s.outboundPeerConnected,
|
|
GetNewAddress: newAddressFunc,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.connManager = cmgr
|
|
|
|
// Start up persistent peers.
|
|
permanentPeers := cfg.ConnectPeers
|
|
if len(permanentPeers) == 0 {
|
|
permanentPeers = cfg.AddPeers
|
|
}
|
|
for _, addr := range permanentPeers {
|
|
tcpAddr, err := addrStringToNetAddr(addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
go s.connManager.Connect(ctx,
|
|
&connmgr.ConnReq{
|
|
Addr: tcpAddr,
|
|
Permanent: true,
|
|
})
|
|
}
|
|
|
|
if !cfg.DisableRPC {
|
|
// Setup listeners for the configured RPC listen addresses and
|
|
// TLS settings.
|
|
rpcListeners, err := setupRPCListeners()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(rpcListeners) == 0 {
|
|
return nil, errors.New("no usable rpc listen addresses")
|
|
}
|
|
|
|
rpcsConfig := rpcserver.Config{
|
|
Listeners: rpcListeners,
|
|
ConnMgr: &rpcConnManager{&s},
|
|
SyncMgr: &rpcSyncMgr{server: &s, syncMgr: s.syncManager},
|
|
FeeEstimator: s.feeEstimator,
|
|
TimeSource: s.timeSource,
|
|
Services: s.services,
|
|
AddrManager: s.addrManager,
|
|
Clock: &rpcClock{},
|
|
SubsidyCache: s.subsidyCache,
|
|
Chain: &rpcChain{s.chain},
|
|
ChainParams: chainParams,
|
|
SanityChecker: &rpcSanityChecker{
|
|
chain: s.chain,
|
|
timeSource: s.timeSource,
|
|
chainParams: chainParams,
|
|
},
|
|
DB: db,
|
|
TxMempooler: s.txMemPool,
|
|
CPUMiner: &rpcCPUMiner{s.cpuMiner},
|
|
NetInfo: cfg.generateNetworkInfo(),
|
|
MinRelayTxFee: cfg.minRelayTxFee,
|
|
Proxy: cfg.Proxy,
|
|
RPCUser: cfg.RPCUser,
|
|
RPCPass: cfg.RPCPass,
|
|
RPCLimitUser: cfg.RPCLimitUser,
|
|
RPCLimitPass: cfg.RPCLimitPass,
|
|
RPCMaxClients: cfg.RPCMaxClients,
|
|
RPCMaxConcurrentReqs: cfg.RPCMaxConcurrentReqs,
|
|
RPCMaxWebsockets: cfg.RPCMaxWebsockets,
|
|
TestNet: cfg.TestNet,
|
|
MiningAddrs: cfg.miningAddrs,
|
|
AllowUnsyncedMining: cfg.AllowUnsyncedMining,
|
|
MaxProtocolVersion: maxProtocolVersion,
|
|
UserAgentVersion: userAgentVersion,
|
|
LogManager: &rpcLogManager{},
|
|
FiltererV2: s.chain,
|
|
}
|
|
if s.existsAddrIndex != nil {
|
|
rpcsConfig.ExistsAddresser = s.existsAddrIndex
|
|
}
|
|
if s.bg != nil {
|
|
rpcsConfig.BlockTemplater = &rpcBlockTemplater{s.bg}
|
|
}
|
|
if s.txIndex != nil {
|
|
rpcsConfig.TxIndexer = s.txIndex
|
|
}
|
|
|
|
s.rpcServer, err = rpcserver.New(&rpcsConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Signal process shutdown when the RPC server requests it.
|
|
go func() {
|
|
<-s.rpcServer.RequestedProcessShutdown()
|
|
shutdownRequestChannel <- struct{}{}
|
|
}()
|
|
}
|
|
|
|
return &s, nil
|
|
}
|
|
|
|
// initListeners initializes the configured net listeners and adds any bound
|
|
// addresses to the address manager. Returns the listeners and a NAT interface,
|
|
// which is non-nil if UPnP is in use.
|
|
func initListeners(ctx context.Context, params *chaincfg.Params, amgr *addrmgr.AddrManager, listenAddrs []string, services wire.ServiceFlag) ([]net.Listener, *upnpNAT, error) {
|
|
// Listen for TCP connections at the configured addresses
|
|
netAddrs, err := parseListeners(listenAddrs)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
var notifyAddrServer boundAddrEventServer
|
|
if cfg.BoundAddrEvents {
|
|
notifyAddrServer = newBoundAddrEventServer(outgoingPipeMessages)
|
|
}
|
|
|
|
listeners := make([]net.Listener, 0, len(netAddrs))
|
|
for _, addr := range netAddrs {
|
|
var listenConfig net.ListenConfig
|
|
listener, err := listenConfig.Listen(ctx, addr.Network(), addr.String())
|
|
if err != nil {
|
|
srvrLog.Warnf("Can't listen on %s: %v", addr, err)
|
|
continue
|
|
}
|
|
listeners = append(listeners, listener)
|
|
notifyAddrServer.notifyP2PAddress(listener.Addr().String())
|
|
}
|
|
|
|
var nat *upnpNAT
|
|
if len(cfg.ExternalIPs) != 0 {
|
|
defaultPort, err := strconv.ParseUint(params.DefaultPort, 10, 16)
|
|
if err != nil {
|
|
srvrLog.Errorf("Can not parse default port %s for active chain: %v",
|
|
params.DefaultPort, err)
|
|
return nil, nil, err
|
|
}
|
|
|
|
for _, sip := range cfg.ExternalIPs {
|
|
eport := uint16(defaultPort)
|
|
host, portstr, err := net.SplitHostPort(sip)
|
|
if err != nil {
|
|
// no port, use default.
|
|
host = sip
|
|
} else {
|
|
port, err := strconv.ParseUint(portstr, 10, 16)
|
|
if err != nil {
|
|
srvrLog.Warnf("Can not parse port from %s for "+
|
|
"externalip: %v", sip, err)
|
|
continue
|
|
}
|
|
eport = uint16(port)
|
|
}
|
|
|
|
na, err := amgr.HostToNetAddress(host, eport, services)
|
|
if err != nil {
|
|
srvrLog.Warnf("Not adding %s as externalip: %v", sip, err)
|
|
continue
|
|
}
|
|
|
|
err = amgr.AddLocalAddress(na, addrmgr.ManualPrio)
|
|
if err != nil {
|
|
amgrLog.Warnf("Skipping specified external IP: %v", err)
|
|
}
|
|
}
|
|
} else {
|
|
if cfg.Upnp {
|
|
var err error
|
|
nat, err = discover(ctx)
|
|
if err != nil {
|
|
srvrLog.Warnf("Can't discover upnp: %v", err)
|
|
}
|
|
// nil nat here is fine, just means no upnp on network.
|
|
}
|
|
|
|
// Add bound addresses to address manager to be advertised to peers.
|
|
for _, listener := range listeners {
|
|
addr := listener.Addr().String()
|
|
err := addLocalAddress(amgr, addr, services)
|
|
if err != nil {
|
|
amgrLog.Warnf("Skipping bound address %s: %v", addr, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return listeners, nat, nil
|
|
}
|
|
|
|
// addrStringToNetAddr takes an address in the form of 'host:port' and returns
|
|
// a net.Addr which maps to the original address with any host names resolved
|
|
// to IP addresses.
|
|
func addrStringToNetAddr(addr string) (net.Addr, error) {
|
|
host, strPort, err := net.SplitHostPort(addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Attempt to look up an IP address associated with the parsed host.
|
|
// The dcrdLookup function will transparently handle performing the
|
|
// lookup over Tor if necessary.
|
|
ips, err := dcrdLookup(host)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(ips) == 0 {
|
|
return nil, fmt.Errorf("no addresses found for %s", host)
|
|
}
|
|
|
|
port, err := strconv.Atoi(strPort)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &net.TCPAddr{
|
|
IP: ips[0],
|
|
Port: port,
|
|
}, nil
|
|
}
|
|
|
|
// addLocalAddress adds an address that this node is listening on to the
|
|
// address manager so that it may be relayed to peers.
|
|
func addLocalAddress(addrMgr *addrmgr.AddrManager, addr string, services wire.ServiceFlag) error {
|
|
host, portStr, err := net.SplitHostPort(addr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
port, err := strconv.ParseUint(portStr, 10, 16)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() {
|
|
// If bound to unspecified address, advertise all local interfaces
|
|
addrs, err := net.InterfaceAddrs()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, addr := range addrs {
|
|
ifaceIP, _, err := net.ParseCIDR(addr.String())
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// If bound to 0.0.0.0, do not add IPv6 interfaces and if bound to
|
|
// ::, do not add IPv4 interfaces.
|
|
if (ip.To4() == nil) != (ifaceIP.To4() == nil) {
|
|
continue
|
|
}
|
|
|
|
netAddr := addrmgr.NewNetAddressIPPort(ifaceIP, uint16(port),
|
|
services)
|
|
addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio)
|
|
}
|
|
} else {
|
|
netAddr, err := addrMgr.HostToNetAddress(host, uint16(port), services)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// isWhitelisted returns whether the IP address is included in the whitelisted
|
|
// networks and IPs.
|
|
func isWhitelisted(addr net.Addr) bool {
|
|
if len(cfg.whitelists) == 0 {
|
|
return false
|
|
}
|
|
|
|
host, _, err := net.SplitHostPort(addr.String())
|
|
if err != nil {
|
|
srvrLog.Warnf("Unable to SplitHostPort on '%s': %v", addr, err)
|
|
return false
|
|
}
|
|
ip := net.ParseIP(host)
|
|
if ip == nil {
|
|
srvrLog.Warnf("Unable to parse IP '%s'", addr)
|
|
return false
|
|
}
|
|
|
|
for _, ipnet := range cfg.whitelists {
|
|
if ipnet.Contains(ip) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|