From 0b7d185d1f0d2e3281a9614e6f4986de2e718fcb Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Tue, 11 May 2021 13:58:43 +0100 Subject: [PATCH] Trace filter 5 (and sentry improvement) (#1914) * Append/AppendDup * Print appends * Remove printing * Increase permit on announcements Co-authored-by: Alex Sharp Co-authored-by: Alexey Sharp --- cmd/headers/download/sentry.go | 28 ++++++++++++++-------------- eth/stagedsync/stage_execute.go | 10 ++++++++-- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/cmd/headers/download/sentry.go b/cmd/headers/download/sentry.go index 759809adb..11752378d 100644 --- a/cmd/headers/download/sentry.go +++ b/cmd/headers/download/sentry.go @@ -312,6 +312,7 @@ func runPeer( msg.Discard() return fmt.Errorf("message is too large %d, limit %d", msg.Size, eth.ProtocolMaxMsgSize) } + increasePermit := false switch msg.Code { case eth.StatusMsg: msg.Discard() @@ -324,13 +325,7 @@ func runPeer( } trySend(receiveUploadCh, &StreamMsg{b, peerID, "GetBlockHeadersMsg", proto_sentry.MessageId_GetBlockHeaders}) case eth.BlockHeadersMsg: - // Peer responded or sent message - reset the "back off" timer - permitsRaw, _ := peerPermitMap.Load(peerID) - permits, _ := permitsRaw.(int) - if permits < maxPermitsPerPeer { - permits++ - peerPermitMap.Store(peerID, permits) - } + increasePermit = true b := make([]byte, msg.Size) if _, err := io.ReadFull(msg.Payload, b); err != nil { log.Error(fmt.Sprintf("%s: reading msg into bytes: %v", peerID, err)) @@ -343,13 +338,7 @@ func runPeer( } trySend(receiveUploadCh, &StreamMsg{b, peerID, "GetBlockBodiesMsg", proto_sentry.MessageId_GetBlockBodies}) case eth.BlockBodiesMsg: - // Peer responded or sent message - reset the "back off" timer - permitsRaw, _ := peerPermitMap.Load(peerID) - permits, _ := permitsRaw.(int) - if permits < maxPermitsPerPeer { - permits++ - peerPermitMap.Store(peerID, permits) - } + increasePermit = true b := make([]byte, msg.Size) if _, err := io.ReadFull(msg.Payload, b); err != nil { log.Error(fmt.Sprintf("%s: reading msg into bytes: %v", peerID, err)) @@ -362,12 +351,14 @@ func runPeer( case eth.ReceiptsMsg: //log.Info(fmt.Sprintf("[%s] ReceiptsMsg", peerID)) case eth.NewBlockHashesMsg: + increasePermit = true b := make([]byte, msg.Size) if _, err := io.ReadFull(msg.Payload, b); err != nil { log.Error(fmt.Sprintf("%s: reading msg into bytes: %v", peerID, err)) } trySend(receiveCh, &StreamMsg{b, peerID, "NewBlockHashesMsg", proto_sentry.MessageId_NewBlockHashes}) case eth.NewBlockMsg: + increasePermit = true b := make([]byte, msg.Size) if _, err := io.ReadFull(msg.Payload, b); err != nil { log.Error(fmt.Sprintf("%s: reading msg into bytes: %v", peerID, err)) @@ -428,6 +419,15 @@ func runPeer( log.Error(fmt.Sprintf("[%s] Unknown message code: %d", peerID, msg.Code)) } msg.Discard() + if increasePermit { + // Peer responded or sent message - reset the "back off" timer + permitsRaw, _ := peerPermitMap.Load(peerID) + permits, _ := permitsRaw.(int) + if permits < maxPermitsPerPeer { + permits++ + peerPermitMap.Store(peerID, permits) + } + } } } diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 840ff8e1d..52613c93c 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -174,8 +174,14 @@ func executeBlockWithGo(block *types.Block, tx ethdb.RwTx, batch ethdb.Database, if _, ok := callTracer.tos[addr]; ok { v[common.AddressLength] |= 2 } - if err = traceCursor.AppendDup(blockNumEnc[:], v[:]); err != nil { - return err + if j == 0 { + if err = traceCursor.Append(blockNumEnc[:], v[:]); err != nil { + return err + } + } else { + if err = traceCursor.AppendDup(blockNumEnc[:], v[:]); err != nil { + return err + } } copy(prev[:], addr[:]) }