mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2025-01-11 12:10:05 +00:00
16b04699d0
* polling interval * adding proto message * changing proto messages * changing naming * adding slot functionality * initial sync working * new changes * more sync fixes * its working now * finally working * add tests * fix tests * tests * adding tests * lint * log checks * making changes to simulator * update logs * fix tests * get sync to work with crystallized state * fixing race * making requested changes * unexport * documentation * gazelle and fix merge conflicts * adding repeated requests * fix lint * adding new clock , db methods, and util func * revert change to test * gazelle * add in test * gazelle * finally working * save slot * fix lint and constant
142 lines
3.3 KiB
Go
142 lines
3.3 KiB
Go
package syncquerier
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/prysmaticlabs/prysm/beacon-chain/types"
|
|
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
|
"github.com/prysmaticlabs/prysm/shared/event"
|
|
"github.com/prysmaticlabs/prysm/shared/p2p"
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
var log = logrus.WithField("prefix", "syncQuerier")
|
|
|
|
// Config defines the configurable properties of SyncQuerier.
|
|
//
|
|
type Config struct {
|
|
ResponseBufferSize int
|
|
P2P p2pAPI
|
|
BeaconDB beaconDB
|
|
}
|
|
|
|
// DefaultConfig provides the default configuration for a sync service.
|
|
// ResponseBufferSize determines that buffer size of the `responseBuf` channel.
|
|
func DefaultConfig() Config {
|
|
return Config{
|
|
ResponseBufferSize: 100,
|
|
}
|
|
}
|
|
|
|
type p2pAPI interface {
|
|
Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription
|
|
Send(msg proto.Message, peer p2p.Peer)
|
|
Broadcast(msg proto.Message)
|
|
}
|
|
|
|
type beaconDB interface {
|
|
SaveBlock(*types.Block) error
|
|
GetChainHead() (*types.Block, error)
|
|
}
|
|
|
|
// SyncQuerier defines the main class in this package.
|
|
// See the package comments for a general description of the service's functions.
|
|
type SyncQuerier struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
p2p p2pAPI
|
|
db beaconDB
|
|
curentHeadSlot uint64
|
|
currentHeadHash []byte
|
|
responseBuf chan p2p.Message
|
|
}
|
|
|
|
// NewSyncQuerierService constructs a new Sync Querier Service.
|
|
// This method is normally called by the main node.
|
|
func NewSyncQuerierService(ctx context.Context,
|
|
cfg *Config,
|
|
) *SyncQuerier {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
responseBuf := make(chan p2p.Message, cfg.ResponseBufferSize)
|
|
|
|
return &SyncQuerier{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
p2p: cfg.P2P,
|
|
db: cfg.BeaconDB,
|
|
responseBuf: responseBuf,
|
|
}
|
|
}
|
|
|
|
// Start begins the goroutine.
|
|
func (s *SyncQuerier) Start() {
|
|
s.run()
|
|
}
|
|
|
|
// Stop kills the sync querier goroutine.
|
|
func (s *SyncQuerier) Stop() error {
|
|
log.Info("Stopping service")
|
|
s.cancel()
|
|
return nil
|
|
}
|
|
|
|
func (s *SyncQuerier) run() {
|
|
responseSub := s.p2p.Subscribe(&pb.ChainHeadResponse{}, s.responseBuf)
|
|
|
|
// Ticker so that service will keep on requesting for chain head
|
|
// until they get a response.
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
|
|
defer func() {
|
|
responseSub.Unsubscribe()
|
|
close(s.responseBuf)
|
|
ticker.Stop()
|
|
}()
|
|
|
|
s.RequestLatestHead()
|
|
|
|
for {
|
|
select {
|
|
case <-s.ctx.Done():
|
|
log.Info("Exiting goroutine")
|
|
return
|
|
case <-ticker.C:
|
|
s.RequestLatestHead()
|
|
case msg := <-s.responseBuf:
|
|
response := msg.Data.(*pb.ChainHeadResponse)
|
|
log.Infof("Latest Chain head is at slot: %d and hash %#x", response.Slot, response.Hash)
|
|
s.curentHeadSlot = response.Slot
|
|
s.currentHeadHash = response.Hash
|
|
|
|
ticker.Stop()
|
|
responseSub.Unsubscribe()
|
|
s.cancel()
|
|
}
|
|
}
|
|
}
|
|
|
|
// RequestLatestHead broadcasts out a request for all
|
|
// the latest chain heads from the node's peers.
|
|
func (s *SyncQuerier) RequestLatestHead() {
|
|
request := &pb.ChainHeadRequest{}
|
|
s.p2p.Broadcast(request)
|
|
}
|
|
|
|
// IsSynced checks if the node is cuurently synced with the
|
|
// rest of the network.
|
|
func (s *SyncQuerier) IsSynced() (bool, error) {
|
|
block, err := s.db.GetChainHead()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if block.SlotNumber() >= s.curentHeadSlot {
|
|
return true, nil
|
|
}
|
|
|
|
return false, nil
|
|
}
|