From 0aabf85c1d012b11b596cdae5e45567b4b002119 Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Wed, 12 May 2021 09:36:43 +0100 Subject: [PATCH] Sentry fixes (for very frequent `peer not found` error) (#1918) * Extra prints * Remove printouts * Printing back * Fix the stuck stage 1 * Not allow peers to connect more than once Co-authored-by: Alex Sharp --- cmd/headers/download/downloader.go | 1 + cmd/headers/download/sentry.go | 18 ++++++------------ eth/stagedsync/stage_headers_new.go | 8 ++++---- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/cmd/headers/download/downloader.go b/cmd/headers/download/downloader.go index 2687a2996..f2413fa18 100644 --- a/cmd/headers/download/downloader.go +++ b/cmd/headers/download/downloader.go @@ -444,6 +444,7 @@ func NewControlServer(db ethdb.Database, nodeName string, chainConfig *params.Ch } func (cs *ControlServerImpl) newBlockHashes(ctx context.Context, req *proto_sentry.InboundMessage, sentry proto_sentry.SentryClient) error { + //log.Info(fmt.Sprintf("NewBlockHashes from [%s]", gointerfaces.ConvertH512ToBytes(req.PeerId))) var request eth.NewBlockHashesPacket if err := rlp.DecodeBytes(req.Data, &request); err != nil { return fmt.Errorf("decode NewBlockHashes: %v", err) diff --git a/cmd/headers/download/sentry.go b/cmd/headers/download/sentry.go index 1b333346f..8c696db49 100644 --- a/cmd/headers/download/sentry.go +++ b/cmd/headers/download/sentry.go @@ -191,6 +191,10 @@ func MakeProtocols(ctx context.Context, DialCandidates: dialCandidates, Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error { peerID := peer.ID().String() + if _, ok := peers.Load(peerID); ok { + log.Debug(fmt.Sprintf("[%s] Peer already has connection", peerID)) + return nil + } log.Debug(fmt.Sprintf("[%s] Start with peer", peerID)) if err := handShake(ctx, statusFn(), peerID, rw, eth.ProtocolVersions[0], eth.ProtocolVersions[0]); err != nil { return fmt.Errorf("handshake to peer %s: %v", peerID, err) @@ -778,12 +782,7 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p return true } if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(req.Data.Data)), Payload: bytes.NewReader(req.Data.Data)}); err != nil { - if x, ok := ss.Peers.Load(peerID); ok { - peerInfo := x.(*PeerInfo) - if peerInfo != nil { - peerInfo.Remove() - } - } + peerInfo.Remove() ss.Peers.Delete(peerID) innerErr = err return false @@ -818,12 +817,7 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen return true } if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(req.Data)), Payload: bytes.NewReader(req.Data)}); err != nil { - if x, ok := ss.Peers.Load(peerID); ok { - peerInfo := x.(*PeerInfo) - if peerInfo != nil { - peerInfo.Remove() - } - } + peerInfo.Remove() ss.Peers.Delete(peerID) innerErr = err return false diff --git a/eth/stagedsync/stage_headers_new.go b/eth/stagedsync/stage_headers_new.go index bd25460db..ba7f2603f 100644 --- a/eth/stagedsync/stage_headers_new.go +++ b/eth/stagedsync/stage_headers_new.go @@ -196,6 +196,10 @@ func HeadersForward( // if this is not an initial cycle, we need to react quickly when new headers are coming in break } + if initialCycle && cfg.hd.InSync() { + log.Debug("Top seen", "height", cfg.hd.TopSeenHeight()) + break + } timer = time.NewTimer(1 * time.Second) select { case <-ctx.Done(): @@ -209,10 +213,6 @@ func HeadersForward( case <-cfg.hd.DeliveryNotify: log.Debug("headerLoop woken up by the incoming request") } - if initialCycle && cfg.hd.InSync() { - log.Debug("Top seen", "height", cfg.hd.TopSeenHeight()) - break - } } if headerInserter.AnythingDone() { if err := s.Update(batch, headerInserter.GetHighest()); err != nil {