mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-17 15:28:45 +00:00
7f7866ff2a
* Starting a quick PoC * Rate limit to one epoch worth of blocks in memory * Proof of concept working * Quick comment out * Save previous finalized checkpoint * Test * Minor fixes * More run time fixes * Remove panic * Feature flag * Removed unused methods * Fixed tests * E2e test * comment * Compatible with current initial sync * Starting * New cache * Cache getters and setters * It should be part of state gen * Need to use cache for DB * Don't have to use finalized state * Rm unused file * some changes to memory mgmt when using mempool * More run time fixes * Can sync to head * Feedback * Revert "some changes to memory mgmt when using mempool" This reverts commit f5b3e7ff4714fef9f0397007f519a45fa259ad24. * Fixed sync tests * Fixed existing tests * Test for state summary getter * Gaz * Fix kafka passthrough * Fixed inputs * Gaz * Fixed build * Fixed visibility * Trying without the ignore * Didn't work.. * Fix kafka Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> Co-authored-by: Preston Van Loon <preston@prysmaticlabs.com>
165 lines
5.5 KiB
Go
165 lines
5.5 KiB
Go
package sync
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/kevinms/leakybucket-go"
|
|
"github.com/pkg/errors"
|
|
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
|
|
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
|
|
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/db"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/operations/voluntaryexits"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
|
|
"github.com/prysmaticlabs/prysm/shared"
|
|
"github.com/prysmaticlabs/prysm/shared/params"
|
|
"github.com/prysmaticlabs/prysm/shared/runutil"
|
|
)
|
|
|
|
var _ = shared.Service(&Service{})
|
|
|
|
const allowedBlocksPerSecond = 32.0
|
|
const allowedBlocksBurst = 10 * allowedBlocksPerSecond
|
|
|
|
// refresh enr every quarter of an epoch
|
|
var refreshRate = (params.BeaconConfig().SecondsPerSlot * params.BeaconConfig().SlotsPerEpoch) / 4
|
|
|
|
// Config to set up the regular sync service.
|
|
type Config struct {
|
|
P2P p2p.P2P
|
|
DB db.NoHeadAccessDatabase
|
|
AttPool attestations.Pool
|
|
ExitPool *voluntaryexits.Pool
|
|
SlashingPool *slashings.Pool
|
|
Chain blockchainService
|
|
InitialSync Checker
|
|
StateNotifier statefeed.Notifier
|
|
BlockNotifier blockfeed.Notifier
|
|
AttestationNotifier operation.Notifier
|
|
StateSummaryCache *cache.StateSummaryCache
|
|
}
|
|
|
|
// This defines the interface for interacting with block chain service
|
|
type blockchainService interface {
|
|
blockchain.BlockReceiver
|
|
blockchain.HeadFetcher
|
|
blockchain.FinalizationFetcher
|
|
blockchain.ForkFetcher
|
|
blockchain.AttestationReceiver
|
|
blockchain.TimeFetcher
|
|
}
|
|
|
|
// NewRegularSync service.
|
|
func NewRegularSync(cfg *Config) *Service {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
r := &Service{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
db: cfg.DB,
|
|
p2p: cfg.P2P,
|
|
attPool: cfg.AttPool,
|
|
exitPool: cfg.ExitPool,
|
|
slashingPool: cfg.SlashingPool,
|
|
chain: cfg.Chain,
|
|
initialSync: cfg.InitialSync,
|
|
attestationNotifier: cfg.AttestationNotifier,
|
|
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
|
|
seenPendingBlocks: make(map[[32]byte]bool),
|
|
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
|
|
stateNotifier: cfg.StateNotifier,
|
|
blockNotifier: cfg.BlockNotifier,
|
|
stateSummaryCache: cfg.StateSummaryCache,
|
|
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
|
|
}
|
|
|
|
r.registerRPCHandlers()
|
|
r.registerSubscribers()
|
|
|
|
return r
|
|
}
|
|
|
|
// Service is responsible for handling all run time p2p related operations as the
|
|
// main entry point for network messages.
|
|
type Service struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
p2p p2p.P2P
|
|
db db.NoHeadAccessDatabase
|
|
attPool attestations.Pool
|
|
exitPool *voluntaryexits.Pool
|
|
slashingPool *slashings.Pool
|
|
chain blockchainService
|
|
slotToPendingBlocks map[uint64]*ethpb.SignedBeaconBlock
|
|
seenPendingBlocks map[[32]byte]bool
|
|
blkRootToPendingAtts map[[32]byte][]*ethpb.AggregateAttestationAndProof
|
|
pendingAttsLock sync.RWMutex
|
|
pendingQueueLock sync.RWMutex
|
|
chainStarted bool
|
|
initialSync Checker
|
|
validateBlockLock sync.RWMutex
|
|
stateNotifier statefeed.Notifier
|
|
blockNotifier blockfeed.Notifier
|
|
blocksRateLimiter *leakybucket.Collector
|
|
attestationNotifier operation.Notifier
|
|
stateSummaryCache *cache.StateSummaryCache
|
|
}
|
|
|
|
// Start the regular sync service.
|
|
func (r *Service) Start() {
|
|
r.p2p.AddConnectionHandler(r.sendRPCStatusRequest)
|
|
r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus)
|
|
r.processPendingBlocksQueue()
|
|
r.processPendingAttsQueue()
|
|
r.maintainPeerStatuses()
|
|
r.resyncIfBehind()
|
|
r.refreshENR()
|
|
}
|
|
|
|
// Stop the regular sync service.
|
|
func (r *Service) Stop() error {
|
|
defer r.cancel()
|
|
return nil
|
|
}
|
|
|
|
// Status of the currently running regular sync service.
|
|
func (r *Service) Status() error {
|
|
if r.chainStarted {
|
|
if r.initialSync.Syncing() {
|
|
return errors.New("waiting for initial sync")
|
|
}
|
|
// If our head slot is on a previous epoch and our peers are reporting their head block are
|
|
// in the most recent epoch, then we might be out of sync.
|
|
if headEpoch := helpers.SlotToEpoch(r.chain.HeadSlot()); headEpoch < helpers.SlotToEpoch(r.chain.CurrentSlot())-1 &&
|
|
headEpoch < r.p2p.Peers().CurrentEpoch()-1 {
|
|
return errors.New("out of sync")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Checker defines a struct which can verify whether a node is currently
|
|
// synchronizing a chain with the rest of peers in the network.
|
|
type Checker interface {
|
|
Syncing() bool
|
|
Status() error
|
|
Resync() error
|
|
}
|
|
|
|
// This runs every epoch to refresh the current node's ENR.
|
|
func (r *Service) refreshENR() {
|
|
ctx := context.Background()
|
|
refreshTime := time.Duration(refreshRate) * time.Second
|
|
runutil.RunEvery(ctx, refreshTime, func() {
|
|
currentEpoch := helpers.SlotToEpoch(helpers.SlotsSince(r.chain.GenesisTime()))
|
|
r.p2p.RefreshENR(currentEpoch)
|
|
})
|
|
}
|