prysm-pulse/beacon-chain/sync/service.go
terence tsao 7f7866ff2a
Micro optimizations on new-state-mgmt service for initial syncing (#5241)
* Starting a quick PoC

* Rate limit to one epoch worth of blocks in memory

* Proof of concept working

* Quick comment out

* Save previous finalized checkpoint

* Test

* Minor fixes

* More run time fixes

* Remove panic

* Feature flag

* Removed unused methods

* Fixed tests

* E2e test

* comment

* Compatible with current initial sync

* Starting

* New cache

* Cache getters and setters

* It should be part of state gen

* Need to use cache for DB

* Don't have to use finalized state

* Rm unused file

* some changes to memory mgmt when using mempool

* More run time fixes

* Can sync to head

* Feedback

* Revert "some changes to memory mgmt when using mempool"

This reverts commit f5b3e7ff4714fef9f0397007f519a45fa259ad24.

* Fixed sync tests

* Fixed existing tests

* Test for state summary getter

* Gaz

* Fix kafka passthrough

* Fixed inputs

* Gaz

* Fixed build

* Fixed visibility

* Trying without the ignore

* Didn't work..

* Fix kafka

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
2020-03-30 17:10:45 -05:00

165 lines
5.5 KiB
Go

package sync
import (
"context"
"sync"
"time"
"github.com/kevinms/leakybucket-go"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/voluntaryexits"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/runutil"
)
// Compile-time check that *Service satisfies the shared.Service interface.
var _ shared.Service = (*Service)(nil)

// Parameters handed to leakybucket.NewCollector when NewRegularSync builds the
// per-service blocks rate limiter.
const (
	allowedBlocksPerSecond = 32.0
	allowedBlocksBurst     = 10 * allowedBlocksPerSecond
)

// refreshRate is the ENR refresh interval, in seconds: a quarter of an epoch
// (seconds per slot times slots per epoch, divided by four). It is evaluated
// once, at package initialization, from the beacon config active at that time.
var refreshRate = (params.BeaconConfig().SecondsPerSlot * params.BeaconConfig().SlotsPerEpoch) / 4
// Config carries the dependencies used to set up the regular sync service.
// NewRegularSync copies every field below onto the Service it constructs.
type Config struct {
// P2P is the networking handle. The service registers peer connection and
// disconnection handlers on it, queries its peers in Status, and asks it to
// refresh the node's ENR.
P2P p2p.P2P
// DB is the database handle stored on the service.
DB db.NoHeadAccessDatabase
// AttPool is the attestation pool.
AttPool attestations.Pool
// ExitPool is the voluntary exits pool.
ExitPool *voluntaryexits.Pool
// SlashingPool is the slashings pool.
SlashingPool *slashings.Pool
// Chain is the blockchain service; it supplies the head slot, current slot
// and genesis time that this file reads.
Chain blockchainService
// InitialSync reports whether initial sync is still running; Status
// consults its Syncing method.
InitialSync Checker
// StateNotifier, BlockNotifier and AttestationNotifier are the feed
// notifiers stored on the service for state, block and operation events.
StateNotifier statefeed.Notifier
BlockNotifier blockfeed.Notifier
AttestationNotifier operation.Notifier
// StateSummaryCache is the state summary cache handed to the service.
StateSummaryCache *cache.StateSummaryCache
}
// blockchainService is the subset of the blockchain service that the sync
// service depends on, expressed as the union of the embedded blockchain
// interfaces. Within this file it is used to read the head slot, the current
// slot and the genesis time.
type blockchainService interface {
blockchain.BlockReceiver
blockchain.HeadFetcher
blockchain.FinalizationFetcher
blockchain.ForkFetcher
blockchain.AttestationReceiver
blockchain.TimeFetcher
}
// NewRegularSync builds the regular sync Service from cfg.
//
// It creates a cancellable context owned by the service (cancelled by Stop),
// copies every dependency out of cfg, initializes the empty pending-block and
// pending-attestation maps, and creates the blocks rate limiter. Before
// returning, it registers the service's RPC handlers and subscribers.
func NewRegularSync(cfg *Config) *Service {
	ctx, cancel := context.WithCancel(context.Background())
	limiter := leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */)

	svc := &Service{
		// Lifecycle.
		ctx:    ctx,
		cancel: cancel,

		// Dependencies taken from the config.
		db:                  cfg.DB,
		p2p:                 cfg.P2P,
		chain:               cfg.Chain,
		initialSync:         cfg.InitialSync,
		attPool:             cfg.AttPool,
		exitPool:            cfg.ExitPool,
		slashingPool:        cfg.SlashingPool,
		stateNotifier:       cfg.StateNotifier,
		blockNotifier:       cfg.BlockNotifier,
		attestationNotifier: cfg.AttestationNotifier,
		stateSummaryCache:   cfg.StateSummaryCache,

		// Service-owned state.
		slotToPendingBlocks:  make(map[uint64]*ethpb.SignedBeaconBlock),
		seenPendingBlocks:    make(map[[32]byte]bool),
		blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
		blocksRateLimiter:    limiter,
	}

	svc.registerRPCHandlers()
	svc.registerSubscribers()

	return svc
}
// Service is responsible for handling all run time p2p related operations as the
// main entry point for network messages.
//
// Instances are built by NewRegularSync, which fills in the dependencies from
// a Config and initializes the maps and the rate limiter.
type Service struct {
// ctx and cancel form the service's lifetime context; Stop calls cancel.
ctx context.Context
cancel context.CancelFunc
p2p p2p.P2P
db db.NoHeadAccessDatabase
attPool attestations.Pool
exitPool *voluntaryexits.Pool
slashingPool *slashings.Pool
chain blockchainService
// Pending-work bookkeeping, created empty by NewRegularSync.
// NOTE(review): the keys look like a slot number and 32-byte block roots
// respectively — confirm against the code that fills these maps.
slotToPendingBlocks map[uint64]*ethpb.SignedBeaconBlock
seenPendingBlocks map[[32]byte]bool
blkRootToPendingAtts map[[32]byte][]*ethpb.AggregateAttestationAndProof
// NOTE(review): presumably these guard the pending-attestation map and the
// pending-block maps above; their use is not shown here — confirm.
pendingAttsLock sync.RWMutex
pendingQueueLock sync.RWMutex
// chainStarted gates the checks in Status: while it is false, Status
// reports no error. It is assigned elsewhere.
chainStarted bool
initialSync Checker
validateBlockLock sync.RWMutex
stateNotifier statefeed.Notifier
blockNotifier blockfeed.Notifier
// blocksRateLimiter is created by NewRegularSync from
// allowedBlocksPerSecond and allowedBlocksBurst.
blocksRateLimiter *leakybucket.Collector
attestationNotifier operation.Notifier
stateSummaryCache *cache.StateSummaryCache
}
// Start the regular sync service.
//
// It first registers the peer connection and disconnection handlers on the
// p2p layer, then invokes the service's routines in a fixed order: pending
// blocks queue, pending attestations queue, peer status maintenance,
// resync-if-behind, and ENR refresh.
// NOTE(review): refreshENR only schedules a periodic job and returns; the other
// routines presumably do the same, but their bodies are not shown here — confirm.
func (r *Service) Start() {
// Handlers are installed before any routine is launched.
r.p2p.AddConnectionHandler(r.sendRPCStatusRequest)
r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus)
r.processPendingBlocksQueue()
r.processPendingAttsQueue()
r.maintainPeerStatuses()
r.resyncIfBehind()
r.refreshENR()
}
// Stop shuts down the regular sync service by cancelling the context created
// in NewRegularSync. It always returns nil.
func (r *Service) Stop() error {
	r.cancel()
	return nil
}
// Status reports the health of the currently running regular sync service.
//
// It returns nil until the chain has started. After that it returns:
//   - "waiting for initial sync" while the initial sync checker reports that
//     it is still syncing;
//   - "out of sync" when the head epoch is more than one epoch behind both
//     the current wall-clock epoch and the epoch reported by peers;
//   - nil otherwise.
func (r *Service) Status() error {
	if !r.chainStarted {
		return nil
	}
	if r.initialSync.Syncing() {
		return errors.New("waiting for initial sync")
	}

	// If our head slot is on a previous epoch and our peers are reporting their
	// head block are in the most recent epoch, then we might be out of sync.
	//
	// The comparison is written as headEpoch+1 < x rather than headEpoch < x-1:
	// with unsigned epochs, x-1 wraps around to the maximum value when x is 0,
	// which made a node in epoch 0 with epoch-0 peers look out of sync.
	headEpoch := helpers.SlotToEpoch(r.chain.HeadSlot())
	if headEpoch+1 < helpers.SlotToEpoch(r.chain.CurrentSlot()) &&
		headEpoch+1 < r.p2p.Peers().CurrentEpoch() {
		return errors.New("out of sync")
	}
	return nil
}
// Checker defines an interface which can verify whether a node is currently
// synchronizing a chain with the rest of peers in the network.
type Checker interface {
// Syncing reports whether synchronization is still in progress; the sync
// service's Status returns an error while this is true.
Syncing() bool
// Status returns an error describing the checker's current state, if any.
Status() error
// Resync asks the checker to synchronize again.
// NOTE(review): exact semantics are defined by the implementation — confirm.
Resync() error
}
// refreshENR schedules a periodic refresh of the current node's ENR.
//
// The job runs every refreshRate seconds (a quarter of an epoch). On each
// tick it derives the current epoch from the chain's genesis time and passes
// it to the p2p layer's RefreshENR.
func (r *Service) refreshENR() {
	refreshTime := time.Duration(refreshRate) * time.Second
	// Tie the periodic job to the service context so that Stop, which cancels
	// r.ctx, can end it. context.Background() is never cancelled, so the job
	// previously had no way to be stopped.
	runutil.RunEvery(r.ctx, refreshTime, func() {
		currentEpoch := helpers.SlotToEpoch(helpers.SlotsSince(r.chain.GenesisTime()))
		r.p2p.RefreshENR(currentEpoch)
	})
}