prysm-pulse/beacon-chain/sync/sync-querier/service.go
Nishant Das 16b04699d0 Allow Initial Sync to Work with Simulator (#669)
* polling interval

* adding proto message

* changing proto messages

* changing naming

* adding slot functionality

* initial sync working

* new changes

* more sync fixes

* its working now

* finally working

* add tests

* fix tests

* tests

* adding tests

* lint

* log checks

* making changes to simulator

* update logs

* fix tests

* get sync to work with crystallized state

* fixing race

* making requested changes

* unexport

* documentation

* gazelle and fix merge conflicts

* adding repeated requests

* fix lint

* adding new clock , db methods, and util func

* revert change to test

* gazelle

* add in test

* gazelle

* finally working

* save slot

* fix lint and constant
2018-11-21 10:00:36 -08:00

142 lines
3.3 KiB
Go

package syncquerier
import (
"context"
"time"
"github.com/golang/protobuf/proto"
"github.com/prysmaticlabs/prysm/beacon-chain/types"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/sirupsen/logrus"
)
// log is the package-level logger; every entry it emits carries the
// field prefix=syncQuerier.
var log = logrus.WithField("prefix", "syncQuerier")
// Config defines the configurable properties of SyncQuerier.
type Config struct {
// ResponseBufferSize is the capacity of the buffered channel on which
// the service receives chain head responses.
ResponseBufferSize int
// P2P is the networking handle the service uses to subscribe to
// responses and to broadcast chain head requests.
P2P p2pAPI
// BeaconDB is the database handle the service uses to read the local
// chain head.
BeaconDB beaconDB
}
// DefaultConfig returns the default configuration for a sync querier service.
// Only ResponseBufferSize is populated (100); it determines the buffer size
// of the `responseBuf` channel. P2P and BeaconDB are left at their zero
// values and must be supplied by the caller.
func DefaultConfig() Config {
	var cfg Config
	cfg.ResponseBufferSize = 100
	return cfg
}
// p2pAPI is the subset of the p2p service that the sync querier depends on.
type p2pAPI interface {
// Subscribe is called with a message prototype and the channel on which
// the service then reads responses. The returned subscription is released
// with Unsubscribe.
// NOTE(review): presumably only messages of msg's type are delivered on
// channel — confirm against the p2p implementation.
Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription
// Send is not called in the visible part of this file.
// NOTE(review): presumably sends msg to a single peer — confirm.
Send(msg proto.Message, peer p2p.Peer)
// Broadcast is used to send chain head requests to the node's peers.
Broadcast(msg proto.Message)
}
// beaconDB is the subset of the beacon chain database that the sync querier
// depends on.
type beaconDB interface {
// SaveBlock is not called in the visible part of this file.
// NOTE(review): presumably persists the given block — confirm.
SaveBlock(*types.Block) error
// GetChainHead returns the locally stored chain head block; IsSynced
// compares its slot number with the slot reported by the network.
GetChainHead() (*types.Block, error)
}
// SyncQuerier defines the main class in this package.
// See the package comments for a general description of the service's functions.
type SyncQuerier struct {
// ctx is the cancellable context created in NewSyncQuerierService; the
// query loop exits once it is done.
ctx context.Context
// cancel cancels ctx. It is invoked by Stop and by the query loop after a
// chain head response has been handled.
cancel context.CancelFunc
// p2p is used to subscribe to chain head responses and to broadcast
// chain head requests.
p2p p2pAPI
// db is used by IsSynced to read the local chain head.
db beaconDB
// curentHeadSlot is the slot taken from the chain head response handled by
// the query loop. The name is misspelled ("curent"); it is kept because
// other methods of this type reference it.
curentHeadSlot uint64
// currentHeadHash is the hash taken from that same response.
currentHeadHash []byte
// responseBuf is the buffered channel on which chain head responses
// arrive. The query loop closes it on exit.
responseBuf chan p2p.Message
}
// NewSyncQuerierService builds a SyncQuerier from the supplied configuration.
// The service gets its own cancellable context derived from ctx, takes its
// p2p and database handles from cfg, and allocates a response channel
// buffered to cfg.ResponseBufferSize. It is normally called by the main node.
func NewSyncQuerierService(ctx context.Context, cfg *Config) *SyncQuerier {
	svcCtx, stop := context.WithCancel(ctx)
	querier := &SyncQuerier{
		ctx:         svcCtx,
		cancel:      stop,
		responseBuf: make(chan p2p.Message, cfg.ResponseBufferSize),
	}
	querier.p2p = cfg.P2P
	querier.db = cfg.BeaconDB
	return querier
}
// Start runs the chain head query loop on the calling goroutine. It calls
// run directly (there is no `go` statement), so it blocks until run returns.
func (s *SyncQuerier) Start() {
s.run()
}
// Stop shuts the sync querier down by cancelling the service context, which
// makes the query loop in run exit. It always returns nil.
func (s *SyncQuerier) Stop() error {
log.Info("Stopping service")
// Cancelling the context is the only shutdown signal; run performs its own
// cleanup when it observes ctx.Done().
s.cancel()
return nil
}
// run drives the chain head query loop until a response has been handled or
// the service context is cancelled.
//
// It subscribes responseBuf to ChainHeadResponse messages, broadcasts a chain
// head request immediately and then again on every one-second tick. The first
// well-formed response is recorded in curentHeadSlot/currentHeadHash, after
// which the service cancels its own context so the loop exits on a following
// iteration. On exit the subscription is released, responseBuf is closed and
// the ticker is stopped.
func (s *SyncQuerier) run() {
	responseSub := s.p2p.Subscribe(&pb.ChainHeadResponse{}, s.responseBuf)

	// Ticker so that service will keep on requesting for chain head
	// until they get a response.
	ticker := time.NewTicker(1 * time.Second)
	defer func() {
		responseSub.Unsubscribe()
		close(s.responseBuf)
		ticker.Stop()
	}()

	s.RequestLatestHead()

	for {
		select {
		case <-s.ctx.Done():
			log.Info("Exiting goroutine")
			return
		case <-ticker.C:
			s.RequestLatestHead()
		case msg := <-s.responseBuf:
			// Use the checked form of the type assertion: a payload of an
			// unexpected type (or a nil response) must not panic the
			// service. Such messages are logged and skipped, and the loop
			// keeps polling for a usable response.
			response, ok := msg.Data.(*pb.ChainHeadResponse)
			if !ok || response == nil {
				log.Errorf("Ignoring malformed chain head response of type %T", msg.Data)
				continue
			}

			log.Infof("Latest Chain head is at slot: %d and hash %#x", response.Slot, response.Hash)
			s.curentHeadSlot = response.Slot
			s.currentHeadHash = response.Hash

			// A response has been recorded: stop polling and cancel the
			// context so the ctx.Done() case above terminates the loop.
			ticker.Stop()
			responseSub.Unsubscribe()
			s.cancel()
		}
	}
}
// RequestLatestHead asks the node's peers for their latest chain heads by
// broadcasting an empty ChainHeadRequest over the p2p handle.
func (s *SyncQuerier) RequestLatestHead() {
	s.p2p.Broadcast(&pb.ChainHeadRequest{})
}
// IsSynced checks if the node is currently synced with the rest of the
// network. It reads the local chain head from the database and reports true
// when that block's slot number is at or beyond the head slot recorded from
// the network. Any error from reading the chain head is returned together
// with false.
func (s *SyncQuerier) IsSynced() (bool, error) {
	head, err := s.db.GetChainHead()
	if err != nil {
		return false, err
	}

	caughtUp := head.SlotNumber() >= s.curentHeadSlot
	return caughtUp, nil
}