prysm-pulse/beacon-chain/sync/initial-sync/round_robin.go
Raul Jordan cc741ed8af
Ensure New State Type Tests Pass in Prysm (#4646)
* begin state service

* begin on the state trie idea

* created beacon state structure

* add in the full clone getter

* return by value instead

* add all setters

* new state setters are being completed

* arrays roots exposed

*  close to finishing all these headerssss

* functionality complete

* added in proto benchmark test

* test for compatibility

* add test for compat

* comments fixed

* add clone

* add clone

* remove underlying copies

* make it immutable

* integrate it into chainservice

* revert

* wrap up comments for package

* address all comments and godocs

* address all comments

* clone the pending attestation properly

* properly clone remaining items

* tests pass fixed bug

* begin using it instead of head state

* prevent nil pointer exceptions

* begin using new struct in db

* integrated new type into db package

* add proper nil checks

* using new state in archiver

* refactored much of core

* editing all the precompute functions

* done with most core refactor

* fixed up some bugs in the clone comparisons

* append current epoch atts

* add missing setters

* add new setters

* fix other core methods

* fix up transition

* main service and forkchoice

* fix rpc

* integrated to powchain

* some more changes

* fix build

* improve processing of deposits

* fix error

* prevent panic

* comment

* fix process att

* gaz

* fix up att process

* resolve existing review comments

* resolve another batch of gh comments

* resolve broken cpt state

* revise testutil to use the new state

* begin updating the state transition func to pass in more compartmentalized args

* finish editing transition function to return errors

* block operations pretty much done with refactor

* state transition fully refactored

* got epoch processing completed

* fix build in fork choice

* fixing more of the build

* fix up broken sync package

* it builds nowww it buildssss

* revert registry changes

* Recompute on Read (#4627)

* compute on read

* fix up eth1 data votes

* looking into slashings bug introduced in core/

* able to advance more slots

* add logging

* can now sync with testnet yay

* remove the leaves algorithm and other merkle imports

* expose initialize unsafe funcs

* Update beacon-chain/db/kv/state.go

* lint

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>

* More Optimizations for New State (#4641)

* map optimization

* more optimizations

* use a custom hasher

* comment

* block operations optimizations

* Update beacon-chain/state/types.go

Co-Authored-By: Raul Jordan <raul@prysmaticlabs.com>

* fixed up various operations to use the validator index map access

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>

* archiver tests pass

* fixing cache tests

* cache tests passing

* edited validator tests

* powchain tests passing

* halfway thru sync tests

* more sync test fixes

* add in tests for state/

* working through rpc tests

* assignments tests passed

* almost done with rpc/beacon tests

* resolved painful validator test

* fixed up even more tests

* resolve tests

* fix build

* reduce a randao mixes copy

* fixes under //beacon-chain/blockchain/...

* build //beacon-chain/core/...

* fixes

* Runtime Optimizations (#4648)

* parallelize shuffling

* clean up

* lint

* fix build

* use callback to read from registry

* fix array roots and size map

* new improvements

* reduce hash allocs

* improved shuffling

* terence's review

* use different method

* raul's comment

* new array roots

* remove clone in pre-compute

* Update beacon-chain/state/types.go

Co-Authored-By: Raul Jordan <raul@prysmaticlabs.com>

* raul's review

* lint

* fix build issues

* fix visibility

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>

* fix visibility

* build works for all

* fix blockchain test

* fix a few tests

* fix more tests

* update validator in slashing

* archiver passing

* fixed rpc/validator

* progress on core tests

* resolve broken rpc tests

* blockchain tests passed

* fix up some tests in core

* fix message diff

* remove unnecessary save

* Save validator after slashing

* Update validators one by one

* another update

* fix everything

* fix more precompute tests

* fix blocks tests

* more elegant fix

* more helper fixes

* change back ?

* fix test

* fix skip slot

* fix test

* reset caches

* fix testutil

* raceoff fixed

* passing

* Retrieve cached state in the beginning

* lint

* Fixed tests part 1

* Fixed rest of the tests

* Minor changes to avoid copying, small refactor to reduce duplicated code

* Handle att req for slot 0

* New beacon state: Only populate merkle layers as needed, copy merkle layers on copy/clone. (#4689)

* Only populate merkle layers as needed, copy merkle layers on copy/clone.

* use custom copy

* Make maps of correct size

* slightly fast, doesn't wait for lock

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>

* Target root can't be 0x00

* Don't use cache for current slot (may not be the right fix)

* fixed up tests

* Remove some copy for init sync. Not sure if it is safe enough for runtime though... testing...

* Align with prev logic for process slots cachedState.Slot() < slot

* Fix Initial Sync Flag (#4692)

* fixes

* fix up some test failures due to lack of nil checks

* fix up some test failures due to lack of nil checks

* fix up imports

* revert some changes

* imports

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>

* resolving further conflicts

* Better skip slot cache (#4694)

* Return copy of skip slot cache state, disable skip slot cache on sync

* fix

* Fix pruning

* fix up issues with broken tests

Co-authored-by: Nishant Das <nish1993@hotmail.com>
Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
Co-authored-by: shayzluf <thezluf@gmail.com>
Co-authored-by: terence tsao <terence@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
2020-01-31 12:57:01 -08:00

package initialsync

import (
"context"
"fmt"
"io"
"math/rand"
"sort"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
)

const blockBatchSize = 64
const counterSeconds = 20
const refreshTime = 6 * time.Second

// Round Robin sync looks at the latest peer statuses and syncs with the highest
// finalized peer.
//
// Step 1 - Sync to finalized epoch.
// Sync with peers of lowest finalized root with epoch greater than head state.
//
// Step 2 - Sync to head from finalized epoch.
// Using the finalized root as the head_block_root and the epoch start slot
// after the finalized epoch, request blocks to head from some subset of peers
// where step = 1.
func (s *Service) roundRobinSync(genesis time.Time) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
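// Temporarily disable the skip slot cache during initial sync; the previous setting is restored when this function returns.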
if cfg := featureconfig.Get(); cfg.EnableSkipSlotsCache {
cfg.EnableSkipSlotsCache = false
featureconfig.Init(cfg)
defer func() {
cfg := featureconfig.Get()
cfg.EnableSkipSlotsCache = true
featureconfig.Init(cfg)
}()
}
counter := ratecounter.NewRateCounter(counterSeconds * time.Second)
randGenerator := rand.New(rand.NewSource(time.Now().Unix()))
var lastEmptyRequests int
highestFinalizedSlot := helpers.StartSlot(s.highestFinalizedEpoch() + 1)
// Step 1 - Sync to end of finalized epoch.
for s.chain.HeadSlot() < highestFinalizedSlot {
root, finalizedEpoch, peers := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(s.chain.HeadSlot()))
if len(peers) == 0 {
log.Warn("No peers; waiting for reconnect")
time.Sleep(refreshTime)
continue
}
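// Update the sync target only once the minimum number of sync peers report a finalized epoch.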
if len(peers) >= flags.Get().MinimumSyncPeers {
highestFinalizedSlot = helpers.StartSlot(finalizedEpoch + 1)
}
// Shuffle peers to prevent a bad peer from
// stalling sync with invalid blocks.
randGenerator.Shuffle(len(peers), func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
})
// Request a range of blocks to be fetched from multiple peers.
// Example:
//   - number of peers = 4
//   - range of block slots is 64...128
// Four requests will be spread across the peers using the step argument to distribute the load,
// i.e. the first peer is asked for blocks 64, 68, 72... while the second peer is asked for
// 65, 69, 73... and so on for the other peers.
var request func(start uint64, step uint64, count uint64, peers []peer.ID, remainder int) ([]*eth.SignedBeaconBlock, error)
request = func(start uint64, step uint64, count uint64, peers []peer.ID, remainder int) ([]*eth.SignedBeaconBlock, error) {
if len(peers) == 0 {
return nil, errors.WithStack(errors.New("no peers left to request blocks"))
}
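// p2pRequestCount tracks outstanding peer requests; errChan surfaces the first failure and blocksChan fans in successful responses.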
var p2pRequestCount int32
errChan := make(chan error)
blocksChan := make(chan []*eth.SignedBeaconBlock)
// Handle large block ranges of skipped slots.
start += count * uint64(lastEmptyRequests*len(peers))
if count <= 1 {
step = 1
}
// Short circuit if the requested start slot exceeds the highest finalized slot, which would otherwise loop forever.
if start > highestFinalizedSlot {
return nil, errors.Errorf("attempted to ask for a start slot of %d which is greater than the next highest slot of %d", start, highestFinalizedSlot)
}
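// Register every peer request up front so the last goroutine to finish can close blocksChan.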
atomic.AddInt32(&p2pRequestCount, int32(len(peers)))
for i, pid := range peers {
if ctx.Err() != nil {
return nil, ctx.Err()
}
start := start + uint64(i)*step
step := step * uint64(len(peers))
count := mathutil.Min(count, (helpers.StartSlot(finalizedEpoch+1)-start)/step)
// If the count does not divide evenly among the peers, some blocks will be missing
// from the first requests, so the first `remainder` peers request one extra block each.
if i < remainder {
count++
}
// Asking for no blocks may cause the client to hang. This should never happen and
// the peer may return an error anyway, but we'll ask for at least one block.
if count == 0 {
count = 1
}
req := &p2ppb.BeaconBlocksByRangeRequest{
HeadBlockRoot: root,
StartSlot: start,
Count: count,
Step: step,
}
go func(i int, pid peer.ID) {
defer func() {
zeroIfIAmTheLast := atomic.AddInt32(&p2pRequestCount, -1)
if zeroIfIAmTheLast == 0 {
close(blocksChan)
}
}()
resp, err := s.requestBlocks(ctx, req, pid)
if err != nil {
// Fail over to other peers by splitting this request evenly across them.
ps := append(peers[:i], peers[i+1:]...)
log.WithError(err).WithField(
"remaining peers",
len(ps),
).WithField(
"peer",
pid.Pretty(),
).Debug("Request failed, trying to round robin with other peers")
if len(ps) == 0 {
errChan <- errors.WithStack(errors.New("no peers left to request blocks"))
return
}
resp, err = request(start, step, count/uint64(len(ps)) /*count*/, ps, int(count)%len(ps) /*remainder*/)
if err != nil {
errChan <- err
return
}
}
log.WithField("peer", pid).WithField("count", len(resp)).Debug("Received blocks")
blocksChan <- resp
}(i, pid)
}
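// Fan in the responses: abort on the first error, or return the union of blocks once blocksChan is closed.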
var unionRespBlocks []*eth.SignedBeaconBlock
for {
select {
case err := <-errChan:
return nil, err
case resp, ok := <-blocksChan:
if ok {
// If this synchronization becomes a bottleneck, consider allocating space for all
// peers in unionRespBlocks up front and writing without synchronization.
unionRespBlocks = append(unionRespBlocks, resp...)
} else {
return unionRespBlocks, nil
}
}
}
}
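// Offset the next request window by any previously empty ranges; stop this phase once it would pass the finalized epoch.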
startBlock := s.chain.HeadSlot() + 1
skippedBlocks := blockBatchSize * uint64(lastEmptyRequests*len(peers))
if startBlock+skippedBlocks > helpers.StartSlot(finalizedEpoch+1) {
log.WithField("finalizedEpoch", finalizedEpoch).Debug("Requested block range is greater than the finalized epoch")
break
}
blocks, err := request(
s.chain.HeadSlot()+1, // start
1, // step
blockBatchSize, // count
peers, // peers
0, // remainder
)
if err != nil {
log.WithError(err).Error("Round robing sync request failed")
continue
}
// Since the block responses were appended to the list out of order, we must sort them
// so they can be processed sequentially. This sort adds little wall time compared to
// block processing.
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Block.Slot < blocks[j].Block.Slot
})
for _, blk := range blocks {
s.logSyncStatus(genesis, blk.Block, peers, counter)
if !s.db.HasBlock(ctx, bytesutil.ToBytes32(blk.Block.ParentRoot)) {
log.Debugf("Beacon node doesn't have a block in db with root %#x", blk.Block.ParentRoot)
continue
}
if featureconfig.Get().InitSyncNoVerify {
if err := s.chain.ReceiveBlockNoVerify(ctx, blk); err != nil {
return err
}
} else {
if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk); err != nil {
return err
}
}
}
// If there were no blocks in the last request range, increment the counter so the same
// range isn't requested again on the next loop as the headSlot didn't change.
if len(blocks) == 0 {
lastEmptyRequests++
} else {
lastEmptyRequests = 0
}
}
log.Debug("Synced to finalized epoch - now syncing blocks up to current head")
if s.chain.HeadSlot() == helpers.SlotsSince(genesis) {
return nil
}
// Step 2 - sync to head from any single peer.
// This step might need to be improved for cases where there has been a long period since
// finality. This step is less important than syncing to finality in terms of threat
// mitigation. We are already convinced that we are on the correct finalized chain. Any blocks
// we receive thereafter must build on the finalized chain or be considered invalid during
// fork choice resolution / block processing.
best := s.bestPeer()
root, _, _ := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(s.chain.HeadSlot()))
// If no best peer exists, retry until a new best peer is found.
for len(best) == 0 {
time.Sleep(refreshTime)
best = s.bestPeer()
root, _, _ = s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(s.chain.HeadSlot()))
}
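// Request blocks from the best peer in batches of up to 256 slots until the head slot catches up to the current slot.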
for head := helpers.SlotsSince(genesis); s.chain.HeadSlot() < head; {
req := &p2ppb.BeaconBlocksByRangeRequest{
HeadBlockRoot: root,
StartSlot: s.chain.HeadSlot() + 1,
Count: mathutil.Min(helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, 256),
Step: 1,
}
log.WithField("req", req).WithField("peer", best.Pretty()).Debug(
"Sending batch block request",
)
resp, err := s.requestBlocks(ctx, req, best)
if err != nil {
return err
}
for _, blk := range resp {
s.logSyncStatus(genesis, blk.Block, []peer.ID{best}, counter)
if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk); err != nil {
log.WithError(err).Error("Failed to process block, exiting init sync")
return nil
}
}
if len(resp) == 0 {
break
}
}
return nil
}

// requestBlocks by range to a specific peer.
func (s *Service) requestBlocks(ctx context.Context, req *p2ppb.BeaconBlocksByRangeRequest, pid peer.ID) ([]*eth.SignedBeaconBlock, error) {
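// Wait for the peer's rate limiter to drain if this request would exceed its remaining allowance.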
if s.blocksRateLimiter.Remaining(pid.String()) < int64(req.Count) {
log.WithField("peer", pid).Debug("Slowing down for rate limit")
time.Sleep(s.blocksRateLimiter.TillEmpty(pid.String()))
}
s.blocksRateLimiter.Add(pid.String(), int64(req.Count))
log.WithFields(logrus.Fields{
"peer": pid,
"start": req.StartSlot,
"count": req.Count,
"step": req.Step,
"head": fmt.Sprintf("%#x", req.HeadBlockRoot),
}).Debug("Requesting blocks")
stream, err := s.p2p.Send(ctx, req, pid)
if err != nil {
return nil, errors.Wrap(err, "failed to send request to peer")
}
defer stream.Close()
resp := make([]*eth.SignedBeaconBlock, 0, req.Count)
for {
blk, err := prysmsync.ReadChunkedBlock(stream, s.p2p)
if err == io.EOF {
break
}
if err != nil {
return nil, errors.Wrap(err, "failed to read chunked block")
}
resp = append(resp, blk)
}
return resp, nil
}

// highestFinalizedEpoch returns the absolute highest finalized epoch as reported by
// connected peers.
func (s *Service) highestFinalizedEpoch() uint64 {
_, epoch, _ := s.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(s.chain.HeadSlot()))
return epoch
}

// bestPeer returns the peer ID of the peer reporting the highest head slot.
func (s *Service) bestPeer() peer.ID {
var best peer.ID
var bestSlot uint64
for _, k := range s.p2p.Peers().Connected() {
peerChainState, err := s.p2p.Peers().ChainState(k)
if err == nil && peerChainState != nil && peerChainState.HeadSlot >= bestSlot {
bestSlot = peerChainState.HeadSlot
best = k
}
}
return best
}

// logSyncStatus logs the current sync status and increments the block processing counter.
func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, syncingPeers []peer.ID, counter *ratecounter.RateCounter) {
counter.Incr(1)
rate := float64(counter.Rate()) / counterSeconds
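// Guard against a zero rate to avoid division by zero in the time remaining estimate.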
if rate == 0 {
rate = 1
}
timeRemaining := time.Duration(float64(helpers.SlotsSince(genesis)-blk.Slot)/rate) * time.Second
log.WithField(
"peers",
fmt.Sprintf("%d/%d", len(syncingPeers), len(s.p2p.Peers().Connected())),
).WithField(
"blocksPerSecond",
fmt.Sprintf("%.1f", rate),
).Infof(
"Processing block %d/%d - estimated time remaining %s",
blk.Slot,
helpers.SlotsSince(genesis),
timeRemaining,
)
}
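
As a worked illustration of the step-based distribution described in the comments above, the following standalone sketch derives each peer's (start, step, count) the same way the request closure in roundRobinSync does. The names splitRange and perPeerRequest are hypothetical and not part of the package, and the clamp against the finalized epoch and the count==0 adjustment are omitted for brevity.

package main

import "fmt"

// perPeerRequest holds the request parameters computed for a single peer
// (illustrative only; the real code builds a BeaconBlocksByRangeRequest directly).
type perPeerRequest struct {
	start, step, count uint64
}

// splitRange mirrors how the request closure derives each peer's parameters:
// peer i begins at start+i*step, the step is multiplied by the number of peers
// so peers interleave slots, and every peer asks for the full count
// (the first `remainder` peers ask for one extra block).
func splitRange(start, step, count uint64, numPeers, remainder int) []perPeerRequest {
	reqs := make([]perPeerRequest, 0, numPeers)
	for i := 0; i < numPeers; i++ {
		c := count
		if i < remainder {
			c++
		}
		reqs = append(reqs, perPeerRequest{
			start: start + uint64(i)*step,
			step:  step * uint64(numPeers),
			count: c,
		})
	}
	return reqs
}

func main() {
	// Example from the comment in round_robin.go: slots 64...128 spread over 4 peers.
	for i, r := range splitRange(64, 1, 16, 4, 0) {
		fmt.Printf("peer %d: start=%d step=%d count=%d\n", i, r.start, r.step, r.count)
	}
}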