From 3ec2a0f9e0924b9ce06159916e89742a85521033 Mon Sep 17 00:00:00 2001 From: Victor Farazdagi Date: Tue, 17 Mar 2020 20:27:18 +0300 Subject: [PATCH] Refactoring of initial sync (#5096) * implements blocks queue * refactors updateCounter method * fixes deadlock on stop w/o start * refactors updateSchedulerState * more tests on scheduler * parseFetchResponse tests * wraps up tests for blocks queue * eod commit * fixes data race in round robin * revamps fetcher * fixes race conditions + livelocks + deadlocks * less verbose output * fixes data race, by isolating critical sections * minor refactoring: resolves blocking calls * implements init-sync queue * update fetch/send buffers in blocks fetcher * blockState enum-like type alias * refactors common code into releaseTicket() * better gc * linter * minor fix to round robin * moves original round robin into its own package * adds enableInitSyncQueue flag * fixes issue with init-sync service selection * Update beacon-chain/sync/initial-sync/round_robin.go Co-Authored-By: terence tsao * initsyncv1 -> initsyncold * adds span Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> Co-authored-by: Raul Jordan Co-authored-by: terence tsao --- beacon-chain/node/BUILD.bazel | 1 + beacon-chain/node/node.go | 48 +- .../sync/initial-sync-old/BUILD.bazel | 63 +++ beacon-chain/sync/initial-sync-old/log.go | 7 + .../sync/initial-sync-old/round_robin.go | 360 ++++++++++++++ .../sync/initial-sync-old/round_robin_test.go | 449 ++++++++++++++++++ beacon-chain/sync/initial-sync-old/service.go | 191 ++++++++ .../sync/initial-sync/blocks_fetcher.go | 2 +- .../sync/initial-sync/blocks_queue.go | 4 + beacon-chain/sync/initial-sync/round_robin.go | 223 ++------- shared/featureconfig/config.go | 5 + shared/featureconfig/flags.go | 6 + 12 files changed, 1167 insertions(+), 192 deletions(-) create mode 100644 beacon-chain/sync/initial-sync-old/BUILD.bazel create mode 100644 
beacon-chain/sync/initial-sync-old/log.go create mode 100644 beacon-chain/sync/initial-sync-old/round_robin.go create mode 100644 beacon-chain/sync/initial-sync-old/round_robin_test.go create mode 100644 beacon-chain/sync/initial-sync-old/service.go diff --git a/beacon-chain/node/BUILD.bazel b/beacon-chain/node/BUILD.bazel index a28b07acf..00abc01f9 100644 --- a/beacon-chain/node/BUILD.bazel +++ b/beacon-chain/node/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//beacon-chain/state/stategen:go_default_library", "//beacon-chain/sync:go_default_library", "//beacon-chain/sync/initial-sync:go_default_library", + "//beacon-chain/sync/initial-sync-old:go_default_library", "//shared:go_default_library", "//shared/cmd:go_default_library", "//shared/debug:go_default_library", diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 7d4a14ec7..a2d55448b 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -34,6 +34,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/state/stategen" prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync" initialsync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync" + initialsyncold "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync-old" "github.com/prysmaticlabs/prysm/shared" "github.com/prysmaticlabs/prysm/shared/cmd" "github.com/prysmaticlabs/prysm/shared/debug" @@ -404,9 +405,19 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error { return err } - var initSync *initialsync.Service - if err := b.services.FetchService(&initSync); err != nil { - return err + var initSync prysmsync.Checker + if cfg := featureconfig.Get(); cfg.EnableInitSyncQueue { + var initSyncTmp *initialsync.Service + if err := b.services.FetchService(&initSyncTmp); err != nil { + return err + } + initSync = initSyncTmp + } else { + var initSyncTmp *initialsyncold.Service + if err := b.services.FetchService(&initSyncTmp); err != nil { + return err + } + initSync = initSyncTmp } rs := 
prysmsync.NewRegularSync(&prysmsync.Config{ @@ -431,16 +442,25 @@ func (b *BeaconNode) registerInitialSyncService(ctx *cli.Context) error { return err } - is := initialsync.NewInitialSync(&initialsync.Config{ + if cfg := featureconfig.Get(); cfg.EnableInitSyncQueue { + is := initialsync.NewInitialSync(&initialsync.Config{ + DB: b.db, + Chain: chainService, + P2P: b.fetchP2P(ctx), + StateNotifier: b, + BlockNotifier: b, + }) + return b.services.RegisterService(is) + } + + is := initialsyncold.NewInitialSync(&initialsyncold.Config{ DB: b.db, Chain: chainService, P2P: b.fetchP2P(ctx), StateNotifier: b, BlockNotifier: b, }) - return b.services.RegisterService(is) - } func (b *BeaconNode) registerRPCService(ctx *cli.Context) error { @@ -454,9 +474,19 @@ func (b *BeaconNode) registerRPCService(ctx *cli.Context) error { return err } - var syncService *initialsync.Service - if err := b.services.FetchService(&syncService); err != nil { - return err + var syncService prysmsync.Checker + if cfg := featureconfig.Get(); cfg.EnableInitSyncQueue { + var initSyncTmp *initialsync.Service + if err := b.services.FetchService(&initSyncTmp); err != nil { + return err + } + syncService = initSyncTmp + } else { + var initSyncTmp *initialsyncold.Service + if err := b.services.FetchService(&initSyncTmp); err != nil { + return err + } + syncService = initSyncTmp } genesisValidators := ctx.GlobalUint64(flags.InteropNumValidatorsFlag.Name) diff --git a/beacon-chain/sync/initial-sync-old/BUILD.bazel b/beacon-chain/sync/initial-sync-old/BUILD.bazel new file mode 100644 index 000000000..75e7bbca8 --- /dev/null +++ b/beacon-chain/sync/initial-sync-old/BUILD.bazel @@ -0,0 +1,63 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "log.go", + "round_robin.go", + "service.go", + ], + importpath = "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync-old", + visibility = ["//beacon-chain:__subpackages__"], + deps = [ + 
"//beacon-chain/blockchain:go_default_library", + "//beacon-chain/core/feed:go_default_library", + "//beacon-chain/core/feed/block:go_default_library", + "//beacon-chain/core/feed/state:go_default_library", + "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/db:go_default_library", + "//beacon-chain/flags:go_default_library", + "//beacon-chain/p2p:go_default_library", + "//beacon-chain/sync:go_default_library", + "//proto/beacon/p2p/v1:go_default_library", + "//shared:go_default_library", + "//shared/bytesutil:go_default_library", + "//shared/featureconfig:go_default_library", + "//shared/mathutil:go_default_library", + "//shared/params:go_default_library", + "//shared/roughtime:go_default_library", + "@com_github_kevinms_leakybucket_go//:go_default_library", + "@com_github_libp2p_go_libp2p_core//peer:go_default_library", + "@com_github_paulbellamy_ratecounter//:go_default_library", + "@com_github_pkg_errors//:go_default_library", + "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["round_robin_test.go"], + embed = [":go_default_library"], + race = "on", + tags = ["race_on"], + deps = [ + "//beacon-chain/blockchain/testing:go_default_library", + "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/db/testing:go_default_library", + "//beacon-chain/p2p/peers:go_default_library", + "//beacon-chain/p2p/testing:go_default_library", + "//beacon-chain/state:go_default_library", + "//beacon-chain/sync:go_default_library", + "//proto/beacon/p2p/v1:go_default_library", + "//shared/hashutil:go_default_library", + "//shared/params:go_default_library", + "//shared/roughtime:go_default_library", + "//shared/sliceutil:go_default_library", + "@com_github_kevinms_leakybucket_go//:go_default_library", + "@com_github_libp2p_go_libp2p_core//network:go_default_library", + 
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", + "@com_github_prysmaticlabs_go_ssz//:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + ], +) diff --git a/beacon-chain/sync/initial-sync-old/log.go b/beacon-chain/sync/initial-sync-old/log.go new file mode 100644 index 000000000..d87405d3c --- /dev/null +++ b/beacon-chain/sync/initial-sync-old/log.go @@ -0,0 +1,7 @@ +package initialsyncold + +import ( + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("prefix", "initial-sync") diff --git a/beacon-chain/sync/initial-sync-old/round_robin.go b/beacon-chain/sync/initial-sync-old/round_robin.go new file mode 100644 index 000000000..89508724e --- /dev/null +++ b/beacon-chain/sync/initial-sync-old/round_robin.go @@ -0,0 +1,360 @@ +package initialsyncold + +import ( + "context" + "fmt" + "io" + "math/rand" + "sort" + "sync/atomic" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/paulbellamy/ratecounter" + "github.com/pkg/errors" + eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/beacon-chain/flags" + prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync" + p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/featureconfig" + "github.com/prysmaticlabs/prysm/shared/mathutil" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/sirupsen/logrus" +) + +const blockBatchSize = 64 +const counterSeconds = 20 +const refreshTime = 6 * time.Second + +// Round Robin sync looks at the latest peer statuses and syncs with the highest +// finalized peer. +// +// Step 1 - Sync to finalized epoch. 
+// Sync with peers of lowest finalized root with epoch greater than head state. +// +// Step 2 - Sync to head from finalized epoch. +// Using the finalized root as the head_block_root and the epoch start slot +// after the finalized epoch, request blocks to head from some subset of peers +// where step = 1. +func (s *Service) roundRobinSync(genesis time.Time) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + defer s.chain.ClearCachedStates() + + if cfg := featureconfig.Get(); cfg.EnableSkipSlotsCache { + cfg.EnableSkipSlotsCache = false + featureconfig.Init(cfg) + defer func() { + cfg := featureconfig.Get() + cfg.EnableSkipSlotsCache = true + featureconfig.Init(cfg) + }() + } + + counter := ratecounter.NewRateCounter(counterSeconds * time.Second) + randGenerator := rand.New(rand.NewSource(time.Now().Unix())) + var lastEmptyRequests int + highestFinalizedSlot := helpers.StartSlot(s.highestFinalizedEpoch() + 1) + // Step 1 - Sync to end of finalized epoch. + for s.chain.HeadSlot() < highestFinalizedSlot { + root, finalizedEpoch, peers := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(s.chain.HeadSlot())) + if len(peers) == 0 { + log.Warn("No peers; waiting for reconnect") + time.Sleep(refreshTime) + continue + } + + if len(peers) >= flags.Get().MinimumSyncPeers { + highestFinalizedSlot = helpers.StartSlot(finalizedEpoch + 1) + } + + // shuffle peers to prevent a bad peer from + // stalling sync with invalid blocks + randGenerator.Shuffle(len(peers), func(i, j int) { + peers[i], peers[j] = peers[j], peers[i] + }) + + // request a range of blocks to be requested from multiple peers. + // Example: + // - number of peers = 4 + // - range of block slots is 64...128 + // Four requests will be spread across the peers using step argument to distribute the load + // i.e. the first peer is asked for block 64, 68, 72... while the second peer is asked for + // 65, 69, 73... and so on for other peers. 
+ var request func(start uint64, step uint64, count uint64, peers []peer.ID, remainder int) ([]*eth.SignedBeaconBlock, error) + request = func(start uint64, step uint64, count uint64, peers []peer.ID, remainder int) ([]*eth.SignedBeaconBlock, error) { + if len(peers) == 0 { + return nil, errors.WithStack(errors.New("no peers left to request blocks")) + } + var p2pRequestCount int32 + errChan := make(chan error) + blocksChan := make(chan []*eth.SignedBeaconBlock) + + // Handle block large block ranges of skipped slots. + start += count * uint64(lastEmptyRequests*len(peers)) + if count <= 1 { + step = 1 + } + + // Short circuit start far exceeding the highest finalized epoch in some infinite loop. + if start > highestFinalizedSlot { + return nil, errors.Errorf("attempted to ask for a start slot of %d which is greater than the next highest slot of %d", start, highestFinalizedSlot) + } + + atomic.AddInt32(&p2pRequestCount, int32(len(peers))) + for i, pid := range peers { + if ctx.Err() != nil { + return nil, ctx.Err() + } + start := start + uint64(i)*step + step := step * uint64(len(peers)) + count := mathutil.Min(count, (helpers.StartSlot(finalizedEpoch+1)-start)/step) + // If the count was divided by an odd number of peers, there will be some blocks + // missing from the first requests so we accommodate that scenario. + if i < remainder { + count++ + } + // asking for no blocks may cause the client to hang. This should never happen and + // the peer may return an error anyway, but we'll ask for at least one block. 
+ if count == 0 { + count = 1 + } + req := &p2ppb.BeaconBlocksByRangeRequest{ + HeadBlockRoot: root, + StartSlot: start, + Count: count, + Step: step, + } + + go func(i int, pid peer.ID) { + defer func() { + zeroIfIAmTheLast := atomic.AddInt32(&p2pRequestCount, -1) + if zeroIfIAmTheLast == 0 { + close(blocksChan) + } + }() + + resp, err := s.requestBlocks(ctx, req, pid) + if err != nil { + // fail over to other peers by splitting this requests evenly across them. + ps := append(peers[:i], peers[i+1:]...) + log.WithError(err).WithField( + "remaining peers", + len(ps), + ).WithField( + "peer", + pid.Pretty(), + ).Debug("Request failed, trying to round robin with other peers") + if len(ps) == 0 { + errChan <- errors.WithStack(errors.New("no peers left to request blocks")) + return + } + resp, err = request(start, step, count/uint64(len(ps)) /*count*/, ps, int(count)%len(ps) /*remainder*/) + if err != nil { + errChan <- err + return + } + } + log.WithField("peer", pid).WithField("count", len(resp)).Debug("Received blocks") + blocksChan <- resp + }(i, pid) + } + + var unionRespBlocks []*eth.SignedBeaconBlock + for { + select { + case err := <-errChan: + return nil, err + case resp, ok := <-blocksChan: + if ok { + // if this synchronization becomes a bottleneck: + // think about immediately allocating space for all peers in unionRespBlocks, + // and write without synchronization + unionRespBlocks = append(unionRespBlocks, resp...) 
+ } else { + return unionRespBlocks, nil + } + } + } + } + startBlock := s.chain.HeadSlot() + 1 + skippedBlocks := blockBatchSize * uint64(lastEmptyRequests*len(peers)) + if startBlock+skippedBlocks > helpers.StartSlot(finalizedEpoch+1) { + log.WithField("finalizedEpoch", finalizedEpoch).Debug("Requested block range is greater than the finalized epoch") + break + } + + blocks, err := request( + s.chain.HeadSlot()+1, // start + 1, // step + blockBatchSize, // count + peers, // peers + 0, // remainder + ) + if err != nil { + log.WithError(err).Error("Round robing sync request failed") + continue + } + + // Since the block responses were appended to the list, we must sort them in order to + // process sequentially. This method doesn't make much wall time compared to block + // processing. + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].Block.Slot < blocks[j].Block.Slot + }) + + for _, blk := range blocks { + s.logSyncStatus(genesis, blk.Block, peers, counter) + if !s.db.HasBlock(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) { + log.Debugf("Beacon node doesn't have a block in db with root %#x", blk.Block.ParentRoot) + continue + } + s.blockNotifier.BlockFeed().Send(&feed.Event{ + Type: blockfeed.ReceivedBlock, + Data: &blockfeed.ReceivedBlockData{SignedBlock: blk}, + }) + if featureconfig.Get().InitSyncNoVerify { + if err := s.chain.ReceiveBlockNoVerify(ctx, blk); err != nil { + return err + } + } else { + if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk); err != nil { + return err + } + } + } + // If there were no blocks in the last request range, increment the counter so the same + // range isn't requested again on the next loop as the headSlot didn't change. 
+ if len(blocks) == 0 { + lastEmptyRequests++ + } else { + lastEmptyRequests = 0 + } + } + + log.Debug("Synced to finalized epoch - now syncing blocks up to current head") + + if s.chain.HeadSlot() == helpers.SlotsSince(genesis) { + return nil + } + + // Step 2 - sync to head from any single peer. + // This step might need to be improved for cases where there has been a long period since + // finality. This step is less important than syncing to finality in terms of threat + // mitigation. We are already convinced that we are on the correct finalized chain. Any blocks + // we receive there after must build on the finalized chain or be considered invalid during + // fork choice resolution / block processing. + root, _, pids := s.p2p.Peers().BestFinalized(1 /* maxPeers */, s.highestFinalizedEpoch()) + for len(pids) == 0 { + log.Info("Waiting for a suitable peer before syncing to the head of the chain") + time.Sleep(refreshTime) + root, _, pids = s.p2p.Peers().BestFinalized(1 /* maxPeers */, s.highestFinalizedEpoch()) + } + best := pids[0] + + for head := helpers.SlotsSince(genesis); s.chain.HeadSlot() < head; { + req := &p2ppb.BeaconBlocksByRangeRequest{ + HeadBlockRoot: root, + StartSlot: s.chain.HeadSlot() + 1, + Count: mathutil.Min(helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, 256), + Step: 1, + } + + log.WithField("req", req).WithField("peer", best.Pretty()).Debug( + "Sending batch block request", + ) + + resp, err := s.requestBlocks(ctx, req, best) + if err != nil { + return err + } + + for _, blk := range resp { + s.logSyncStatus(genesis, blk.Block, []peer.ID{best}, counter) + if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk); err != nil { + log.WithError(err).Error("Failed to process block, exiting init sync") + return nil + } + } + if len(resp) == 0 { + break + } + } + + return nil +} + +// requestBlocks by range to a specific peer. 
+func (s *Service) requestBlocks(ctx context.Context, req *p2ppb.BeaconBlocksByRangeRequest, pid peer.ID) ([]*eth.SignedBeaconBlock, error) { + if s.blocksRateLimiter.Remaining(pid.String()) < int64(req.Count) { + log.WithField("peer", pid).Debug("Slowing down for rate limit") + time.Sleep(s.blocksRateLimiter.TillEmpty(pid.String())) + } + s.blocksRateLimiter.Add(pid.String(), int64(req.Count)) + log.WithFields(logrus.Fields{ + "peer": pid, + "start": req.StartSlot, + "count": req.Count, + "step": req.Step, + "head": fmt.Sprintf("%#x", req.HeadBlockRoot), + }).Debug("Requesting blocks") + stream, err := s.p2p.Send(ctx, req, pid) + if err != nil { + return nil, errors.Wrap(err, "failed to send request to peer") + } + defer stream.Close() + + resp := make([]*eth.SignedBeaconBlock, 0, req.Count) + for { + blk, err := prysmsync.ReadChunkedBlock(stream, s.p2p) + if err == io.EOF { + break + } + if err != nil { + return nil, errors.Wrap(err, "failed to read chunked block") + } + resp = append(resp, blk) + } + + return resp, nil +} + +// highestFinalizedEpoch returns the absolute highest finalized epoch of all connected peers. +// Note this can be lower than our finalized epoch if we have no peers or peers that are all behind us. +func (s *Service) highestFinalizedEpoch() uint64 { + highest := uint64(0) + for _, pid := range s.p2p.Peers().Connected() { + peerChainState, err := s.p2p.Peers().ChainState(pid) + if err == nil && peerChainState != nil && peerChainState.FinalizedEpoch > highest { + highest = peerChainState.FinalizedEpoch + } + } + + return highest +} + +// logSyncStatus and increment block processing counter. 
+func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, syncingPeers []peer.ID, counter *ratecounter.RateCounter) { + counter.Incr(1) + rate := float64(counter.Rate()) / counterSeconds + if rate == 0 { + rate = 1 + } + timeRemaining := time.Duration(float64(helpers.SlotsSince(genesis)-blk.Slot)/rate) * time.Second + log.WithField( + "peers", + fmt.Sprintf("%d/%d", len(syncingPeers), len(s.p2p.Peers().Connected())), + ).WithField( + "blocksPerSecond", + fmt.Sprintf("%.1f", rate), + ).Infof( + "Processing block %d/%d - estimated time remaining %s", + blk.Slot, + helpers.SlotsSince(genesis), + timeRemaining, + ) +} diff --git a/beacon-chain/sync/initial-sync-old/round_robin_test.go b/beacon-chain/sync/initial-sync-old/round_robin_test.go new file mode 100644 index 000000000..05e21e2f4 --- /dev/null +++ b/beacon-chain/sync/initial-sync-old/round_robin_test.go @@ -0,0 +1,449 @@ +package initialsyncold + +import ( + "context" + "fmt" + "reflect" + "sync" + "testing" + "time" + + "github.com/kevinms/leakybucket-go" + "github.com/libp2p/go-libp2p-core/network" + eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" + mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers" + p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" + stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" + beaconsync "github.com/prysmaticlabs/prysm/beacon-chain/sync" + p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + "github.com/prysmaticlabs/prysm/shared/hashutil" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/shared/roughtime" + "github.com/prysmaticlabs/prysm/shared/sliceutil" + "github.com/sirupsen/logrus" +) + +type testCache struct { + sync.RWMutex + rootCache 
map[uint64][32]byte + parentSlotCache map[uint64]uint64 +} + +var cache = &testCache{} + +type peerData struct { + blocks []uint64 // slots that peer has blocks + finalizedEpoch uint64 + headSlot uint64 + failureSlots []uint64 // slots at which the peer will return an error + forkedPeer bool +} + +func init() { + logrus.SetLevel(logrus.DebugLevel) +} + +func TestConstants(t *testing.T) { + if params.BeaconConfig().MaxPeersToSync*blockBatchSize > 1000 { + t.Fatal("rpc rejects requests over 1000 range slots") + } +} + +func TestRoundRobinSync(t *testing.T) { + + tests := []struct { + name string + currentSlot uint64 + expectedBlockSlots []uint64 + peers []*peerData + }{ + { + name: "Single peer with all blocks", + currentSlot: 131, + expectedBlockSlots: makeSequence(1, 131), + peers: []*peerData{ + { + blocks: makeSequence(1, 131), + finalizedEpoch: 1, + headSlot: 131, + }, + }, + }, + { + name: "Multiple peers with all blocks", + currentSlot: 131, + expectedBlockSlots: makeSequence(1, 131), + peers: []*peerData{ + { + blocks: makeSequence(1, 131), + finalizedEpoch: 1, + headSlot: 131, + }, + { + blocks: makeSequence(1, 131), + finalizedEpoch: 1, + headSlot: 131, + }, + { + blocks: makeSequence(1, 131), + finalizedEpoch: 1, + headSlot: 131, + }, + { + blocks: makeSequence(1, 131), + finalizedEpoch: 1, + headSlot: 131, + }, + }, + }, + { + name: "Multiple peers with failures", + currentSlot: 320, // 10 epochs + expectedBlockSlots: makeSequence(1, 320), + peers: []*peerData{ + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + failureSlots: makeSequence(1, 32), // first epoch + }, + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + { + blocks: makeSequence(1, 320), + finalizedEpoch: 8, + headSlot: 320, + }, + }, + }, + { + name: "Multiple peers with many skipped slots", + currentSlot: 640, // 10 epochs + expectedBlockSlots: 
append(makeSequence(1, 64), makeSequence(500, 640)...), + peers: []*peerData{ + { + blocks: append(makeSequence(1, 64), makeSequence(500, 640)...), + finalizedEpoch: 18, + headSlot: 640, + }, + { + blocks: append(makeSequence(1, 64), makeSequence(500, 640)...), + finalizedEpoch: 18, + headSlot: 640, + }, + { + blocks: append(makeSequence(1, 64), makeSequence(500, 640)...), + finalizedEpoch: 18, + headSlot: 640, + }, + }, + }, + + // TODO(3147): Handle multiple failures. + //{ + // name: "Multiple peers with multiple failures", + // currentSlot: 320, // 10 epochs + // expectedBlockSlots: makeSequence(1, 320), + // peers: []*peerData{ + // { + // blocks: makeSequence(1, 320), + // finalizedEpoch: 4, + // headSlot: 320, + // }, + // { + // blocks: makeSequence(1, 320), + // finalizedEpoch: 4, + // headSlot: 320, + // failureSlots: makeSequence(1, 320), + // }, + // { + // blocks: makeSequence(1, 320), + // finalizedEpoch: 4, + // headSlot: 320, + // failureSlots: makeSequence(1, 320), + // }, + // { + // blocks: makeSequence(1, 320), + // finalizedEpoch: 4, + // headSlot: 320, + // failureSlots: makeSequence(1, 320), + // }, + // }, + //}, + { + name: "Multiple peers with different finalized epoch", + currentSlot: 320, // 10 epochs + expectedBlockSlots: makeSequence(1, 320), + peers: []*peerData{ + { + blocks: makeSequence(1, 320), + finalizedEpoch: 4, + headSlot: 320, + }, + { + blocks: makeSequence(1, 256), + finalizedEpoch: 3, + headSlot: 256, + }, + { + blocks: makeSequence(1, 256), + finalizedEpoch: 3, + headSlot: 256, + }, + { + blocks: makeSequence(1, 192), + finalizedEpoch: 2, + headSlot: 192, + }, + }, + }, + { + name: "Multiple peers with missing parent blocks", + currentSlot: 160, // 5 epochs + expectedBlockSlots: makeSequence(1, 160), + peers: []*peerData{ + { + blocks: makeSequence(1, 160), + finalizedEpoch: 4, + headSlot: 160, + }, + { + blocks: append(makeSequence(1, 6), makeSequence(161, 165)...), + finalizedEpoch: 4, + headSlot: 160, + forkedPeer: 
true, + }, + { + blocks: makeSequence(1, 160), + finalizedEpoch: 4, + headSlot: 160, + }, + { + blocks: makeSequence(1, 160), + finalizedEpoch: 4, + headSlot: 160, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cache.initializeRootCache(tt.expectedBlockSlots, t) + + p := p2pt.NewTestP2P(t) + beaconDB := dbtest.SetupDB(t) + + connectPeers(t, p, tt.peers, p.Peers()) + cache.RLock() + genesisRoot := cache.rootCache[0] + cache.RUnlock() + + err := beaconDB.SaveBlock(context.Background(), &eth.SignedBeaconBlock{ + Block: &eth.BeaconBlock{ + Slot: 0, + }}) + if err != nil { + t.Fatal(err) + } + + st, err := stateTrie.InitializeFromProto(&p2ppb.BeaconState{}) + if err != nil { + t.Fatal(err) + } + mc := &mock.ChainService{ + State: st, + Root: genesisRoot[:], + DB: beaconDB, + } // no-op mock + s := &Service{ + chain: mc, + blockNotifier: mc.BlockNotifier(), + p2p: p, + db: beaconDB, + synced: false, + chainStarted: true, + blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */), + } + if err := s.roundRobinSync(makeGenesisTime(tt.currentSlot)); err != nil { + t.Error(err) + } + if s.chain.HeadSlot() != tt.currentSlot { + t.Errorf("Head slot (%d) is not currentSlot (%d)", s.chain.HeadSlot(), tt.currentSlot) + } + if len(mc.BlocksReceived) != len(tt.expectedBlockSlots) { + t.Errorf("Processes wrong number of blocks. Wanted %d got %d", len(tt.expectedBlockSlots), len(mc.BlocksReceived)) + } + var receivedBlockSlots []uint64 + for _, blk := range mc.BlocksReceived { + receivedBlockSlots = append(receivedBlockSlots, blk.Block.Slot) + } + if missing := sliceutil.NotUint64(sliceutil.IntersectionUint64(tt.expectedBlockSlots, receivedBlockSlots), tt.expectedBlockSlots); len(missing) > 0 { + t.Errorf("Missing blocks at slots %v", missing) + } + dbtest.TeardownDB(t, beaconDB) + }) + } +} + +// Connect peers with local host. 
This method sets up peer statuses and the appropriate handlers +// for each test peer. +func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData, peerStatus *peers.Status) { + const topic = "/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz" + + for _, d := range data { + peer := p2pt.NewTestP2P(t) + + // Copy pointer for callback scope. + var datum = d + + peer.SetStreamHandler(topic, func(stream network.Stream) { + defer stream.Close() + + req := &p2ppb.BeaconBlocksByRangeRequest{} + if err := peer.Encoding().DecodeWithLength(stream, req); err != nil { + t.Error(err) + } + + requestedBlocks := makeSequence(req.StartSlot, req.StartSlot+(req.Count*req.Step)) + + // Expected failure range + if len(sliceutil.IntersectionUint64(datum.failureSlots, requestedBlocks)) > 0 { + if _, err := stream.Write([]byte{0x01}); err != nil { + t.Error(err) + } + if _, err := peer.Encoding().EncodeWithLength(stream, "bad"); err != nil { + t.Error(err) + } + return + } + + // Determine the correct subset of blocks to return as dictated by the test scenario. + blocks := sliceutil.IntersectionUint64(datum.blocks, requestedBlocks) + + ret := make([]*eth.SignedBeaconBlock, 0) + for _, slot := range blocks { + if (slot-req.StartSlot)%req.Step != 0 { + continue + } + cache.RLock() + parentRoot := cache.rootCache[cache.parentSlotCache[slot]] + cache.RUnlock() + blk := &eth.SignedBeaconBlock{ + Block: &eth.BeaconBlock{ + Slot: slot, + ParentRoot: parentRoot[:], + }, + } + // If forked peer, give a different parent root. 
+ if datum.forkedPeer { + newRoot := hashutil.Hash(parentRoot[:]) + blk.Block.ParentRoot = newRoot[:] + } + ret = append(ret, blk) + currRoot, _ := ssz.HashTreeRoot(blk.Block) + logrus.Infof("block with slot %d , signing root %#x and parent root %#x", slot, currRoot, parentRoot) + } + + if uint64(len(ret)) > req.Count { + ret = ret[:req.Count] + } + + for i := 0; i < len(ret); i++ { + if err := beaconsync.WriteChunk(stream, peer.Encoding(), ret[i]); err != nil { + t.Error(err) + } + } + }) + + peer.Connect(host) + + peerStatus.Add(peer.PeerID(), nil, network.DirOutbound) + peerStatus.SetConnectionState(peer.PeerID(), peers.PeerConnected) + peerStatus.SetChainState(peer.PeerID(), &p2ppb.Status{ + HeadForkVersion: params.BeaconConfig().GenesisForkVersion, + FinalizedRoot: []byte(fmt.Sprintf("finalized_root %d", datum.finalizedEpoch)), + FinalizedEpoch: datum.finalizedEpoch, + HeadRoot: []byte("head_root"), + HeadSlot: datum.headSlot, + }) + } +} + +// makeGenesisTime where now is the current slot. 
+func makeGenesisTime(currentSlot uint64) time.Time { + return roughtime.Now().Add(-1 * time.Second * time.Duration(currentSlot) * time.Duration(params.BeaconConfig().SecondsPerSlot)) +} + +// sanity test on helper function +func TestMakeGenesisTime(t *testing.T) { + currentSlot := uint64(64) + gt := makeGenesisTime(currentSlot) + if helpers.SlotsSince(gt) != currentSlot { + t.Fatalf("Wanted %d, got %d", currentSlot, helpers.SlotsSince(gt)) + } +} + +// helper function for sequences of block slots +func makeSequence(start, end uint64) []uint64 { + if end < start { + panic("cannot make sequence where end is before start") + } + seq := make([]uint64, 0, end-start+1) + for i := start; i <= end; i++ { + seq = append(seq, i) + } + return seq +} + +func (c *testCache) initializeRootCache(reqSlots []uint64, t *testing.T) { + c.Lock() + defer c.Unlock() + + c.rootCache = make(map[uint64][32]byte) + c.parentSlotCache = make(map[uint64]uint64) + parentSlot := uint64(0) + genesisBlock := &eth.BeaconBlock{ + Slot: 0, + } + genesisRoot, err := ssz.HashTreeRoot(genesisBlock) + if err != nil { + t.Fatal(err) + } + c.rootCache[0] = genesisRoot + parentRoot := genesisRoot + for _, slot := range reqSlots { + currentBlock := &eth.BeaconBlock{ + Slot: slot, + ParentRoot: parentRoot[:], + } + parentRoot, err = ssz.HashTreeRoot(currentBlock) + if err != nil { + t.Fatal(err) + } + c.rootCache[slot] = parentRoot + c.parentSlotCache[slot] = parentSlot + parentSlot = slot + } +} + +// sanity test on helper function +func TestMakeSequence(t *testing.T) { + got := makeSequence(3, 5) + want := []uint64{3, 4, 5} + if !reflect.DeepEqual(got, want) { + t.Fatalf("Wanted %v, got %v", want, got) + } +} diff --git a/beacon-chain/sync/initial-sync-old/service.go b/beacon-chain/sync/initial-sync-old/service.go new file mode 100644 index 000000000..572fa0d5d --- /dev/null +++ b/beacon-chain/sync/initial-sync-old/service.go @@ -0,0 +1,191 @@ +package initialsyncold + +import ( + "context" + "time" + + 
"github.com/kevinms/leakybucket-go" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" + "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" + blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" + statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/beacon-chain/db" + "github.com/prysmaticlabs/prysm/beacon-chain/flags" + "github.com/prysmaticlabs/prysm/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/shared" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/shared/roughtime" + "github.com/sirupsen/logrus" +) + +var _ = shared.Service(&Service{}) + +type blockchainService interface { + blockchain.BlockReceiver + blockchain.HeadFetcher + ClearCachedStates() + blockchain.FinalizationFetcher +} + +const ( + handshakePollingInterval = 5 * time.Second // Polling interval for checking the number of received handshakes. + + allowedBlocksPerSecond = 32.0 +) + +// Config to set up the initial sync service. +type Config struct { + P2P p2p.P2P + DB db.ReadOnlyDatabase + Chain blockchainService + StateNotifier statefeed.Notifier + BlockNotifier blockfeed.Notifier +} + +// Service service. +type Service struct { + ctx context.Context + chain blockchainService + p2p p2p.P2P + db db.ReadOnlyDatabase + synced bool + chainStarted bool + stateNotifier statefeed.Notifier + blockNotifier blockfeed.Notifier + blocksRateLimiter *leakybucket.Collector +} + +// NewInitialSync configures the initial sync service responsible for bringing the node up to the +// latest head of the blockchain. 
+func NewInitialSync(cfg *Config) *Service { + return &Service{ + ctx: context.Background(), + chain: cfg.Chain, + p2p: cfg.P2P, + db: cfg.DB, + stateNotifier: cfg.StateNotifier, + blockNotifier: cfg.BlockNotifier, + blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksPerSecond, false /* deleteEmptyBuckets */), + } +} + +// Start the initial sync service. +func (s *Service) Start() { + var genesis time.Time + + headState, err := s.chain.HeadState(s.ctx) + if headState == nil || err != nil { + // Wait for state to be initialized. + stateChannel := make(chan *feed.Event, 1) + stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel) + defer stateSub.Unsubscribe() + genesisSet := false + for !genesisSet { + select { + case event := <-stateChannel: + if event.Type == statefeed.Initialized { + data := event.Data.(*statefeed.InitializedData) + log.WithField("starttime", data.StartTime).Debug("Received state initialized event") + genesis = data.StartTime + genesisSet = true + } + case <-s.ctx.Done(): + log.Debug("Context closed, exiting goroutine") + return + case err := <-stateSub.Err(): + log.WithError(err).Error("Subscription to state notifier failed") + return + } + } + stateSub.Unsubscribe() + } else { + genesis = time.Unix(int64(headState.GenesisTime()), 0) + } + + if genesis.After(roughtime.Now()) { + log.WithField( + "genesis time", + genesis, + ).Warn("Genesis time is in the future - waiting to start sync...") + time.Sleep(roughtime.Until(genesis)) + } + s.chainStarted = true + currentSlot := helpers.SlotsSince(genesis) + if helpers.SlotToEpoch(currentSlot) == 0 { + log.Info("Chain started within the last epoch - not syncing") + s.synced = true + return + } + log.Info("Starting initial chain sync...") + // Are we already in sync, or close to it? 
+ if helpers.SlotToEpoch(s.chain.HeadSlot()) == helpers.SlotToEpoch(currentSlot) { + log.Info("Already synced to the current chain head") + s.synced = true + return + } + s.waitForMinimumPeers() + if err := s.roundRobinSync(genesis); err != nil { + panic(err) + } + log.Infof("Synced up to slot %d", s.chain.HeadSlot()) + s.synced = true +} + +// Stop initial sync. +func (s *Service) Stop() error { + return nil +} + +// Status of initial sync. +func (s *Service) Status() error { + if !s.synced && s.chainStarted { + return errors.New("syncing") + } + return nil +} + +// Syncing returns true if initial sync is still running. +func (s *Service) Syncing() bool { + return !s.synced +} + +// Resync allows a node to start syncing again if it has fallen +// behind the current network head. +func (s *Service) Resync() error { + // set it to false since we are syncing again + s.synced = false + defer func() { s.synced = true }() // Reset it at the end of the method. + headState, err := s.chain.HeadState(context.Background()) + if err != nil { + return errors.Wrap(err, "could not retrieve head state") + } + genesis := time.Unix(int64(headState.GenesisTime()), 0) + + s.waitForMinimumPeers() + err = s.roundRobinSync(genesis) + if err != nil { + log = log.WithError(err) + } + log.WithField("slot", s.chain.HeadSlot()).Info("Resync attempt complete") + + return nil +} + +func (s *Service) waitForMinimumPeers() { + required := params.BeaconConfig().MaxPeersToSync + if flags.Get().MinimumSyncPeers < required { + required = flags.Get().MinimumSyncPeers + } + for { + _, _, peers := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, s.chain.FinalizedCheckpt().Epoch) + if len(peers) >= required { + break + } + log.WithFields(logrus.Fields{ + "suitable": len(peers), + "required": required}).Info("Waiting for enough suitable peers before syncing") + time.Sleep(handshakePollingInterval) + } +} diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go 
b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 8bdcd8790..326220f71 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -72,7 +72,7 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc rateLimiter := leakybucket.NewCollector( allowedBlocksPerSecond, /* rate */ allowedBlocksPerSecond, /* capacity */ - false /* deleteEmptyBuckets */) + false /* deleteEmptyBuckets */) return &blocksFetcher{ ctx: ctx, diff --git a/beacon-chain/sync/initial-sync/blocks_queue.go b/beacon-chain/sync/initial-sync/blocks_queue.go index a17e984e6..eae0f9ac8 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue.go +++ b/beacon-chain/sync/initial-sync/blocks_queue.go @@ -10,6 +10,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/shared/mathutil" + "go.opencensus.io/trace" ) const ( @@ -365,6 +366,9 @@ func (q *blocksQueue) sendFetchedBlocks(ctx context.Context) error { q.state.sender.Lock() defer q.state.sender.Unlock() + ctx, span := trace.StartSpan(ctx, "initialsync.sendFetchedBlocks") + defer span.End() + startSlot := q.headFetcher.HeadSlot() + 1 nonSkippedSlot := uint64(0) for slot := startSlot; slot <= q.highestExpectedSlot; slot++ { diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 83b3b48e4..585188de3 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -4,9 +4,6 @@ import ( "context" "fmt" "io" - "math/rand" - "sort" - "sync/atomic" "time" "github.com/libp2p/go-libp2p-core/peer" @@ -16,13 +13,11 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" - "github.com/prysmaticlabs/prysm/beacon-chain/flags" prysmsync 
"github.com/prysmaticlabs/prysm/beacon-chain/sync" p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/mathutil" - "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" ) @@ -56,187 +51,31 @@ func (s *Service) roundRobinSync(genesis time.Time) error { } counter := ratecounter.NewRateCounter(counterSeconds * time.Second) - randGenerator := rand.New(rand.NewSource(time.Now().Unix())) - var lastEmptyRequests int highestFinalizedSlot := helpers.StartSlot(s.highestFinalizedEpoch() + 1) + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + p2p: s.p2p, + headFetcher: s.chain, + highestExpectedSlot: highestFinalizedSlot, + }) + if err := queue.start(); err != nil { + return err + } + // Step 1 - Sync to end of finalized epoch. - for s.chain.HeadSlot() < highestFinalizedSlot { - root, finalizedEpoch, peers := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(s.chain.HeadSlot())) - if len(peers) == 0 { - log.Warn("No peers; waiting for reconnect") - time.Sleep(refreshTime) + for blk := range queue.fetchedBlocks { + s.logSyncStatus(genesis, blk.Block, counter) + if err := s.processBlock(ctx, blk); err != nil { + log.WithError(err).Info("Block is invalid") + queue.state.scheduler.incrementCounter(failedBlock, 1) continue } - - if len(peers) >= flags.Get().MinimumSyncPeers { - highestFinalizedSlot = helpers.StartSlot(finalizedEpoch + 1) - } - - // shuffle peers to prevent a bad peer from - // stalling sync with invalid blocks - randGenerator.Shuffle(len(peers), func(i, j int) { - peers[i], peers[j] = peers[j], peers[i] - }) - - // request a range of blocks to be requested from multiple peers. - // Example: - // - number of peers = 4 - // - range of block slots is 64...128 - // Four requests will be spread across the peers using step argument to distribute the load - // i.e. 
the first peer is asked for block 64, 68, 72... while the second peer is asked for - // 65, 69, 73... and so on for other peers. - var request func(start uint64, step uint64, count uint64, peers []peer.ID, remainder int) ([]*eth.SignedBeaconBlock, error) - request = func(start uint64, step uint64, count uint64, peers []peer.ID, remainder int) ([]*eth.SignedBeaconBlock, error) { - if len(peers) == 0 { - return nil, errors.WithStack(errors.New("no peers left to request blocks")) - } - var p2pRequestCount int32 - errChan := make(chan error) - blocksChan := make(chan []*eth.SignedBeaconBlock) - - // Handle block large block ranges of skipped slots. - start += count * uint64(lastEmptyRequests*len(peers)) - if count <= 1 { - step = 1 - } - - // Short circuit start far exceeding the highest finalized epoch in some infinite loop. - if start > highestFinalizedSlot { - return nil, errors.Errorf("attempted to ask for a start slot of %d which is greater than the next highest slot of %d", start, highestFinalizedSlot) - } - - atomic.AddInt32(&p2pRequestCount, int32(len(peers))) - for i, pid := range peers { - if ctx.Err() != nil { - return nil, ctx.Err() - } - start := start + uint64(i)*step - step := step * uint64(len(peers)) - count := mathutil.Min(count, (helpers.StartSlot(finalizedEpoch+1)-start)/step) - // If the count was divided by an odd number of peers, there will be some blocks - // missing from the first requests so we accommodate that scenario. - if i < remainder { - count++ - } - // asking for no blocks may cause the client to hang. This should never happen and - // the peer may return an error anyway, but we'll ask for at least one block. 
- if count == 0 { - count = 1 - } - req := &p2ppb.BeaconBlocksByRangeRequest{ - HeadBlockRoot: root, - StartSlot: start, - Count: count, - Step: step, - } - - go func(i int, pid peer.ID) { - defer func() { - zeroIfIAmTheLast := atomic.AddInt32(&p2pRequestCount, -1) - if zeroIfIAmTheLast == 0 { - close(blocksChan) - } - }() - - resp, err := s.requestBlocks(ctx, req, pid) - if err != nil { - // fail over to other peers by splitting this requests evenly across them. - ps := append(peers[:i], peers[i+1:]...) - log.WithError(err).WithField( - "remaining peers", - len(ps), - ).WithField( - "peer", - pid.Pretty(), - ).Debug("Request failed, trying to round robin with other peers") - if len(ps) == 0 { - errChan <- errors.WithStack(errors.New("no peers left to request blocks")) - return - } - resp, err = request(start, step, count/uint64(len(ps)) /*count*/, ps, int(count)%len(ps) /*remainder*/) - if err != nil { - errChan <- err - return - } - } - log.WithField("peer", pid).WithField("count", len(resp)).Debug("Received blocks") - blocksChan <- resp - }(i, pid) - } - - var unionRespBlocks []*eth.SignedBeaconBlock - for { - select { - case err := <-errChan: - return nil, err - case resp, ok := <-blocksChan: - if ok { - // if this synchronization becomes a bottleneck: - // think about immediately allocating space for all peers in unionRespBlocks, - // and write without synchronization - unionRespBlocks = append(unionRespBlocks, resp...) 
- } else { - return unionRespBlocks, nil - } - } - } - } - startBlock := s.chain.HeadSlot() + 1 - skippedBlocks := blockBatchSize * uint64(lastEmptyRequests*len(peers)) - if startBlock+skippedBlocks > helpers.StartSlot(finalizedEpoch+1) { - log.WithField("finalizedEpoch", finalizedEpoch).Debug("Requested block range is greater than the finalized epoch") - break - } - - blocks, err := request( - s.chain.HeadSlot()+1, // start - 1, // step - blockBatchSize, // count - peers, // peers - 0, // remainder - ) - if err != nil { - log.WithError(err).Error("Round robing sync request failed") - continue - } - - // Since the block responses were appended to the list, we must sort them in order to - // process sequentially. This method doesn't make much wall time compared to block - // processing. - sort.Slice(blocks, func(i, j int) bool { - return blocks[i].Block.Slot < blocks[j].Block.Slot - }) - - for _, blk := range blocks { - s.logSyncStatus(genesis, blk.Block, peers, counter) - if !s.db.HasBlock(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) { - log.Debugf("Beacon node doesn't have a block in db with root %#x", blk.Block.ParentRoot) - continue - } - s.blockNotifier.BlockFeed().Send(&feed.Event{ - Type: blockfeed.ReceivedBlock, - Data: &blockfeed.ReceivedBlockData{SignedBlock: blk}, - }) - if featureconfig.Get().InitSyncNoVerify { - if err := s.chain.ReceiveBlockNoVerify(ctx, blk); err != nil { - return err - } - } else { - if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk); err != nil { - return err - } - } - } - // If there were no blocks in the last request range, increment the counter so the same - // range isn't requested again on the next loop as the headSlot didn't change. 
- if len(blocks) == 0 { - lastEmptyRequests++ - } else { - lastEmptyRequests = 0 - } + queue.state.scheduler.incrementCounter(validBlock, 1) } log.Debug("Synced to finalized epoch - now syncing blocks up to current head") + if err := queue.stop(); err != nil { + log.WithError(err).Debug("Error stopping queue") + } if s.chain.HeadSlot() == helpers.SlotsSince(genesis) { return nil @@ -274,7 +113,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error { } for _, blk := range resp { - s.logSyncStatus(genesis, blk.Block, []peer.ID{best}, counter) + s.logSyncStatus(genesis, blk.Block, counter) if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk); err != nil { log.WithError(err).Error("Failed to process block, exiting init sync") return nil @@ -338,7 +177,7 @@ func (s *Service) highestFinalizedEpoch() uint64 { } // logSyncStatus and increment block processing counter. -func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, syncingPeers []peer.ID, counter *ratecounter.RateCounter) { +func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, counter *ratecounter.RateCounter) { counter.Incr(1) rate := float64(counter.Rate()) / counterSeconds if rate == 0 { @@ -347,7 +186,7 @@ func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, syncing timeRemaining := time.Duration(float64(helpers.SlotsSince(genesis)-blk.Slot)/rate) * time.Second log.WithField( "peers", - fmt.Sprintf("%d/%d", len(syncingPeers), len(s.p2p.Peers().Connected())), + len(s.p2p.Peers().Connected()), ).WithField( "blocksPerSecond", fmt.Sprintf("%.1f", rate), @@ -358,3 +197,23 @@ func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, syncing timeRemaining, ) } + +func (s *Service) processBlock(ctx context.Context, blk *eth.SignedBeaconBlock) error { + if !s.db.HasBlock(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) { + return fmt.Errorf("beacon node doesn't have a block in db with root %#x", blk.Block.ParentRoot) + } + 
s.blockNotifier.BlockFeed().Send(&feed.Event{ + Type: blockfeed.ReceivedBlock, + Data: &blockfeed.ReceivedBlockData{SignedBlock: blk}, + }) + if featureconfig.Get().InitSyncNoVerify { + if err := s.chain.ReceiveBlockNoVerify(ctx, blk); err != nil { + return err + } + } else { + if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk); err != nil { + return err + } + } + return nil +} diff --git a/shared/featureconfig/config.go b/shared/featureconfig/config.go index 93aa63c00..b9bed2e95 100644 --- a/shared/featureconfig/config.go +++ b/shared/featureconfig/config.go @@ -49,6 +49,7 @@ type Flags struct { EnableNoise bool // EnableNoise enables the beacon node to use NOISE instead of SECIO when performing a handshake with another peer. DontPruneStateStartUp bool // DontPruneStateStartUp disables pruning state upon beacon node start up. NewStateMgmt bool // NewStateMgmt enables the new experimental state mgmt service. + EnableInitSyncQueue bool // EnableInitSyncQueue enables the new initial sync implementation. // DisableForkChoice disables using LMD-GHOST fork choice to update // the head of the chain based on attestations and instead accepts any valid received block // as the chain head. UNSAFE, use with caution. 
@@ -170,6 +171,10 @@ func ConfigureBeaconChain(ctx *cli.Context) { log.Warn("Enabling experimental state management service") cfg.NewStateMgmt = true } + if ctx.GlobalBool(enableInitSyncQueue.Name) { + log.Warn("Enabling initial sync queue") + cfg.EnableInitSyncQueue = true + } Init(cfg) } diff --git a/shared/featureconfig/flags.go b/shared/featureconfig/flags.go index 6f19c19d0..c35d301a8 100644 --- a/shared/featureconfig/flags.go +++ b/shared/featureconfig/flags.go @@ -125,6 +125,10 @@ var ( Name: "new-state-mgmt", Usage: "This enables the usage of experimental state mgmt service across Prysm", } + enableInitSyncQueue = cli.BoolFlag{ + Name: "enable-initial-sync-queue", + Usage: "Enables concurrent fetching and processing of blocks on initial sync.", + } ) // Deprecated flags list. @@ -303,6 +307,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{ dontPruneStateStartUp, broadcastSlashingFlag, newStateMgmt, + enableInitSyncQueue, }...) // E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E. @@ -314,4 +319,5 @@ var E2EBeaconChainFlags = []string{ "--enable-byte-mempool", "--enable-state-gen-sig-verify", "--check-head-state", + "--enable-initial-sync-queue", }