Trace filter 5 (and sentry improvement) (#1914)

* Append/AppendDup

* Print appends

* Remove printing

* Increase permit on announcements

Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
ledgerwatch 2021-05-11 13:58:43 +01:00 committed by GitHub
parent 1351383f68
commit 0b7d185d1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 16 deletions

View File

@ -312,6 +312,7 @@ func runPeer(
msg.Discard()
return fmt.Errorf("message is too large %d, limit %d", msg.Size, eth.ProtocolMaxMsgSize)
}
increasePermit := false
switch msg.Code {
case eth.StatusMsg:
msg.Discard()
@ -324,13 +325,7 @@ func runPeer(
}
trySend(receiveUploadCh, &StreamMsg{b, peerID, "GetBlockHeadersMsg", proto_sentry.MessageId_GetBlockHeaders})
case eth.BlockHeadersMsg:
// Peer responded or sent message - reset the "back off" timer
permitsRaw, _ := peerPermitMap.Load(peerID)
permits, _ := permitsRaw.(int)
if permits < maxPermitsPerPeer {
permits++
peerPermitMap.Store(peerID, permits)
}
increasePermit = true
b := make([]byte, msg.Size)
if _, err := io.ReadFull(msg.Payload, b); err != nil {
log.Error(fmt.Sprintf("%s: reading msg into bytes: %v", peerID, err))
@ -343,13 +338,7 @@ func runPeer(
}
trySend(receiveUploadCh, &StreamMsg{b, peerID, "GetBlockBodiesMsg", proto_sentry.MessageId_GetBlockBodies})
case eth.BlockBodiesMsg:
// Peer responded or sent message - reset the "back off" timer
permitsRaw, _ := peerPermitMap.Load(peerID)
permits, _ := permitsRaw.(int)
if permits < maxPermitsPerPeer {
permits++
peerPermitMap.Store(peerID, permits)
}
increasePermit = true
b := make([]byte, msg.Size)
if _, err := io.ReadFull(msg.Payload, b); err != nil {
log.Error(fmt.Sprintf("%s: reading msg into bytes: %v", peerID, err))
@ -362,12 +351,14 @@ func runPeer(
case eth.ReceiptsMsg:
//log.Info(fmt.Sprintf("[%s] ReceiptsMsg", peerID))
case eth.NewBlockHashesMsg:
increasePermit = true
b := make([]byte, msg.Size)
if _, err := io.ReadFull(msg.Payload, b); err != nil {
log.Error(fmt.Sprintf("%s: reading msg into bytes: %v", peerID, err))
}
trySend(receiveCh, &StreamMsg{b, peerID, "NewBlockHashesMsg", proto_sentry.MessageId_NewBlockHashes})
case eth.NewBlockMsg:
increasePermit = true
b := make([]byte, msg.Size)
if _, err := io.ReadFull(msg.Payload, b); err != nil {
log.Error(fmt.Sprintf("%s: reading msg into bytes: %v", peerID, err))
@ -428,6 +419,15 @@ func runPeer(
log.Error(fmt.Sprintf("[%s] Unknown message code: %d", peerID, msg.Code))
}
msg.Discard()
if increasePermit {
// Peer responded or sent message - reset the "back off" timer
permitsRaw, _ := peerPermitMap.Load(peerID)
permits, _ := permitsRaw.(int)
if permits < maxPermitsPerPeer {
permits++
peerPermitMap.Store(peerID, permits)
}
}
}
}

View File

@ -174,8 +174,14 @@ func executeBlockWithGo(block *types.Block, tx ethdb.RwTx, batch ethdb.Database,
if _, ok := callTracer.tos[addr]; ok {
v[common.AddressLength] |= 2
}
if err = traceCursor.AppendDup(blockNumEnc[:], v[:]); err != nil {
return err
if j == 0 {
if err = traceCursor.Append(blockNumEnc[:], v[:]); err != nil {
return err
}
} else {
if err = traceCursor.AppendDup(blockNumEnc[:], v[:]); err != nil {
return err
}
}
copy(prev[:], addr[:])
}