diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index e74efee2d..5df817278 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -81,7 +81,7 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) { return nil, err } - if err := beacon.registerSyncService(); err != nil { + if err := beacon.registerSyncService(ctx); err != nil { return nil, err } @@ -226,7 +226,7 @@ func (b *BeaconNode) registerPOWChainService(ctx *cli.Context) error { return b.services.RegisterService(web3Service) } -func (b *BeaconNode) registerSyncService() error { +func (b *BeaconNode) registerSyncService(ctx *cli.Context) error { var chainService *blockchain.ChainService if err := b.services.FetchService(&chainService); err != nil { return err @@ -242,11 +242,20 @@ func (b *BeaconNode) registerSyncService() error { return err } + var web3Service *powchain.Web3Service + var enablePOWChain = ctx.GlobalBool(utils.EnablePOWChain.Name) + if enablePOWChain { + if err := b.services.FetchService(&web3Service); err != nil { + return err + } + } + cfg := &rbcsync.Config{ ChainService: chainService, P2P: p2pService, BeaconDB: b.db, OperationService: operationService, + PowChainService: web3Service, } syncService := rbcsync.NewSyncService(context.Background(), cfg) diff --git a/beacon-chain/sync/querier.go b/beacon-chain/sync/querier.go index 350b15379..2666ee4cd 100644 --- a/beacon-chain/sync/querier.go +++ b/beacon-chain/sync/querier.go @@ -4,6 +4,8 @@ import ( "context" "time" + "github.com/prysmaticlabs/prysm/shared/event" + "github.com/prysmaticlabs/prysm/beacon-chain/db" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/p2p" @@ -12,12 +14,18 @@ import ( var queryLog = logrus.WithField("prefix", "syncQuerier") +type powChainService interface { + HasChainStartLogOccurred() (bool, uint64, error) + ChainStartFeed() *event.Feed +} + // QuerierConfig defines the configurable properties of SyncQuerier. type QuerierConfig struct { ResponseBufferSize int P2P p2pAPI BeaconDB *db.BeaconDB CurentHeadSlot uint64 + PowChain powChainService } // DefaultQuerierConfig provides the default configuration for a sync service. @@ -38,6 +46,9 @@ type Querier struct { curentHeadSlot uint64 currentHeadHash []byte responseBuf chan p2p.Message + chainStartBuf chan time.Time + powchain powChainService + chainStarted bool } // NewQuerierService constructs a new Sync Querier Service. @@ -56,11 +67,23 @@ func NewQuerierService(ctx context.Context, db: cfg.BeaconDB, responseBuf: responseBuf, curentHeadSlot: cfg.CurentHeadSlot, + chainStarted: false, + powchain: cfg.PowChain, + chainStartBuf: make(chan time.Time, 1), } } // Start begins the goroutine. func (q *Querier) Start() { + hasChainStarted, _, err := q.powchain.HasChainStartLogOccurred() + if err != nil { + queryLog.Errorf("Unable to get current state of the deposit contract %v", err) + return + } + if !hasChainStarted { + q.listenForChainStart() + return + } q.run() } @@ -71,6 +94,25 @@ func (q *Querier) Stop() error { return nil } +func (q *Querier) listenForChainStart() { + + sub := q.powchain.ChainStartFeed().Subscribe(q.chainStartBuf) + defer sub.Unsubscribe() + for { + select { + case <-q.chainStartBuf: + q.chainStarted = true + return + case <-sub.Err(): + log.Fatal("Subscriber closed, unable to continue on with sync") + return + case <-q.ctx.Done(): + log.Debug("RPC context closed, exiting goroutine") + return + } + } +} + func (q *Querier) run() { responseSub := q.p2p.Subscribe(&pb.ChainHeadResponse{}, q.responseBuf) @@ -116,6 +158,9 @@ func (q *Querier) RequestLatestHead() { // IsSynced checks if the node is cuurently synced with the // rest of the network. func (q *Querier) IsSynced() (bool, error) { + if q.chainStarted { + return true, nil + } block, err := q.db.ChainHead() if err != nil { return false, err diff --git a/beacon-chain/sync/querier_test.go b/beacon-chain/sync/querier_test.go index 8fcf2bf0a..41ed1db66 100644 --- a/beacon-chain/sync/querier_test.go +++ b/beacon-chain/sync/querier_test.go @@ -4,6 +4,9 @@ import ( "context" "fmt" "testing" + "time" + + "github.com/prysmaticlabs/prysm/shared/event" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/p2p" @@ -11,11 +14,36 @@ import ( logTest "github.com/sirupsen/logrus/hooks/test" ) +type genesisPowChain struct { + feed *event.Feed +} + +func (mp *genesisPowChain) HasChainStartLogOccurred() (bool, uint64, error) { + return false, 0, nil +} + +func (mp *genesisPowChain) ChainStartFeed() *event.Feed { + return mp.feed +} + +type afterGenesisPowChain struct { + feed *event.Feed +} + +func (mp *afterGenesisPowChain) HasChainStartLogOccurred() (bool, uint64, error) { + return true, 0, nil +} + +func (mp *afterGenesisPowChain) ChainStartFeed() *event.Feed { + return mp.feed +} + func TestStartStop(t *testing.T) { hook := logTest.NewGlobal() cfg := &QuerierConfig{ P2P: &mockP2P{}, ResponseBufferSize: 100, + PowChain: &afterGenesisPowChain{}, } sq := NewQuerierService(context.Background(), cfg) @@ -38,11 +66,59 @@ func TestStartStop(t *testing.T) { hook.Reset() } +func TestListenForChainStart_ContextCancelled(t *testing.T) { + cfg := &QuerierConfig{ + P2P: &mockP2P{}, + ResponseBufferSize: 100, + PowChain: &afterGenesisPowChain{ + feed: new(event.Feed), + }, + } + sq := NewQuerierService(context.Background(), cfg) + exitRoutine := make(chan bool) + + defer func() { + close(exitRoutine) + }() + + go func() { + sq.listenForChainStart() + exitRoutine <- true + }() + + sq.cancel() + <-exitRoutine + + if sq.ctx.Done() == nil { + t.Error("Despite context being cancelled, the done channel is nil") + } +} + +func TestListenForChainStart(t *testing.T) { + cfg := &QuerierConfig{ + P2P: &mockP2P{}, + ResponseBufferSize: 100, + PowChain: &afterGenesisPowChain{ + feed: new(event.Feed), + }, + } + sq := NewQuerierService(context.Background(), cfg) + + sq.chainStartBuf <- time.Now() + sq.listenForChainStart() + + if !sq.chainStarted { + t.Fatal("ChainStart in the querier service is not true despite the log being fired") + } + sq.cancel() +} + func TestChainReqResponse(t *testing.T) { hook := logTest.NewGlobal() cfg := &QuerierConfig{ P2P: &mockP2P{}, ResponseBufferSize: 100, + PowChain: &afterGenesisPowChain{}, } sq := NewQuerierService(context.Background(), cfg) diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 10f621194..1c6fbea3e 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -23,6 +23,7 @@ type Config struct { BeaconDB *db.BeaconDB P2P p2pAPI OperationService operationService + PowChainService powChainService } // NewSyncService creates a new instance of SyncService using the config @@ -32,6 +33,7 @@ func NewSyncService(ctx context.Context, cfg *Config) *Service { sqCfg := DefaultQuerierConfig() sqCfg.BeaconDB = cfg.BeaconDB sqCfg.P2P = cfg.P2P + sqCfg.PowChain = cfg.PowChainService isCfg := initialsync.DefaultConfig() isCfg.BeaconDB = cfg.BeaconDB