Add ChainStart Check To Sync (#1671)

* Add chainstart to sync service

* adding changes to node

* fixing tests

* comments

* add test

* add another test

* remove test in build.bzl
This commit is contained in:
Nishant Das 2019-02-21 12:27:04 +05:30 committed by GitHub
parent ff9174eefd
commit 5e5a8e75e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 134 additions and 2 deletions

View File

@ -81,7 +81,7 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
return nil, err return nil, err
} }
if err := beacon.registerSyncService(); err != nil { if err := beacon.registerSyncService(ctx); err != nil {
return nil, err return nil, err
} }
@ -226,7 +226,7 @@ func (b *BeaconNode) registerPOWChainService(ctx *cli.Context) error {
return b.services.RegisterService(web3Service) return b.services.RegisterService(web3Service)
} }
func (b *BeaconNode) registerSyncService() error { func (b *BeaconNode) registerSyncService(ctx *cli.Context) error {
var chainService *blockchain.ChainService var chainService *blockchain.ChainService
if err := b.services.FetchService(&chainService); err != nil { if err := b.services.FetchService(&chainService); err != nil {
return err return err
@ -242,11 +242,20 @@ func (b *BeaconNode) registerSyncService() error {
return err return err
} }
var web3Service *powchain.Web3Service
var enablePOWChain = ctx.GlobalBool(utils.EnablePOWChain.Name)
if enablePOWChain {
if err := b.services.FetchService(&web3Service); err != nil {
return err
}
}
cfg := &rbcsync.Config{ cfg := &rbcsync.Config{
ChainService: chainService, ChainService: chainService,
P2P: p2pService, P2P: p2pService,
BeaconDB: b.db, BeaconDB: b.db,
OperationService: operationService, OperationService: operationService,
PowChainService: web3Service,
} }
syncService := rbcsync.NewSyncService(context.Background(), cfg) syncService := rbcsync.NewSyncService(context.Background(), cfg)

View File

@ -4,6 +4,8 @@ import (
"context" "context"
"time" "time"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/p2p" "github.com/prysmaticlabs/prysm/shared/p2p"
@ -12,12 +14,18 @@ import (
var queryLog = logrus.WithField("prefix", "syncQuerier") var queryLog = logrus.WithField("prefix", "syncQuerier")
type powChainService interface {
HasChainStartLogOccurred() (bool, uint64, error)
ChainStartFeed() *event.Feed
}
// QuerierConfig defines the configurable properties of SyncQuerier. // QuerierConfig defines the configurable properties of SyncQuerier.
type QuerierConfig struct { type QuerierConfig struct {
ResponseBufferSize int ResponseBufferSize int
P2P p2pAPI P2P p2pAPI
BeaconDB *db.BeaconDB BeaconDB *db.BeaconDB
CurentHeadSlot uint64 CurentHeadSlot uint64
PowChain powChainService
} }
// DefaultQuerierConfig provides the default configuration for a sync service. // DefaultQuerierConfig provides the default configuration for a sync service.
@ -38,6 +46,9 @@ type Querier struct {
curentHeadSlot uint64 curentHeadSlot uint64
currentHeadHash []byte currentHeadHash []byte
responseBuf chan p2p.Message responseBuf chan p2p.Message
chainStartBuf chan time.Time
powchain powChainService
chainStarted bool
} }
// NewQuerierService constructs a new Sync Querier Service. // NewQuerierService constructs a new Sync Querier Service.
@ -56,11 +67,23 @@ func NewQuerierService(ctx context.Context,
db: cfg.BeaconDB, db: cfg.BeaconDB,
responseBuf: responseBuf, responseBuf: responseBuf,
curentHeadSlot: cfg.CurentHeadSlot, curentHeadSlot: cfg.CurentHeadSlot,
chainStarted: false,
powchain: cfg.PowChain,
chainStartBuf: make(chan time.Time, 1),
} }
} }
// Start begins the goroutine. // Start begins the goroutine.
func (q *Querier) Start() { func (q *Querier) Start() {
hasChainStarted, _, err := q.powchain.HasChainStartLogOccurred()
if err != nil {
queryLog.Errorf("Unable to get current state of the deposit contract %v", err)
return
}
if !hasChainStarted {
q.listenForChainStart()
return
}
q.run() q.run()
} }
@ -71,6 +94,25 @@ func (q *Querier) Stop() error {
return nil return nil
} }
func (q *Querier) listenForChainStart() {
sub := q.powchain.ChainStartFeed().Subscribe(q.chainStartBuf)
defer sub.Unsubscribe()
for {
select {
case <-q.chainStartBuf:
q.chainStarted = true
return
case <-sub.Err():
log.Fatal("Subscriber closed, unable to continue on with sync")
return
case <-q.ctx.Done():
log.Debug("RPC context closed, exiting goroutine")
return
}
}
}
func (q *Querier) run() { func (q *Querier) run() {
responseSub := q.p2p.Subscribe(&pb.ChainHeadResponse{}, q.responseBuf) responseSub := q.p2p.Subscribe(&pb.ChainHeadResponse{}, q.responseBuf)
@ -116,6 +158,9 @@ func (q *Querier) RequestLatestHead() {
// IsSynced checks if the node is cuurently synced with the // IsSynced checks if the node is cuurently synced with the
// rest of the network. // rest of the network.
func (q *Querier) IsSynced() (bool, error) { func (q *Querier) IsSynced() (bool, error) {
if q.chainStarted {
return true, nil
}
block, err := q.db.ChainHead() block, err := q.db.ChainHead()
if err != nil { if err != nil {
return false, err return false, err

View File

@ -4,6 +4,9 @@ import (
"context" "context"
"fmt" "fmt"
"testing" "testing"
"time"
"github.com/prysmaticlabs/prysm/shared/event"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/p2p" "github.com/prysmaticlabs/prysm/shared/p2p"
@ -11,11 +14,36 @@ import (
logTest "github.com/sirupsen/logrus/hooks/test" logTest "github.com/sirupsen/logrus/hooks/test"
) )
type genesisPowChain struct {
feed *event.Feed
}
func (mp *genesisPowChain) HasChainStartLogOccurred() (bool, uint64, error) {
return false, 0, nil
}
func (mp *genesisPowChain) ChainStartFeed() *event.Feed {
return mp.feed
}
type afterGenesisPowChain struct {
feed *event.Feed
}
func (mp *afterGenesisPowChain) HasChainStartLogOccurred() (bool, uint64, error) {
return true, 0, nil
}
func (mp *afterGenesisPowChain) ChainStartFeed() *event.Feed {
return mp.feed
}
func TestStartStop(t *testing.T) { func TestStartStop(t *testing.T) {
hook := logTest.NewGlobal() hook := logTest.NewGlobal()
cfg := &QuerierConfig{ cfg := &QuerierConfig{
P2P: &mockP2P{}, P2P: &mockP2P{},
ResponseBufferSize: 100, ResponseBufferSize: 100,
PowChain: &afterGenesisPowChain{},
} }
sq := NewQuerierService(context.Background(), cfg) sq := NewQuerierService(context.Background(), cfg)
@ -38,11 +66,59 @@ func TestStartStop(t *testing.T) {
hook.Reset() hook.Reset()
} }
func TestListenForChainStart_ContextCancelled(t *testing.T) {
cfg := &QuerierConfig{
P2P: &mockP2P{},
ResponseBufferSize: 100,
PowChain: &afterGenesisPowChain{
feed: new(event.Feed),
},
}
sq := NewQuerierService(context.Background(), cfg)
exitRoutine := make(chan bool)
defer func() {
close(exitRoutine)
}()
go func() {
sq.listenForChainStart()
exitRoutine <- true
}()
sq.cancel()
<-exitRoutine
if sq.ctx.Done() == nil {
t.Error("Despite context being cancelled, the done channel is nil")
}
}
func TestListenForChainStart(t *testing.T) {
cfg := &QuerierConfig{
P2P: &mockP2P{},
ResponseBufferSize: 100,
PowChain: &afterGenesisPowChain{
feed: new(event.Feed),
},
}
sq := NewQuerierService(context.Background(), cfg)
sq.chainStartBuf <- time.Now()
sq.listenForChainStart()
if !sq.chainStarted {
t.Fatal("ChainStart in the querier service is not true despite the log being fired")
}
sq.cancel()
}
func TestChainReqResponse(t *testing.T) { func TestChainReqResponse(t *testing.T) {
hook := logTest.NewGlobal() hook := logTest.NewGlobal()
cfg := &QuerierConfig{ cfg := &QuerierConfig{
P2P: &mockP2P{}, P2P: &mockP2P{},
ResponseBufferSize: 100, ResponseBufferSize: 100,
PowChain: &afterGenesisPowChain{},
} }
sq := NewQuerierService(context.Background(), cfg) sq := NewQuerierService(context.Background(), cfg)

View File

@ -23,6 +23,7 @@ type Config struct {
BeaconDB *db.BeaconDB BeaconDB *db.BeaconDB
P2P p2pAPI P2P p2pAPI
OperationService operationService OperationService operationService
PowChainService powChainService
} }
// NewSyncService creates a new instance of SyncService using the config // NewSyncService creates a new instance of SyncService using the config
@ -32,6 +33,7 @@ func NewSyncService(ctx context.Context, cfg *Config) *Service {
sqCfg := DefaultQuerierConfig() sqCfg := DefaultQuerierConfig()
sqCfg.BeaconDB = cfg.BeaconDB sqCfg.BeaconDB = cfg.BeaconDB
sqCfg.P2P = cfg.P2P sqCfg.P2P = cfg.P2P
sqCfg.PowChain = cfg.PowChainService
isCfg := initialsync.DefaultConfig() isCfg := initialsync.DefaultConfig()
isCfg.BeaconDB = cfg.BeaconDB isCfg.BeaconDB = cfg.BeaconDB