package sync import ( "context" "math/big" "time" "github.com/ethereum/go-ethereum/common" peer "github.com/libp2p/go-libp2p-peer" "github.com/prysmaticlabs/prysm/beacon-chain/db" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" deprecatedp2p "github.com/prysmaticlabs/prysm/shared/deprecated-p2p" "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" ) var queryLog = logrus.WithField("prefix", "syncQuerier") var logQueryInterval = 1 * time.Second type powChainService interface { HasChainStarted() bool BlockExists(ctx context.Context, hash common.Hash) (bool, *big.Int, error) ChainStartFeed() *event.Feed AreAllDepositsProcessed() (bool, error) } // QuerierConfig defines the configurable properties of SyncQuerier. type QuerierConfig struct { ResponseBufferSize int P2P p2pAPI BeaconDB *db.BeaconDB PowChain powChainService CurrentHeadSlot uint64 ChainService chainService } // DefaultQuerierConfig provides the default configuration for a sync service. // ResponseBufferSize determines that buffer size of the `responseBuf` channel. func DefaultQuerierConfig() *QuerierConfig { return &QuerierConfig{ ResponseBufferSize: params.BeaconConfig().DefaultBufferSize, } } // Querier defines the main class in this package. // See the package comments for a general description of the service's functions. type Querier struct { ctx context.Context cancel context.CancelFunc p2p p2pAPI db *db.BeaconDB chainService chainService currentHeadSlot uint64 currentStateRoot []byte currentFinalizedStateRoot [32]byte responseBuf chan deprecatedp2p.Message chainStartBuf chan time.Time powchain powChainService chainStarted bool atGenesis bool bestPeer peer.ID chainHeadResponses map[peer.ID]*pb.ChainHeadResponse canonicalBlockRoot []byte finalizedBlockRoot []byte } // NewQuerierService constructs a new Sync Querier Service. // This method is normally called by the main node. func NewQuerierService(ctx context.Context, cfg *QuerierConfig, ) *Querier { ctx, cancel := context.WithCancel(ctx) responseBuf := make(chan deprecatedp2p.Message, cfg.ResponseBufferSize) return &Querier{ ctx: ctx, cancel: cancel, p2p: cfg.P2P, db: cfg.BeaconDB, chainService: cfg.ChainService, responseBuf: responseBuf, currentHeadSlot: cfg.CurrentHeadSlot, chainStarted: false, atGenesis: true, powchain: cfg.PowChain, chainStartBuf: make(chan time.Time, 1), chainHeadResponses: make(map[peer.ID]*pb.ChainHeadResponse), } } // Start begins the goroutine. func (q *Querier) Start() { q.waitForAllDepositsToBeProcessed() hasChainStarted := q.powchain.HasChainStarted() q.chainStarted = hasChainStarted q.atGenesis = !hasChainStarted bState, err := q.db.HeadState(q.ctx) if err != nil { queryLog.Errorf("Unable to retrieve beacon state %v", err) } // we handle both the cases where either chainstart has not occurred or // if beacon state has been initialized. If chain start has occurred but // beacon state has not been initialized we wait for the POW chain service // to accumulate all the deposits and process them. if !hasChainStarted || bState == nil { q.listenForStateInitialization() // Return, if the node is at genesis. if q.atGenesis { return } } q.run() } // Stop kills the sync querier goroutine. func (q *Querier) Stop() error { queryLog.Info("Stopping service") q.cancel() return nil } func (q *Querier) listenForStateInitialization() { sub := q.chainService.StateInitializedFeed().Subscribe(q.chainStartBuf) defer sub.Unsubscribe() for { select { case <-q.chainStartBuf: queryLog.Info("State has been initialized") q.chainStarted = true return case <-sub.Err(): log.Fatal("Subscriber closed, unable to continue on with sync") return case <-q.ctx.Done(): log.Debug("RPC context closed, exiting goroutine") return } } } func (q *Querier) run() { responseSub := q.p2p.Subscribe(&pb.ChainHeadResponse{}, q.responseBuf) // Ticker so that service will keep on requesting for chain head // until they get a response. ticker := time.NewTicker(1 * time.Second) defer func() { responseSub.Unsubscribe() close(q.responseBuf) ticker.Stop() }() queryLog.Info("Polling peers for latest chain head...") hasReceivedResponse := false var timeout <-chan time.Time for { select { case <-q.ctx.Done(): queryLog.Info("Finished querying state of the network, importing blocks...") return case <-ticker.C: q.RequestLatestHead() case <-timeout: queryLog.WithField("peerID", q.bestPeer.Pretty()).Info("Peer with highest canonical head") queryLog.Infof( "Latest chain head is at slot: %d and state root: %#x", q.currentHeadSlot, q.currentStateRoot, ) ticker.Stop() responseSub.Unsubscribe() q.cancel() case msg := <-q.responseBuf: // If this is the first response a node receives, we start // a timeout that will keep listening for more responses over a // certain time interval to ensure we get the best head from our peers. if !hasReceivedResponse { timeout = time.After(10 * time.Second) hasReceivedResponse = true } response := msg.Data.(*pb.ChainHeadResponse) if _, ok := q.chainHeadResponses[msg.Peer]; !ok { queryLog.WithFields(logrus.Fields{ "peerID": msg.Peer.Pretty(), "highestSlot": response.CanonicalSlot, }).Info("Received chain head from peer") q.chainHeadResponses[msg.Peer] = response } if response.CanonicalSlot > q.currentHeadSlot { q.bestPeer = msg.Peer q.currentHeadSlot = response.CanonicalSlot q.currentStateRoot = response.CanonicalStateRootHash32 q.currentFinalizedStateRoot = bytesutil.ToBytes32(response.FinalizedStateRootHash32S) q.canonicalBlockRoot = response.CanonicalBlockRoot q.finalizedBlockRoot = response.FinalizedBlockRoot } } } } func (q *Querier) waitForAllDepositsToBeProcessed() { for { processed, err := q.powchain.AreAllDepositsProcessed() if err != nil { queryLog.Errorf("Could not check status of deposits %v", err) continue } if processed { break } time.Sleep(logQueryInterval) } } // RequestLatestHead broadcasts a request for // the latest chain head slot and state root to a peer. func (q *Querier) RequestLatestHead() { request := &pb.ChainHeadRequest{} q.p2p.Broadcast(context.Background(), request) } // IsSynced checks if the node is currently synced with the // rest of the network. func (q *Querier) IsSynced() (bool, error) { if !q.chainStarted { return true, nil } if q.atGenesis { return true, nil } block, err := q.db.ChainHead() if err != nil { return false, err } if block == nil { return false, nil } return block.Slot >= q.currentHeadSlot, nil }