more robust processing of invalid head slot (#7990)

Victor Farazdagi 2020-11-30 00:03:25 +03:00 committed by GitHub
parent 9d174d5927
commit 54a42ce4a8
2 changed files with 125 additions and 0 deletions


@@ -36,6 +36,10 @@ const (
noRequiredPeersErrRefreshInterval = 15 * time.Second
// maxResetAttempts number of times stale FSM is reset, before backtracking is triggered.
maxResetAttempts = 4
// startBackSlots defines the number of slots before the current head at which the initial state
// machine starts. This adds robustness for the case where regular sync has set the head to an
// orphaned block: starting earlier and re-fetching those blocks allows the chain to be reorganized.
startBackSlots = 32
)
var (
@@ -173,6 +177,9 @@ func (q *blocksQueue) loop() {
// Define initial state machines.
startSlot := q.chain.HeadSlot()
if startSlot > startBackSlots {
startSlot -= startBackSlots
}
blocksPerRequest := q.blocksFetcher.blocksPerSecond
for i := startSlot; i < startSlot+blocksPerRequest*lookaheadSteps; i += blocksPerRequest {
q.smm.addStateMachine(i)
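
For illustration only (not part of the diff), a minimal sketch of the effect of the rewind, assuming a hypothetical head slot of 100 and 64 blocks per request:

const startBackSlots = 32 // as added by this commit

startSlot := uint64(100) // stands in for q.chain.HeadSlot()
if startSlot > startBackSlots {
startSlot -= startBackSlots // 100 - 32 = 68: start 32 slots behind the head
}
// State machines are then seeded at slots 68, 132, 196, ..., so the range that
// contains the possibly orphaned head (slot 100) is fetched again and can be
// replaced with canonical blocks received from peers.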


@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"testing"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/libp2p/go-libp2p-core/peer"
@@ -1255,3 +1256,120 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) {
}
})
}
func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
beaconDB, _ := dbtest.SetupDB(t)
p2p := p2pt.NewTestP2P(t)
chain := extendBlockSequence(t, []*eth.SignedBeaconBlock{}, 128)
finalizedSlot := uint64(82)
finalizedEpoch := helpers.SlotToEpoch(finalizedSlot)
genesisBlock := chain[0]
require.NoError(t, beaconDB.SaveBlock(context.Background(), genesisBlock))
genesisRoot, err := genesisBlock.Block.HashTreeRoot()
require.NoError(t, err)
st := testutil.NewBeaconState()
mc := &mock.ChainService{
State: st,
Root: genesisRoot[:],
DB: beaconDB,
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: finalizedEpoch,
Root: []byte(fmt.Sprintf("finalized_root %d", finalizedEpoch)),
},
}
// Populate the database with the first part of the chain; the orphaned block will be added on top.
for _, blk := range chain[1:84] {
parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
// Save block only if parent root is already in database or cache.
if beaconDB.HasBlock(ctx, parentRoot) || mc.HasInitSyncBlock(parentRoot) {
require.NoError(t, beaconDB.SaveBlock(ctx, blk))
require.NoError(t, st.SetSlot(blk.Block.Slot))
}
}
require.Equal(t, uint64(83), mc.HeadSlot())
require.Equal(t, chain[83].Block.Slot, mc.HeadSlot())
// Set head to slot 85 while the block for slot 84 is not in the DB, so the head block is orphaned.
// Moreover, the block at slot 85 is a forked block and should be replaced with a block from a peer.
orphanedBlock := testutil.NewBeaconBlock()
orphanedBlock.Block.Slot = 85
orphanedBlock.Block.StateRoot = testutil.Random32Bytes(t)
require.NoError(t, beaconDB.SaveBlock(ctx, orphanedBlock))
require.NoError(t, st.SetSlot(orphanedBlock.Block.Slot))
require.Equal(t, uint64(85), mc.HeadSlot())
fetcher := newBlocksFetcher(
ctx,
&blocksFetcherConfig{
chain: mc,
p2p: p2p,
db: beaconDB,
},
)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false)
// Connect peer that has all the blocks available.
allBlocksPeer := connectPeerHavingBlocks(t, p2p, chain, finalizedSlot, p2p.Peers())
defer func() {
p2p.Peers().SetConnectionState(allBlocksPeer, peers.PeerDisconnected)
}()
// Queue should be able to fetch the whole chain (including slots that come before the currently set head).
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
chain: mc,
highestExpectedSlot: uint64(len(chain) - 1),
mode: modeNonConstrained,
})
require.NoError(t, queue.start())
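// isProcessedBlock reports whether a block already counts as processed: it is at or below the
// finalized slot, or it is already present in the database or the init-sync block cache.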
isProcessedBlock := func(ctx context.Context, blk *eth.SignedBeaconBlock, blkRoot [32]byte) bool {
finalizedSlot, err := helpers.StartSlot(mc.FinalizedCheckpt().Epoch)
if err != nil {
return false
}
if blk.Block.Slot <= finalizedSlot || (beaconDB.HasBlock(ctx, blkRoot) || mc.HasInitSyncBlock(blkRoot)) {
return true
}
return false
}
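// Consume a single batch of fetched data: only blocks that are not yet processed and whose
// parent is already known are saved.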
select {
case <-time.After(3 * time.Second):
t.Fatal("test takes too long to complete")
case data := <-queue.fetchedData:
for _, blk := range data.blocks {
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
if isProcessedBlock(ctx, blk, blkRoot) {
log.Errorf("slot: %d , root %#x: %v", blk.Block.Slot, blkRoot, errBlockAlreadyProcessed)
continue
}
parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot)
if !beaconDB.HasBlock(ctx, parentRoot) && !mc.HasInitSyncBlock(parentRoot) {
log.Errorf("%v: %#x", errParentDoesNotExist, blk.Block.ParentRoot)
continue
}
// Block is not already processed, and parent exists in database - process.
require.NoError(t, beaconDB.SaveBlock(ctx, blk))
require.NoError(t, st.SetSlot(blk.Block.Slot))
}
}
require.NoError(t, queue.stop())
// Check that all blocks available in the chain are produced by the queue.
for _, blk := range chain[:orphanedBlock.Block.Slot+32] {
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
require.Equal(t, true, beaconDB.HasBlock(ctx, blkRoot) || mc.HasInitSyncBlock(blkRoot), "slot %d", blk.Block.Slot)
}
}