mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-31 16:21:21 +00:00
6fb810adaa
Lookup calls would spin out of control when network connectivity was lost. The throttling that was in place only took effect when the table returned zero results, which doesn't happen very often. The new throttling should not have a negative impact when the host is online. Lookups against the network take some time and dials for all results must complete or hit the cache before a new one is started. This usually takes longer than four seconds, leaving online lookups unaffected. Fixes #1296
277 lines
7.0 KiB
Go
277 lines
7.0 KiB
Go
package p2p
|
|
|
|
import (
|
|
"container/heap"
|
|
"crypto/rand"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/logger"
|
|
"github.com/ethereum/go-ethereum/logger/glog"
|
|
"github.com/ethereum/go-ethereum/p2p/discover"
|
|
)
|
|
|
|
const (
|
|
// This is the amount of time spent waiting in between
|
|
// redialing a certain node.
|
|
dialHistoryExpiration = 30 * time.Second
|
|
|
|
// Discovery lookups are throttled and can only run
|
|
// once every few seconds.
|
|
lookupInterval = 4 * time.Second
|
|
)
|
|
|
|
// dialstate schedules dials and discovery lookups.
|
|
// it get's a chance to compute new tasks on every iteration
|
|
// of the main loop in Server.run.
|
|
type dialstate struct {
|
|
maxDynDials int
|
|
ntab discoverTable
|
|
|
|
lookupRunning bool
|
|
bootstrapped bool
|
|
|
|
dialing map[discover.NodeID]connFlag
|
|
lookupBuf []*discover.Node // current discovery lookup results
|
|
randomNodes []*discover.Node // filled from Table
|
|
static map[discover.NodeID]*discover.Node
|
|
hist *dialHistory
|
|
}
|
|
|
|
type discoverTable interface {
|
|
Self() *discover.Node
|
|
Close()
|
|
Bootstrap([]*discover.Node)
|
|
Lookup(target discover.NodeID) []*discover.Node
|
|
ReadRandomNodes([]*discover.Node) int
|
|
}
|
|
|
|
// the dial history remembers recent dials.
|
|
type dialHistory []pastDial
|
|
|
|
// pastDial is an entry in the dial history.
|
|
type pastDial struct {
|
|
id discover.NodeID
|
|
exp time.Time
|
|
}
|
|
|
|
type task interface {
|
|
Do(*Server)
|
|
}
|
|
|
|
// A dialTask is generated for each node that is dialed.
|
|
type dialTask struct {
|
|
flags connFlag
|
|
dest *discover.Node
|
|
}
|
|
|
|
// discoverTask runs discovery table operations.
|
|
// Only one discoverTask is active at any time.
|
|
//
|
|
// If bootstrap is true, the task runs Table.Bootstrap,
|
|
// otherwise it performs a random lookup and leaves the
|
|
// results in the task.
|
|
type discoverTask struct {
|
|
bootstrap bool
|
|
results []*discover.Node
|
|
}
|
|
|
|
// A waitExpireTask is generated if there are no other tasks
|
|
// to keep the loop in Server.run ticking.
|
|
type waitExpireTask struct {
|
|
time.Duration
|
|
}
|
|
|
|
func newDialState(static []*discover.Node, ntab discoverTable, maxdyn int) *dialstate {
|
|
s := &dialstate{
|
|
maxDynDials: maxdyn,
|
|
ntab: ntab,
|
|
static: make(map[discover.NodeID]*discover.Node),
|
|
dialing: make(map[discover.NodeID]connFlag),
|
|
randomNodes: make([]*discover.Node, maxdyn/2),
|
|
hist: new(dialHistory),
|
|
}
|
|
for _, n := range static {
|
|
s.static[n.ID] = n
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (s *dialstate) addStatic(n *discover.Node) {
|
|
s.static[n.ID] = n
|
|
}
|
|
|
|
func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
|
|
var newtasks []task
|
|
addDial := func(flag connFlag, n *discover.Node) bool {
|
|
_, dialing := s.dialing[n.ID]
|
|
if dialing || peers[n.ID] != nil || s.hist.contains(n.ID) {
|
|
return false
|
|
}
|
|
s.dialing[n.ID] = flag
|
|
newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
|
|
return true
|
|
}
|
|
|
|
// Compute number of dynamic dials necessary at this point.
|
|
needDynDials := s.maxDynDials
|
|
for _, p := range peers {
|
|
if p.rw.is(dynDialedConn) {
|
|
needDynDials--
|
|
}
|
|
}
|
|
for _, flag := range s.dialing {
|
|
if flag&dynDialedConn != 0 {
|
|
needDynDials--
|
|
}
|
|
}
|
|
|
|
// Expire the dial history on every invocation.
|
|
s.hist.expire(now)
|
|
|
|
// Create dials for static nodes if they are not connected.
|
|
for _, n := range s.static {
|
|
addDial(staticDialedConn, n)
|
|
}
|
|
|
|
// Use random nodes from the table for half of the necessary
|
|
// dynamic dials.
|
|
randomCandidates := needDynDials / 2
|
|
if randomCandidates > 0 && s.bootstrapped {
|
|
n := s.ntab.ReadRandomNodes(s.randomNodes)
|
|
for i := 0; i < randomCandidates && i < n; i++ {
|
|
if addDial(dynDialedConn, s.randomNodes[i]) {
|
|
needDynDials--
|
|
}
|
|
}
|
|
}
|
|
// Create dynamic dials from random lookup results, removing tried
|
|
// items from the result buffer.
|
|
i := 0
|
|
for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
|
|
if addDial(dynDialedConn, s.lookupBuf[i]) {
|
|
needDynDials--
|
|
}
|
|
}
|
|
s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]
|
|
// Launch a discovery lookup if more candidates are needed. The
|
|
// first discoverTask bootstraps the table and won't return any
|
|
// results.
|
|
if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
|
|
s.lookupRunning = true
|
|
newtasks = append(newtasks, &discoverTask{bootstrap: !s.bootstrapped})
|
|
}
|
|
|
|
// Launch a timer to wait for the next node to expire if all
|
|
// candidates have been tried and no task is currently active.
|
|
// This should prevent cases where the dialer logic is not ticked
|
|
// because there are no pending events.
|
|
if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
|
|
t := &waitExpireTask{s.hist.min().exp.Sub(now)}
|
|
newtasks = append(newtasks, t)
|
|
}
|
|
return newtasks
|
|
}
|
|
|
|
func (s *dialstate) taskDone(t task, now time.Time) {
|
|
switch t := t.(type) {
|
|
case *dialTask:
|
|
s.hist.add(t.dest.ID, now.Add(dialHistoryExpiration))
|
|
delete(s.dialing, t.dest.ID)
|
|
case *discoverTask:
|
|
if t.bootstrap {
|
|
s.bootstrapped = true
|
|
}
|
|
s.lookupRunning = false
|
|
s.lookupBuf = append(s.lookupBuf, t.results...)
|
|
}
|
|
}
|
|
|
|
func (t *dialTask) Do(srv *Server) {
|
|
addr := &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}
|
|
glog.V(logger.Debug).Infof("dialing %v\n", t.dest)
|
|
fd, err := srv.Dialer.Dial("tcp", addr.String())
|
|
if err != nil {
|
|
glog.V(logger.Detail).Infof("dial error: %v", err)
|
|
return
|
|
}
|
|
srv.setupConn(fd, t.flags, t.dest)
|
|
}
|
|
func (t *dialTask) String() string {
|
|
return fmt.Sprintf("%v %x %v:%d", t.flags, t.dest.ID[:8], t.dest.IP, t.dest.TCP)
|
|
}
|
|
|
|
func (t *discoverTask) Do(srv *Server) {
|
|
if t.bootstrap {
|
|
srv.ntab.Bootstrap(srv.BootstrapNodes)
|
|
return
|
|
}
|
|
// newTasks generates a lookup task whenever dynamic dials are
|
|
// necessary. Lookups need to take some time, otherwise the
|
|
// event loop spins too fast.
|
|
next := srv.lastLookup.Add(lookupInterval)
|
|
if now := time.Now(); now.Before(next) {
|
|
time.Sleep(next.Sub(now))
|
|
}
|
|
srv.lastLookup = time.Now()
|
|
var target discover.NodeID
|
|
rand.Read(target[:])
|
|
t.results = srv.ntab.Lookup(target)
|
|
}
|
|
|
|
func (t *discoverTask) String() (s string) {
|
|
if t.bootstrap {
|
|
s = "discovery bootstrap"
|
|
} else {
|
|
s = "discovery lookup"
|
|
}
|
|
if len(t.results) > 0 {
|
|
s += fmt.Sprintf(" (%d results)", len(t.results))
|
|
}
|
|
return s
|
|
}
|
|
|
|
func (t waitExpireTask) Do(*Server) {
|
|
time.Sleep(t.Duration)
|
|
}
|
|
func (t waitExpireTask) String() string {
|
|
return fmt.Sprintf("wait for dial hist expire (%v)", t.Duration)
|
|
}
|
|
|
|
// Use only these methods to access or modify dialHistory.
|
|
func (h dialHistory) min() pastDial {
|
|
return h[0]
|
|
}
|
|
func (h *dialHistory) add(id discover.NodeID, exp time.Time) {
|
|
heap.Push(h, pastDial{id, exp})
|
|
}
|
|
func (h dialHistory) contains(id discover.NodeID) bool {
|
|
for _, v := range h {
|
|
if v.id == id {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
func (h *dialHistory) expire(now time.Time) {
|
|
for h.Len() > 0 && h.min().exp.Before(now) {
|
|
heap.Pop(h)
|
|
}
|
|
}
|
|
|
|
// heap.Interface boilerplate
|
|
func (h dialHistory) Len() int { return len(h) }
|
|
func (h dialHistory) Less(i, j int) bool { return h[i].exp.Before(h[j].exp) }
|
|
func (h dialHistory) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
func (h *dialHistory) Push(x interface{}) {
|
|
*h = append(*h, x.(pastDial))
|
|
}
|
|
func (h *dialHistory) Pop() interface{} {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|