Fix Initial Sync in Restarts (#2197)

* set chainstart value

* revert change in IsSynced and add regression test

* change status

* make a better status query

* allow status check to work on restarts

* spacing
This commit is contained in:
Nishant Das 2019-04-10 02:18:23 +08:00 committed by Raul Jordan
parent f2b9269650
commit 90fed79610
4 changed files with 97 additions and 11 deletions

View File

@ -180,6 +180,11 @@ func (s *InitialSync) InitializeFinalizedStateRoot(root [32]byte) {
s.finalizedStateRoot = root
}
// HighestObservedSlot returns the highest observed slot.
func (s *InitialSync) HighestObservedSlot() uint64 {
return s.highestObservedSlot
}
// NodeIsSynced checks that the node has been caught up with the network.
func (s *InitialSync) NodeIsSynced() (bool, uint64) {
return s.nodeIsSynced, s.currentSlot

View File

@ -90,6 +90,7 @@ func (q *Querier) Start() {
return
}
q.chainStarted = hasChainStarted
q.atGenesis = !hasChainStarted
bState, err := q.db.HeadState(q.ctx)

View File

@ -166,3 +166,81 @@ func TestQuerier_ChainReqResponse(t *testing.T) {
close(exitRoutine)
hook.Reset()
}
func TestSyncedInGenesis(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
cfg := &QuerierConfig{
P2P: &mockP2P{},
ResponseBufferSize: 100,
ChainService: &mockChainService{},
BeaconDB: db,
PowChain: &genesisPowChain{},
}
sq := NewQuerierService(context.Background(), cfg)
sq.chainStartBuf <- time.Now()
sq.Start()
synced, err := sq.IsSynced()
if err != nil {
t.Fatalf("Unable to check if the node is synced")
}
if !synced {
t.Errorf("node is not synced when it is supposed to be")
}
sq.cancel()
}
func TestSyncedInRestarts(t *testing.T) {
db := internal.SetupDB(t)
defer internal.TeardownDB(t, db)
cfg := &QuerierConfig{
P2P: &mockP2P{},
ResponseBufferSize: 100,
ChainService: &mockChainService{},
BeaconDB: db,
PowChain: &afterGenesisPowChain{},
}
sq := NewQuerierService(context.Background(), cfg)
bState := &pb.BeaconState{Slot: 0}
blk := &pb.BeaconBlock{Slot: 0}
if err := db.SaveState(context.Background(), bState); err != nil {
t.Fatalf("Could not save state: %v", err)
}
if err := db.SaveBlock(blk); err != nil {
t.Fatalf("Could not save state: %v", err)
}
if err := db.UpdateChainHead(context.Background(), blk, bState); err != nil {
t.Fatalf("Could not update chainhead: %v", err)
}
exitRoutine := make(chan bool)
go func() {
sq.Start()
exitRoutine <- true
}()
response := &pb.ChainHeadResponse{
CanonicalSlot: 10,
CanonicalStateRootHash32: []byte{'a', 'b'},
}
msg := p2p.Message{
Data: response,
}
sq.responseBuf <- msg
<-exitRoutine
synced, err := sq.IsSynced()
if err != nil {
t.Fatalf("Unable to check if the node is synced; %v", err)
}
if synced {
t.Errorf("node is synced when it is not supposed to be in a restart")
}
sq.cancel()
}

View File

@ -97,12 +97,13 @@ func (ss *Service) Status() error {
if ss.Querier.atGenesis {
return nil
}
synced, currentSyncedSlot := ss.InitialSync.NodeIsSynced()
if !synced {
return fmt.Errorf(
"node not yet synced, currently at slot: %v",
currentSyncedSlot-params.BeaconConfig().GenesisSlot,
)
blk, err := ss.Querier.db.ChainHead()
if err != nil {
return fmt.Errorf("could not retrieve chain head %v", err)
}
if blk.Slot < ss.InitialSync.HighestObservedSlot() {
return fmt.Errorf("node is not synced as the current chain head is at slot %d", blk.Slot-params.BeaconConfig().GenesisSlot)
}
return nil
}
@ -114,15 +115,16 @@ func (ss *Service) run() {
slog.Fatalf("Unable to retrieve result from sync querier %v", err)
}
if synced {
ss.RegularSync.Start()
return
}
// Sets the highest observed slot from querier.
ss.InitialSync.InitializeObservedSlot(ss.Querier.currentHeadSlot)
ss.InitialSync.InitializeObservedStateRoot(bytesutil.ToBytes32(ss.Querier.currentStateRoot))
// Sets the state root of the highest observed slot.
ss.InitialSync.InitializeFinalizedStateRoot(ss.Querier.currentFinalizedStateRoot)
if synced {
ss.RegularSync.Start()
return
}
ss.InitialSync.Start()
}