mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-12 20:50:05 +00:00
Init sync: conditional syncing to finalized slot (#7999)
* extract sync methods * add unit test * gazelle * Update beacon-chain/sync/initial-sync/round_robin.go Co-authored-by: Shay Zluf <thezluf@gmail.com> * better set back step Co-authored-by: Shay Zluf <thezluf@gmail.com>
This commit is contained in:
parent
821620c520
commit
3092f75ec2
@ -138,6 +138,7 @@ go_test(
|
||||
"@com_github_libp2p_go_libp2p_core//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_core//network:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
|
||||
"@com_github_paulbellamy_ratecounter//:go_default_library",
|
||||
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
|
@ -43,10 +43,41 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
|
||||
defer state.SkipSlotCache.Enable()
|
||||
|
||||
s.counter = ratecounter.NewRateCounter(counterSeconds * time.Second)
|
||||
|
||||
// Step 1 - Sync to end of finalized epoch.
|
||||
if err := s.syncToFinalizedEpoch(ctx, genesis); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Already at head, no need for 2nd phase.
|
||||
if s.chain.HeadSlot() == helpers.SlotsSince(genesis) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Step 2 - sync to head from majority of peers (from no less than MinimumSyncPeers*2 peers)
|
||||
// having the same world view on non-finalized epoch.
|
||||
if err := s.syncToNonFinalizedEpoch(ctx, genesis); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// syncToFinalizedEpoch sync from head to best known finalized epoch.
|
||||
func (s *Service) syncToFinalizedEpoch(ctx context.Context, genesis time.Time) error {
|
||||
highestFinalizedSlot, err := helpers.StartSlot(s.highestFinalizedEpoch() + 1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Step back one slot, to the last slot of the finalized epoch (we're at first slot of the next epoch).
|
||||
if highestFinalizedSlot > 0 {
|
||||
highestFinalizedSlot--
|
||||
}
|
||||
if s.chain.HeadSlot() >= highestFinalizedSlot {
|
||||
// No need to sync, already synced to the finalized slot.
|
||||
log.Debug("Already synced to finalized epoch")
|
||||
return nil
|
||||
}
|
||||
queue := newBlocksQueue(ctx, &blocksQueueConfig{
|
||||
p2p: s.p2p,
|
||||
db: s.db,
|
||||
@ -58,7 +89,6 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Step 1 - Sync to end of finalized epoch.
|
||||
for data := range queue.fetchedData {
|
||||
s.processFetchedData(ctx, genesis, s.chain.HeadSlot(), data)
|
||||
}
|
||||
@ -71,14 +101,13 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
|
||||
log.WithError(err).Debug("Error stopping queue")
|
||||
}
|
||||
|
||||
// Already at head, no need for 2nd phase.
|
||||
if s.chain.HeadSlot() == helpers.SlotsSince(genesis) {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Step 2 - sync to head from majority of peers (from no less than MinimumSyncPeers*2 peers) having the same
|
||||
// world view on non-finalized epoch.
|
||||
queue = newBlocksQueue(ctx, &blocksQueueConfig{
|
||||
// syncToNonFinalizedEpoch sync from head to best known non-finalized epoch supported by majority
|
||||
// of peers (no less than MinimumSyncPeers*2 peers).
|
||||
func (s *Service) syncToNonFinalizedEpoch(ctx context.Context, genesis time.Time) error {
|
||||
queue := newBlocksQueue(ctx, &blocksQueueConfig{
|
||||
p2p: s.p2p,
|
||||
db: s.db,
|
||||
chain: s.chain,
|
||||
|
@ -3,7 +3,9 @@ package initialsync
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/paulbellamy/ratecounter"
|
||||
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
|
||||
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
|
||||
@ -13,6 +15,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil/require"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
)
|
||||
|
||||
func TestService_roundRobinSync(t *testing.T) {
|
||||
@ -561,3 +564,67 @@ func TestService_blockProviderScoring(t *testing.T) {
|
||||
assert.Equal(t, true, score2 < score3, "Incorrect score (%v) for peer: %v (must be lower than %v)", score2, peer2, score3)
|
||||
assert.Equal(t, true, scorer.ProcessedBlocks(peer3) > 100, "Not enough blocks returned by healthy peer: %d", scorer.ProcessedBlocks(peer3))
|
||||
}
|
||||
|
||||
func TestService_syncToFinalizedEpoch(t *testing.T) {
|
||||
cache.initializeRootCache(makeSequence(1, 640), t)
|
||||
|
||||
p := p2pt.NewTestP2P(t)
|
||||
beaconDB, _ := dbtest.SetupDB(t)
|
||||
cache.RLock()
|
||||
genesisRoot := cache.rootCache[0]
|
||||
cache.RUnlock()
|
||||
|
||||
err := beaconDB.SaveBlock(context.Background(), testutil.NewBeaconBlock())
|
||||
require.NoError(t, err)
|
||||
|
||||
st := testutil.NewBeaconState()
|
||||
require.NoError(t, err)
|
||||
mc := &mock.ChainService{
|
||||
State: st,
|
||||
Root: genesisRoot[:],
|
||||
DB: beaconDB,
|
||||
FinalizedCheckPoint: ð.Checkpoint{
|
||||
Epoch: 0,
|
||||
Root: make([]byte, 32),
|
||||
},
|
||||
}
|
||||
s := &Service{
|
||||
ctx: context.Background(),
|
||||
chain: mc,
|
||||
p2p: p,
|
||||
db: beaconDB,
|
||||
synced: abool.New(),
|
||||
chainStarted: abool.NewBool(true),
|
||||
counter: ratecounter.NewRateCounter(counterSeconds * time.Second),
|
||||
}
|
||||
expectedBlockSlots := makeSequence(1, 191)
|
||||
currentSlot := uint64(191)
|
||||
|
||||
// Sync to finalized epoch.
|
||||
hook := logTest.NewGlobal()
|
||||
connectPeer(t, p, &peerData{
|
||||
blocks: makeSequence(1, 240),
|
||||
finalizedEpoch: 5,
|
||||
headSlot: 195,
|
||||
}, p.Peers())
|
||||
genesis := makeGenesisTime(currentSlot)
|
||||
assert.NoError(t, s.syncToFinalizedEpoch(context.Background(), genesis))
|
||||
if s.chain.HeadSlot() < currentSlot {
|
||||
t.Errorf("Head slot (%d) is less than expected currentSlot (%d)", s.chain.HeadSlot(), currentSlot)
|
||||
}
|
||||
assert.Equal(t, true, len(expectedBlockSlots) <= len(mc.BlocksReceived), "Processes wrong number of blocks")
|
||||
var receivedBlockSlots []uint64
|
||||
for _, blk := range mc.BlocksReceived {
|
||||
receivedBlockSlots = append(receivedBlockSlots, blk.Block.Slot)
|
||||
}
|
||||
missing := sliceutil.NotUint64(sliceutil.IntersectionUint64(expectedBlockSlots, receivedBlockSlots), expectedBlockSlots)
|
||||
if len(missing) > 0 {
|
||||
t.Errorf("Missing blocks at slots %v", missing)
|
||||
}
|
||||
assert.LogsDoNotContain(t, hook, "Already synced to finalized epoch")
|
||||
|
||||
// Try to re-sync, should be exited immediately (node is already synced to finalized epoch).
|
||||
hook.Reset()
|
||||
assert.NoError(t, s.syncToFinalizedEpoch(context.Background(), genesis))
|
||||
assert.LogsContain(t, hook, "Already synced to finalized epoch")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user