mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-22 03:30:35 +00:00
download checkpoint sync origin blobs in init-sync (#13665)
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
This commit is contained in:
parent
d9d2ee75de
commit
0132c1b17d
@ -45,6 +45,7 @@ go_library(
|
||||
"//math:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//runtime:go_default_library",
|
||||
"//runtime/version:go_default_library",
|
||||
"//time:go_default_library",
|
||||
"//time/slots:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
|
||||
|
@ -5,22 +5,31 @@ package initialsync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/paulbellamy/ratecounter"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/async/abool"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
|
||||
blockfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/block"
|
||||
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
|
||||
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
|
||||
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
|
||||
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/runtime"
|
||||
"github.com/prysmaticlabs/prysm/v5/runtime/version"
|
||||
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
|
||||
"github.com/prysmaticlabs/prysm/v5/time/slots"
|
||||
"github.com/sirupsen/logrus"
|
||||
@ -58,6 +67,7 @@ type Service struct {
|
||||
clock *startup.Clock
|
||||
verifierWaiter *verification.InitializerWaiter
|
||||
newBlobVerifier verification.NewBlobVerifier
|
||||
ctxMap sync.ContextByteVersions
|
||||
}
|
||||
|
||||
// Option is a functional option for the initial-sync Service.
|
||||
@ -124,6 +134,13 @@ func (s *Service) Start() {
|
||||
}
|
||||
s.clock = clock
|
||||
log.Info("Received state initialized event")
|
||||
ctxMap, err := sync.ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot())
|
||||
if err != nil {
|
||||
log.WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
|
||||
WithError(err).Error("unable to initialize context version map using genesis validator")
|
||||
return
|
||||
}
|
||||
s.ctxMap = ctxMap
|
||||
|
||||
v, err := s.verifierWaiter.WaitForInitializer(s.ctx)
|
||||
if err != nil {
|
||||
@ -162,7 +179,15 @@ func (s *Service) Start() {
|
||||
s.markSynced()
|
||||
return
|
||||
}
|
||||
s.waitForMinimumPeers()
|
||||
peers, err := s.waitForMinimumPeers()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Error waiting for minimum number of peers")
|
||||
return
|
||||
}
|
||||
if err := s.fetchOriginBlobs(peers); err != nil {
|
||||
log.WithError(err).Error("Failed to fetch missing blobs for checkpoint origin")
|
||||
return
|
||||
}
|
||||
if err := s.roundRobinSync(gt); err != nil {
|
||||
if errors.Is(s.ctx.Err(), context.Canceled) {
|
||||
return
|
||||
@ -215,7 +240,10 @@ func (s *Service) Resync() error {
|
||||
defer func() { s.synced.Set() }() // Reset it at the end of the method.
|
||||
genesis := time.Unix(int64(headState.GenesisTime()), 0) // lint:ignore uintcast -- Genesis time will not exceed int64 in your lifetime.
|
||||
|
||||
s.waitForMinimumPeers()
|
||||
_, err = s.waitForMinimumPeers()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = s.roundRobinSync(genesis); err != nil {
|
||||
log = log.WithError(err)
|
||||
}
|
||||
@ -223,16 +251,19 @@ func (s *Service) Resync() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) waitForMinimumPeers() {
|
||||
func (s *Service) waitForMinimumPeers() ([]peer.ID, error) {
|
||||
required := params.BeaconConfig().MaxPeersToSync
|
||||
if flags.Get().MinimumSyncPeers < required {
|
||||
required = flags.Get().MinimumSyncPeers
|
||||
}
|
||||
for {
|
||||
if s.ctx.Err() != nil {
|
||||
return nil, s.ctx.Err()
|
||||
}
|
||||
cp := s.cfg.Chain.FinalizedCheckpt()
|
||||
_, peers := s.cfg.P2P.Peers().BestNonFinalized(flags.Get().MinimumSyncPeers, cp.Epoch)
|
||||
if len(peers) >= required {
|
||||
break
|
||||
return peers, nil
|
||||
}
|
||||
log.WithFields(logrus.Fields{
|
||||
"suitable": len(peers),
|
||||
@ -247,3 +278,75 @@ func (s *Service) markSynced() {
|
||||
s.synced.Set()
|
||||
close(s.cfg.InitialSyncComplete)
|
||||
}
|
||||
|
||||
func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
|
||||
r, err := s.cfg.DB.OriginCheckpointBlockRoot(s.ctx)
|
||||
if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
|
||||
return nil
|
||||
}
|
||||
blk, err := s.cfg.DB.Block(s.ctx, r)
|
||||
if err != nil {
|
||||
log.WithField("root", r).Error("Block for checkpoint sync origin root not found in db")
|
||||
return err
|
||||
}
|
||||
if blk.Version() < version.Deneb {
|
||||
return nil
|
||||
}
|
||||
cmts, err := blk.Block().Body().BlobKzgCommitments()
|
||||
if err != nil {
|
||||
log.WithField("root", r).Error("Error reading commitments from checkpoint sync origin block")
|
||||
return err
|
||||
}
|
||||
if len(cmts) == 0 {
|
||||
return nil
|
||||
}
|
||||
rob, err := blocks.NewROBlockWithRoot(blk, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
onDisk, err := s.cfg.BlobStorage.Indices(r)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error checking existing blobs for checkpoint sync bloc root %#x", r)
|
||||
}
|
||||
req := make(p2ptypes.BlobSidecarsByRootReq, 0, len(cmts))
|
||||
for i := range cmts {
|
||||
if onDisk[i] {
|
||||
continue
|
||||
}
|
||||
req = append(req, ð.BlobIdentifier{BlockRoot: r[:], Index: uint64(i)})
|
||||
}
|
||||
if len(req) == 0 {
|
||||
log.WithField("nBlobs", len(cmts)).WithField("root", fmt.Sprintf("%#x", r)).Debug("All checkpoint block blobs are present")
|
||||
return nil
|
||||
}
|
||||
shufflePeers(pids)
|
||||
for i := range pids {
|
||||
sidecars, err := sync.SendBlobSidecarByRoot(s.ctx, s.clock, s.cfg.P2P, pids[i], s.ctxMap, &req)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if len(sidecars) != len(req) {
|
||||
continue
|
||||
}
|
||||
bv := newBlobBatchVerifier(s.newBlobVerifier)
|
||||
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
|
||||
current := s.clock.CurrentSlot()
|
||||
if err := avs.Persist(current, sidecars...); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := avs.IsDataAvailable(s.ctx, current, rob); err != nil {
|
||||
log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Blobs from peer for origin block were unusable")
|
||||
continue
|
||||
}
|
||||
log.WithField("nBlobs", len(sidecars)).WithField("root", fmt.Sprintf("%#x", r)).Info("Successfully downloaded blobs for checkpoint sync block")
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("no connected peer able to provide blobs for checkpoint sync block %#x", r)
|
||||
}
|
||||
|
||||
func shufflePeers(pids []peer.ID) {
|
||||
rg := rand.NewGenerator()
|
||||
rg.Shuffle(len(pids), func(i, j int) {
|
||||
pids[i], pids[j] = pids[j], pids[i]
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user