prysm-pulse/beacon-chain/sync/querier.go

238 lines
6.8 KiB
Go
Raw Normal View History

package sync
import (
"context"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
peer "github.com/libp2p/go-libp2p-peer"
2018-12-01 22:09:12 +00:00
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
2019-03-03 17:31:29 +00:00
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/p2p"
Allow 8 Validator Multinode Cluster to Run Indefinitely (#2050) * plug forkchoice to blockchain service's block processing * fixed tests * more fixes... * clean ups * fixed test * Update beacon-chain/blockchain/block_processing.go * merged with 2006 and started fixing tests * remove prints * fixed tests * lint * include ops service * if there's a skip slot, slot-- * fixed typo * started working on test * no fork choice in propose * bleh, need to fix state generator first * state gen takes input slot * feedback * fixed tests * preston's feedback * fmt * removed extra logging * add more logs * fixed validator attest * builds * fixed save block * children fix * removed verbose logs * fix fork choice * right logs * Add Prometheus Counter for Reorg (#2051) * fetch every slot (#2052) * test Fixes * lint * only regenerate state if there was a reorg * better logging * fixed seed * better logging * process skip slots in assignment requests * fix lint * disable state root computation * filter attestations in regular sync * log important items * better info logs * added spans to stategen * span in stategen * set validator deadline * randao stuff * disable sig verify * lint * lint * save only using historical states * use new goroutine for handling sync messages * change default buffer sizes * better p2p * rem some useless logs * lint * sync tests complete * complete tests * tests fixed * lint * fix flakey att service * PR feedback * undo k8s changes * Update beacon-chain/blockchain/block_processing.go * Update beacon-chain/sync/regular_sync.go * Add feature flag to enable compute state root * add comment * gazelle lint fix
2019-03-25 15:21:21 +00:00
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
)
// queryLog is the logger used by the sync querier; every entry written
// through it carries the field prefix=syncQuerier.
var queryLog = logrus.WithField("prefix", "syncQuerier")
// powChainService lists the methods the querier requires from the value
// stored in Querier.powchain.
type powChainService interface {
// HasChainStartLogOccurred is called by Querier.Start. Its boolean result
// seeds the chainStarted and atGenesis flags; the uint64 result is
// discarded there.
HasChainStartLogOccurred() (bool, uint64, error)
// BlockExists is not called in the code visible in this file.
// NOTE(review): presumably reports whether a block with the given hash is
// known and at which height — confirm against the implementation.
BlockExists(ctx context.Context, hash common.Hash) (bool, *big.Int, error)
// ChainStartFeed is not called in the code visible in this file.
// NOTE(review): presumably a feed that fires when the chain starts —
// confirm against the implementation.
ChainStartFeed() *event.Feed
}
// QuerierConfig defines the configurable properties of SyncQuerier.
type QuerierConfig struct {
ResponseBufferSize int
P2P p2pAPI
2018-12-01 22:09:12 +00:00
BeaconDB *db.BeaconDB
PowChain powChainService
CurrentHeadSlot uint64
ChainService chainService
}
// DefaultQuerierConfig provides the default configuration for a sync service.
// ResponseBufferSize determines that buffer size of the `responseBuf` channel.
func DefaultQuerierConfig() *QuerierConfig {
return &QuerierConfig{
Allow 8 Validator Multinode Cluster to Run Indefinitely (#2050) * plug forkchoice to blockchain service's block processing * fixed tests * more fixes... * clean ups * fixed test * Update beacon-chain/blockchain/block_processing.go * merged with 2006 and started fixing tests * remove prints * fixed tests * lint * include ops service * if there's a skip slot, slot-- * fixed typo * started working on test * no fork choice in propose * bleh, need to fix state generator first * state gen takes input slot * feedback * fixed tests * preston's feedback * fmt * removed extra logging * add more logs * fixed validator attest * builds * fixed save block * children fix * removed verbose logs * fix fork choice * right logs * Add Prometheus Counter for Reorg (#2051) * fetch every slot (#2052) * test Fixes * lint * only regenerate state if there was a reorg * better logging * fixed seed * better logging * process skip slots in assignment requests * fix lint * disable state root computation * filter attestations in regular sync * log important items * better info logs * added spans to stategen * span in stategen * set validator deadline * randao stuff * disable sig verify * lint * lint * save only using historical states * use new goroutine for handling sync messages * change default buffer sizes * better p2p * rem some useless logs * lint * sync tests complete * complete tests * tests fixed * lint * fix flakey att service * PR feedback * undo k8s changes * Update beacon-chain/blockchain/block_processing.go * Update beacon-chain/sync/regular_sync.go * Add feature flag to enable compute state root * add comment * gazelle lint fix
2019-03-25 15:21:21 +00:00
ResponseBufferSize: params.BeaconConfig().DefaultBufferSize,
}
}
// Querier defines the main class in this package.
// See the package comments for a general description of the service's functions.
// It polls peers for their chain head and remembers the highest one reported.
type Querier struct {
// ctx is the service context derived in NewQuerierService; run returns once it is done.
ctx context.Context
// cancel cancels ctx. It is invoked by Stop and by run when its response window elapses.
cancel context.CancelFunc
// p2p is used to subscribe to chain head responses and to broadcast chain head requests.
p2p p2pAPI
// db is read for the head state (Start) and the chain head block (IsSynced).
db *db.BeaconDB
// chainService provides the state-initialized feed awaited before polling.
chainService chainService
// currentHeadSlot is the highest canonical slot known so far: it starts at the
// configured value and is raised whenever a peer reports a higher slot.
currentHeadSlot uint64
// currentStateRoot is the canonical state root from the response that last raised currentHeadSlot.
currentStateRoot []byte
// currentFinalizedStateRoot is the finalized state root from that same response, as a 32-byte array.
currentFinalizedStateRoot [32]byte
// responseBuf receives the chain head responses subscribed to in run; run closes it on exit.
responseBuf chan p2p.Message
// chainStartBuf receives the state-initialized notification (buffered with capacity 1).
chainStartBuf chan time.Time
// powchain is asked in Start whether the chain start log has occurred.
powchain powChainService
// chainStarted is set from the PoW chain query in Start and forced to true once
// state initialization is signalled; IsSynced reports synced while it is false.
chainStarted bool
// atGenesis starts out true and is set in Start to the negation of the chain-start
// result; IsSynced reports synced while it is true.
atGenesis bool
// bestPeer is the peer ID logged as the peer with the highest canonical head when polling ends.
bestPeer peer.ID
// chainHeadResponses keeps the first chain head response received from each peer, keyed by peer ID.
chainHeadResponses map[peer.ID]*pb.ChainHeadResponse
// canonicalBlockRoot is the canonical block root from the response that last raised currentHeadSlot.
canonicalBlockRoot []byte
// finalizedBlockRoot is the finalized block root from that same response.
finalizedBlockRoot []byte
}
// NewQuerierService builds a Querier from cfg without starting it.
//
// The supplied ctx is wrapped in a cancellable child context that the
// returned service owns. cfg must be non-nil because its fields are read
// unconditionally. The service starts out marked as not chain-started and
// at genesis; Start revises both flags.
func NewQuerierService(ctx context.Context,
	cfg *QuerierConfig,
) *Querier {
	svcCtx, stop := context.WithCancel(ctx)

	q := &Querier{
		ctx:    svcCtx,
		cancel: stop,

		// Collaborators handed in through the config.
		p2p:          cfg.P2P,
		db:           cfg.BeaconDB,
		chainService: cfg.ChainService,
		powchain:     cfg.PowChain,

		// Initial view of the chain; chainStarted keeps its zero value (false).
		currentHeadSlot: cfg.CurrentHeadSlot,
		atGenesis:       true,

		// Channels and bookkeeping used while polling.
		responseBuf:        make(chan p2p.Message, cfg.ResponseBufferSize),
		chainStartBuf:      make(chan time.Time, 1),
		chainHeadResponses: make(map[peer.ID]*pb.ChainHeadResponse),
	}
	return q
}
// Start runs the querier on the calling goroutine and returns only when the
// querier has nothing left to do, so callers that must not block should
// invoke it from their own goroutine.
//
// It first asks the PoW chain service whether the chain start log has
// occurred (giving up if that query fails) and then loads the head state.
// A failure to load the head state is logged but not fatal.
func (q *Querier) Start() {
	started, _, err := q.powchain.HasChainStartLogOccurred()
	if err != nil {
		queryLog.Errorf("Unable to get current state of the deposit contract %v", err)
		return
	}
	q.chainStarted, q.atGenesis = started, !started

	headState, err := q.db.HeadState(q.ctx)
	if err != nil {
		queryLog.Errorf("Unable to retrieve beacon state %v", err)
	}

	// Chain start has occurred and a beacon state exists: poll right away.
	if started && headState != nil {
		q.run()
		return
	}

	// Either chain start has not occurred yet, or it has but the beacon
	// state is not initialized (the POW chain service is still accumulating
	// and processing deposits). Wait for the state to be initialized.
	q.listenForStateInitialization()
	if q.atGenesis {
		// The node is at genesis, so there is nothing to query.
		return
	}
	q.run()
}
// Stop logs that the service is stopping and cancels the querier's context,
// which makes the polling loop return. It always returns nil.
func (q *Querier) Stop() error {
	// Deferred so that cancellation still follows the log line.
	defer q.cancel()
	queryLog.Info("Stopping service")
	return nil
}
func (q *Querier) listenForStateInitialization() {
sub := q.chainService.StateInitializedFeed().Subscribe(q.chainStartBuf)
defer sub.Unsubscribe()
for {
select {
case <-q.chainStartBuf:
2019-04-30 21:55:59 +00:00
queryLog.Info("State has been initialized")
q.chainStarted = true
return
case <-sub.Err():
log.Fatal("Subscriber closed, unable to continue on with sync")
return
case <-q.ctx.Done():
log.Debug("RPC context closed, exiting goroutine")
return
}
}
}
func (q *Querier) run() {
responseSub := q.p2p.Subscribe(&pb.ChainHeadResponse{}, q.responseBuf)
// Ticker so that service will keep on requesting for chain head
// until they get a response.
ticker := time.NewTicker(1 * time.Second)
defer func() {
responseSub.Unsubscribe()
close(q.responseBuf)
ticker.Stop()
}()
2019-04-30 21:55:59 +00:00
log.Info("Polling peers for latest chain head...")
hasReceivedResponse := false
var timeout <-chan time.Time
for {
select {
case <-q.ctx.Done():
2019-04-30 21:55:59 +00:00
queryLog.Info("Finished querying state of the network, importing blocks...")
return
case <-ticker.C:
q.RequestLatestHead()
case <-timeout:
queryLog.WithField("peerID", q.bestPeer.Pretty()).Info("Peer with highest canonical head")
queryLog.Infof(
"Latest chain head is at slot: %d and state root: %#x",
q.currentHeadSlot-params.BeaconConfig().GenesisSlot, q.currentStateRoot,
)
ticker.Stop()
responseSub.Unsubscribe()
q.cancel()
case msg := <-q.responseBuf:
// If this is the first response a node receives, we start
// a timeout that will keep listening for more responses over a
// certain time interval to ensure we get the best head from our peers.
if !hasReceivedResponse {
timeout = time.After(10 * time.Second)
hasReceivedResponse = true
}
response := msg.Data.(*pb.ChainHeadResponse)
if _, ok := q.chainHeadResponses[msg.Peer]; !ok {
2019-05-04 04:11:34 +00:00
queryLog.WithFields(logrus.Fields{
"peerID": msg.Peer.Pretty(),
"highestSlot": response.CanonicalSlot - params.BeaconConfig().GenesisSlot,
}).Info("Received chain head from peer")
q.chainHeadResponses[msg.Peer] = response
2019-05-04 04:11:34 +00:00
}
if response.CanonicalSlot > q.currentHeadSlot {
q.currentHeadSlot = response.CanonicalSlot
q.currentStateRoot = response.CanonicalStateRootHash32
q.currentFinalizedStateRoot = bytesutil.ToBytes32(response.FinalizedStateRootHash32S)
q.canonicalBlockRoot = response.CanonicalBlockRoot
q.finalizedBlockRoot = response.FinalizedBlockRoot
}
}
}
}
// RequestLatestHead broadcasts an empty chain head request through the p2p
// layer, asking peers for their latest chain head slot and state root.
// The broadcast uses a background context rather than the querier's own.
func (q *Querier) RequestLatestHead() {
	q.p2p.Broadcast(context.Background(), &pb.ChainHeadRequest{})
}
// IsSynced reports whether the node is considered synced with the rest of
// the network.
//
// While the chain has not started, or while the node is at genesis, the
// node counts as synced. Otherwise the chain head block is read from the
// database: a read error is returned to the caller, a missing block means
// not synced, and an existing block means synced exactly when its slot has
// reached the highest head slot recorded from peers.
func (q *Querier) IsSynced() (bool, error) {
	if !q.chainStarted || q.atGenesis {
		return true, nil
	}

	head, err := q.db.ChainHead()
	switch {
	case err != nil:
		return false, err
	case head == nil:
		return false, nil
	}
	return head.Slot >= q.currentHeadSlot, nil
}