2018-08-22 01:52:03 +00:00
// Package initialsync is run by the beacon node when the local chain is
// behind the network's longest chain. Initial sync works as follows:
// The node requests for the slot number of the most recent finalized block.
// The node then builds from the most recent finalized block by requesting for subsequent
// blocks by slot number. Once the service detects that the local chain is caught up with
// the network, the service hands over control to the regular sync service.
// Note: The behavior of initialsync will likely change as the specification changes.
// The most significant and highly probable change will be determining where to sync from.
// The beacon chain may sync from a block in the past X months in order to combat long-range attacks
// (see here: https://github.com/ethereum/wiki/wiki/Proof-of-Stake-FAQs#what-is-weak-subjectivity)
package initialsync
import (
"context"
"errors"
"fmt"
2019-03-27 16:15:29 +00:00
"math/big"
2019-03-20 14:17:44 +00:00
"runtime/debug"
2019-03-11 22:07:16 +00:00
"strings"
2019-02-26 05:37:28 +00:00
"sync"
2018-08-22 01:52:03 +00:00
"time"
2019-03-27 16:15:29 +00:00
"github.com/ethereum/go-ethereum/common"
2018-12-23 20:34:59 +00:00
"github.com/gogo/protobuf/proto"
2019-03-08 04:54:41 +00:00
peer "github.com/libp2p/go-libp2p-peer"
2018-12-01 22:09:12 +00:00
"github.com/prysmaticlabs/prysm/beacon-chain/db"
2018-08-22 01:52:03 +00:00
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
2019-01-31 02:53:58 +00:00
"github.com/prysmaticlabs/prysm/shared/bytesutil"
2018-10-09 05:58:54 +00:00
"github.com/prysmaticlabs/prysm/shared/event"
2019-03-03 17:31:29 +00:00
"github.com/prysmaticlabs/prysm/shared/hashutil"
2018-08-22 01:52:03 +00:00
"github.com/prysmaticlabs/prysm/shared/p2p"
2018-11-21 18:00:36 +00:00
"github.com/prysmaticlabs/prysm/shared/params"
2018-08-22 01:52:03 +00:00
"github.com/sirupsen/logrus"
2019-03-05 17:31:19 +00:00
"go.opencensus.io/trace"
2018-08-22 01:52:03 +00:00
)
2018-09-21 21:02:17 +00:00
var log = logrus . WithField ( "prefix" , "initial-sync" )
2019-03-11 22:07:16 +00:00
var debugError = "debug:"
2018-08-22 01:52:03 +00:00
// Config defines the configurable properties of InitialSync.
//
type Config struct {
2018-12-01 22:09:12 +00:00
SyncPollingInterval time . Duration
BlockBufferSize int
BlockAnnounceBufferSize int
2018-12-26 14:16:46 +00:00
BatchedBlockBufferSize int
2018-12-01 22:09:12 +00:00
StateBufferSize int
BeaconDB * db . BeaconDB
P2P p2pAPI
SyncService syncService
2019-03-29 21:51:58 +00:00
ChainService chainService
2019-03-27 16:15:29 +00:00
PowChain powChainService
2018-08-22 01:52:03 +00:00
}
// DefaultConfig provides the default configuration for a sync service.
// SyncPollingInterval determines how frequently the service checks that initial sync is complete.
// BlockBufferSize determines that buffer size of the `blockBuf` channel.
2019-03-20 05:13:14 +00:00
// StateBufferSize determines the buffer size of the `stateBuf` channel.
2018-12-10 05:26:44 +00:00
func DefaultConfig ( ) * Config {
return & Config {
2018-12-01 22:09:12 +00:00
SyncPollingInterval : time . Duration ( params . BeaconConfig ( ) . SyncPollingInterval ) * time . Second ,
2019-03-25 15:21:21 +00:00
BlockBufferSize : params . BeaconConfig ( ) . DefaultBufferSize ,
BatchedBlockBufferSize : params . BeaconConfig ( ) . DefaultBufferSize ,
BlockAnnounceBufferSize : params . BeaconConfig ( ) . DefaultBufferSize ,
StateBufferSize : params . BeaconConfig ( ) . DefaultBufferSize ,
2018-08-22 01:52:03 +00:00
}
}
2018-10-09 05:58:54 +00:00
type p2pAPI interface {
2019-03-17 02:56:05 +00:00
p2p . Broadcaster
2019-03-08 02:37:15 +00:00
p2p . Sender
2018-10-09 05:58:54 +00:00
Subscribe ( msg proto . Message , channel chan p2p . Message ) event . Subscription
}
2019-03-29 21:51:58 +00:00
// chainService is the part of the blockchain service initial sync relies on
// to process a block and then run fork choice with the resulting state.
type chainService interface {
	// ReceiveBlock processes block and returns the beacon state it produces.
	ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) (*pb.BeaconState, error)

	// ApplyForkChoiceRule runs fork choice for block using computedState.
	ApplyForkChoiceRule(ctx context.Context, block *pb.BeaconBlock, computedState *pb.BeaconState) error
}
2019-03-27 16:15:29 +00:00
// powChainService lets initial sync ask the proof-of-work chain whether a
// block with a given hash exists; the returned *big.Int is later handed to
// the database when pruning pending deposits.
type powChainService interface {
	BlockExists(ctx context.Context, hash common.Hash) (bool, *big.Int, error)
}
2018-08-22 01:52:03 +00:00
// syncService is the interface of the regular sync service.
// InitialSync calls ResumeSync on it once initial sync has completed.
type syncService interface {
	Start()
	ResumeSync()
}
2018-08-22 01:52:03 +00:00
// InitialSync defines the main class in this package.
// See the package comments for a general description of the service's functions.
type InitialSync struct {
2019-02-26 05:37:28 +00:00
ctx context . Context
cancel context . CancelFunc
p2p p2pAPI
syncService syncService
2019-03-29 21:51:58 +00:00
chainService chainService
2019-02-26 05:37:28 +00:00
db * db . BeaconDB
2019-03-27 16:15:29 +00:00
powchain powChainService
2019-02-26 05:37:28 +00:00
blockAnnounceBuf chan p2p . Message
batchedBlockBuf chan p2p . Message
blockBuf chan p2p . Message
stateBuf chan p2p . Message
currentSlot uint64
highestObservedSlot uint64
2019-03-03 17:03:47 +00:00
beaconStateSlot uint64
2019-02-26 05:37:28 +00:00
syncPollingInterval time . Duration
inMemoryBlocks map [ uint64 ] * pb . BeaconBlock
syncedFeed * event . Feed
2019-03-05 16:59:49 +00:00
reqState bool
2019-02-26 05:37:28 +00:00
stateRootOfHighestObservedSlot [ 32 ] byte
mutex * sync . Mutex
2018-08-22 01:52:03 +00:00
}
// NewInitialSyncService constructs a new InitialSyncService.
// This method is normally called by the main node.
func NewInitialSyncService ( ctx context . Context ,
2018-12-10 05:26:44 +00:00
cfg * Config ,
2018-08-22 01:52:03 +00:00
) * InitialSync {
ctx , cancel := context . WithCancel ( ctx )
blockBuf := make ( chan p2p . Message , cfg . BlockBufferSize )
2018-12-01 22:09:12 +00:00
stateBuf := make ( chan p2p . Message , cfg . StateBufferSize )
2018-11-21 18:00:36 +00:00
blockAnnounceBuf := make ( chan p2p . Message , cfg . BlockAnnounceBufferSize )
2018-12-26 14:16:46 +00:00
batchedBlockBuf := make ( chan p2p . Message , cfg . BatchedBlockBufferSize )
2018-08-22 01:52:03 +00:00
return & InitialSync {
2019-02-26 05:37:28 +00:00
ctx : ctx ,
cancel : cancel ,
p2p : cfg . P2P ,
syncService : cfg . SyncService ,
2019-03-29 21:51:58 +00:00
chainService : cfg . ChainService ,
2019-02-26 05:37:28 +00:00
db : cfg . BeaconDB ,
2019-03-27 16:15:29 +00:00
powchain : cfg . PowChain ,
2019-02-26 05:37:28 +00:00
currentSlot : params . BeaconConfig ( ) . GenesisSlot ,
highestObservedSlot : params . BeaconConfig ( ) . GenesisSlot ,
2019-03-03 17:03:47 +00:00
beaconStateSlot : params . BeaconConfig ( ) . GenesisSlot ,
2019-02-26 05:37:28 +00:00
blockBuf : blockBuf ,
stateBuf : stateBuf ,
batchedBlockBuf : batchedBlockBuf ,
blockAnnounceBuf : blockAnnounceBuf ,
syncPollingInterval : cfg . SyncPollingInterval ,
inMemoryBlocks : map [ uint64 ] * pb . BeaconBlock { } ,
syncedFeed : new ( event . Feed ) ,
2019-03-05 16:59:49 +00:00
reqState : false ,
2019-02-26 05:37:28 +00:00
stateRootOfHighestObservedSlot : [ 32 ] byte { } ,
mutex : new ( sync . Mutex ) ,
2018-08-22 01:52:03 +00:00
}
}
// Start begins the goroutine.
func ( s * InitialSync ) Start ( ) {
2019-02-26 05:37:28 +00:00
cHead , err := s . db . ChainHead ( )
if err != nil {
log . Errorf ( "Unable to get chain head %v" , err )
}
2019-03-05 16:59:49 +00:00
s . currentSlot = cHead . Slot
2019-02-26 05:37:28 +00:00
2019-03-29 21:51:58 +00:00
var reqState bool
// setting genesis bool
if cHead . Slot == params . BeaconConfig ( ) . GenesisSlot || s . isSlotDiffLarge ( ) {
reqState = true
}
s . reqState = reqState
2018-08-22 01:52:03 +00:00
go func ( ) {
ticker := time . NewTicker ( s . syncPollingInterval )
s . run ( ticker . C )
ticker . Stop ( )
} ( )
2018-12-26 14:16:46 +00:00
go s . checkInMemoryBlocks ( )
2018-08-22 01:52:03 +00:00
}
// Stop cancels the service context, which makes the initial sync goroutines
// exit. It always returns nil.
func (s *InitialSync) Stop() error {
	log.Info("Stopping service")
	s.cancel()
	return nil
}
2019-02-23 06:06:20 +00:00
// InitializeObservedSlot records slot as the highest slot observed so far.
func (s *InitialSync) InitializeObservedSlot(slot uint64) {
	s.highestObservedSlot = slot
}
2019-02-26 05:37:28 +00:00
// InitializeStateRoot records root as the state root of the highest observed
// slot; this root is what state requests ask peers for.
func (s *InitialSync) InitializeStateRoot(root [32]byte) {
	s.stateRootOfHighestObservedSlot = root
}
2019-02-23 06:06:20 +00:00
// SyncedFeed exposes the feed on which the current slot is sent once the node
// is synced.
func (s *InitialSync) SyncedFeed() *event.Feed {
	return s.syncedFeed
}
2018-08-22 01:52:03 +00:00
// run is the main goroutine for the initial sync service.
// delayChan is explicitly passed into this function to facilitate tests that don't require a timeout.
// It is assumed that the goroutine `run` is only called once per instance.
2018-12-01 22:09:12 +00:00
func ( s * InitialSync ) run ( delayChan <- chan time . Time ) {
2019-03-29 21:51:58 +00:00
2018-09-09 22:15:24 +00:00
blockSub := s . p2p . Subscribe ( & pb . BeaconBlockResponse { } , s . blockBuf )
2018-12-26 14:16:46 +00:00
batchedBlocksub := s . p2p . Subscribe ( & pb . BatchedBeaconBlockResponse { } , s . batchedBlockBuf )
2018-11-21 18:00:36 +00:00
blockAnnounceSub := s . p2p . Subscribe ( & pb . BeaconBlockAnnounce { } , s . blockAnnounceBuf )
2018-12-01 22:09:12 +00:00
beaconStateSub := s . p2p . Subscribe ( & pb . BeaconStateResponse { } , s . stateBuf )
2018-08-22 01:52:03 +00:00
defer func ( ) {
blockSub . Unsubscribe ( )
2018-11-21 18:00:36 +00:00
blockAnnounceSub . Unsubscribe ( )
2018-12-01 22:09:12 +00:00
beaconStateSub . Unsubscribe ( )
2018-12-26 14:16:46 +00:00
batchedBlocksub . Unsubscribe ( )
close ( s . batchedBlockBuf )
2018-08-22 01:52:03 +00:00
close ( s . blockBuf )
2018-12-01 22:09:12 +00:00
close ( s . stateBuf )
2018-08-22 01:52:03 +00:00
} ( )
2019-03-29 21:51:58 +00:00
if s . reqState {
if err := s . requestStateFromPeer ( s . ctx , s . stateRootOfHighestObservedSlot [ : ] , p2p . AnyPeer ) ; err != nil {
log . Errorf ( "Could not request state from peer %v" , err )
}
} else {
// Send out a batch request
s . requestBatchedBlocks ( s . currentSlot + 1 , s . highestObservedSlot )
2019-03-03 17:03:47 +00:00
}
2019-02-26 05:37:28 +00:00
2018-08-22 01:52:03 +00:00
for {
select {
case <- s . ctx . Done ( ) :
log . Debug ( "Exiting goroutine" )
return
2018-12-01 22:09:12 +00:00
case <- delayChan :
2019-03-05 17:31:19 +00:00
if s . checkSyncStatus ( ) {
2018-08-22 01:52:03 +00:00
return
}
2018-11-21 18:00:36 +00:00
case msg := <- s . blockAnnounceBuf :
2019-03-10 22:53:28 +00:00
safelyHandleMessage ( s . processBlockAnnounce , msg )
2018-08-22 01:52:03 +00:00
case msg := <- s . blockBuf :
2019-03-10 22:53:28 +00:00
safelyHandleMessage ( func ( message p2p . Message ) {
data := message . Data . ( * pb . BeaconBlockResponse )
s . processBlock ( message . Ctx , data . Block , message . Peer )
} , msg )
2018-12-01 22:09:12 +00:00
case msg := <- s . stateBuf :
2019-03-10 22:53:28 +00:00
safelyHandleMessage ( s . processState , msg )
2018-12-26 14:16:46 +00:00
case msg := <- s . batchedBlockBuf :
2019-03-10 22:53:28 +00:00
safelyHandleMessage ( s . processBatchedBlocks , msg )
2018-12-26 14:16:46 +00:00
}
}
}
2019-03-10 22:53:28 +00:00
// safelyHandleMessage will recover and log any panic that occurs from the
// function argument.
func safelyHandleMessage ( fn func ( p2p . Message ) , msg p2p . Message ) {
defer func ( ) {
if r := recover ( ) ; r != nil {
printedMsg := "message contains no data"
if msg . Data != nil {
printedMsg = proto . MarshalTextString ( msg . Data )
}
log . WithFields ( logrus . Fields {
"r" : r ,
"msg" : printedMsg ,
} ) . Error ( "Panicked when handling p2p message! Recovering..." )
2019-03-20 14:17:44 +00:00
debug . PrintStack ( )
2019-03-10 22:53:28 +00:00
if msg . Ctx == nil {
return
}
if span := trace . FromContext ( msg . Ctx ) ; span != nil {
span . SetStatus ( trace . Status {
Code : trace . StatusCodeInternal ,
Message : fmt . Sprintf ( "Panic: %v" , r ) ,
} )
}
}
} ( )
// Fingers crossed that it doesn't panic...
fn ( msg )
}
2018-12-26 14:16:46 +00:00
// checkInMemoryBlocks is another routine which will run concurrently with the
// main routine for initial sync, where it checks the blocks saved in memory regularly
// to see if the blocks are valid enough to be processed.
func ( s * InitialSync ) checkInMemoryBlocks ( ) {
for {
select {
case <- s . ctx . Done ( ) :
return
default :
if s . currentSlot == s . highestObservedSlot {
return
}
2019-02-26 05:37:28 +00:00
s . mutex . Lock ( )
2018-12-26 14:16:46 +00:00
if block , ok := s . inMemoryBlocks [ s . currentSlot + 1 ] ; ok && s . currentSlot + 1 <= s . highestObservedSlot {
2019-03-08 02:37:15 +00:00
s . processBlock ( s . ctx , block , p2p . AnyPeer )
2018-12-26 14:16:46 +00:00
}
2019-02-26 05:37:28 +00:00
s . mutex . Unlock ( )
2018-12-26 14:16:46 +00:00
}
}
}
2019-03-05 17:31:19 +00:00
// checkSyncStatus verifies if the beacon node is correctly synced with its peers up to their
// latest canonical head. If not, then it requests batched blocks up to the highest observed slot.
func ( s * InitialSync ) checkSyncStatus ( ) bool {
if s . reqState {
2019-03-08 02:37:15 +00:00
if err := s . requestStateFromPeer ( s . ctx , s . stateRootOfHighestObservedSlot [ : ] , p2p . AnyPeer ) ; err != nil {
2019-03-05 17:31:19 +00:00
log . Errorf ( "Could not request state from peer %v" , err )
}
return false
}
if s . highestObservedSlot == s . currentSlot {
log . Info ( "Exiting initial sync and starting normal sync" )
s . syncedFeed . Send ( s . currentSlot )
s . syncService . ResumeSync ( )
return true
}
// requests multiple blocks so as to save and sync quickly.
s . requestBatchedBlocks ( s . currentSlot + 1 , s . highestObservedSlot )
return false
}
func ( s * InitialSync ) processBlockAnnounce ( msg p2p . Message ) {
ctx , span := trace . StartSpan ( msg . Ctx , "beacon-chain.sync.initial-sync.processBlockAnnounce" )
defer span . End ( )
data := msg . Data . ( * pb . BeaconBlockAnnounce )
recBlockAnnounce . Inc ( )
if s . reqState {
2019-03-08 02:37:15 +00:00
if err := s . requestStateFromPeer ( ctx , s . stateRootOfHighestObservedSlot [ : ] , msg . Peer ) ; err != nil {
2019-03-05 17:31:19 +00:00
log . Errorf ( "Could not request state from peer %v" , err )
}
return
}
if data . SlotNumber > s . highestObservedSlot {
s . highestObservedSlot = data . SlotNumber
}
s . requestBatchedBlocks ( s . currentSlot + 1 , s . highestObservedSlot )
2019-03-29 21:51:58 +00:00
log . Debugf ( "Successfully requested the next block with slot: %d" , data . SlotNumber - params . BeaconConfig ( ) . GenesisSlot )
2019-03-05 17:31:19 +00:00
}
2018-12-26 14:16:46 +00:00
// processBlock is the main method that validates each block which is received
// for initial sync. It checks if the blocks are valid and then will continue to
// process and save it into the db.
2019-03-08 02:37:15 +00:00
func ( s * InitialSync ) processBlock ( ctx context . Context , block * pb . BeaconBlock , peerID peer . ID ) {
2019-03-05 17:31:19 +00:00
ctx , span := trace . StartSpan ( ctx , "beacon-chain.sync.initial-sync.processBlock" )
defer span . End ( )
recBlock . Inc ( )
2019-01-05 03:58:19 +00:00
if block . Slot > s . highestObservedSlot {
s . highestObservedSlot = block . Slot
2018-12-26 14:16:46 +00:00
}
2019-01-05 03:58:19 +00:00
if block . Slot < s . currentSlot {
2018-12-26 14:16:46 +00:00
return
}
2019-02-26 05:37:28 +00:00
// requesting beacon state if there is no saved state.
2019-03-05 16:59:49 +00:00
if s . reqState {
2019-03-12 20:44:21 +00:00
if err := s . requestStateFromPeer ( s . ctx , s . stateRootOfHighestObservedSlot [ : ] , peerID ) ; err != nil {
2018-12-26 14:16:46 +00:00
log . Errorf ( "Could not request beacon state from peer: %v" , err )
}
return
}
2019-03-08 16:09:24 +00:00
// if it isn't the block in the next slot we check if it is a skipped slot.
// if it isn't skipped we save it in memory.
2019-01-05 03:58:19 +00:00
if block . Slot != ( s . currentSlot + 1 ) {
2019-03-08 16:09:24 +00:00
// if parent exists we validate the block.
if s . doesParentExist ( block ) {
if err := s . validateAndSaveNextBlock ( ctx , block ) ; err != nil {
2019-03-11 22:07:16 +00:00
// Debug error so as not to have noisy error logs
if strings . HasPrefix ( err . Error ( ) , debugError ) {
log . Debug ( strings . TrimPrefix ( err . Error ( ) , debugError ) )
return
}
2019-03-08 16:09:24 +00:00
log . Errorf ( "Unable to save block: %v" , err )
}
return
}
2019-02-26 05:37:28 +00:00
s . mutex . Lock ( )
defer s . mutex . Unlock ( )
2019-01-05 03:58:19 +00:00
if _ , ok := s . inMemoryBlocks [ block . Slot ] ; ! ok {
s . inMemoryBlocks [ block . Slot ] = block
2018-08-22 01:52:03 +00:00
}
2018-12-26 14:16:46 +00:00
return
2018-08-22 01:52:03 +00:00
}
2018-12-26 14:16:46 +00:00
2019-02-28 03:55:47 +00:00
if err := s . validateAndSaveNextBlock ( ctx , block ) ; err != nil {
2019-03-11 22:07:16 +00:00
// Debug error so as not to have noisy error logs
if strings . HasPrefix ( err . Error ( ) , debugError ) {
log . Debug ( strings . TrimPrefix ( err . Error ( ) , debugError ) )
return
}
2018-12-26 14:16:46 +00:00
log . Errorf ( "Unable to save block: %v" , err )
}
}
// processBatchedBlocks processes all the received blocks from
// the p2p message.
2019-03-05 17:31:19 +00:00
func ( s * InitialSync ) processBatchedBlocks ( msg p2p . Message ) {
ctx , span := trace . StartSpan ( msg . Ctx , "beacon-chain.sync.initial-sync.processBatchedBlocks" )
defer span . End ( )
batchedBlockReq . Inc ( )
2018-12-26 14:16:46 +00:00
response := msg . Data . ( * pb . BatchedBeaconBlockResponse )
2019-01-05 03:58:19 +00:00
batchedBlocks := response . BatchedBlocks
2019-03-07 22:43:48 +00:00
if len ( batchedBlocks ) == 0 {
// Do not process empty response
return
}
2018-12-26 14:16:46 +00:00
2019-03-07 22:43:48 +00:00
log . Debug ( "Processing batched block response" )
2018-12-26 14:16:46 +00:00
for _ , block := range batchedBlocks {
2019-02-28 03:55:47 +00:00
s . processBlock ( ctx , block , msg . Peer )
2018-12-26 14:16:46 +00:00
}
log . Debug ( "Finished processing batched blocks" )
2018-08-22 01:52:03 +00:00
}
2019-03-05 17:31:19 +00:00
func ( s * InitialSync ) processState ( msg p2p . Message ) {
2019-03-27 16:15:29 +00:00
ctx , span := trace . StartSpan ( msg . Ctx , "beacon-chain.sync.initial-sync.processState" )
2019-03-05 17:31:19 +00:00
defer span . End ( )
data := msg . Data . ( * pb . BeaconStateResponse )
2019-03-29 21:51:58 +00:00
beaconState := data . BeaconState
2019-03-05 17:31:19 +00:00
recState . Inc ( )
2019-03-29 21:51:58 +00:00
if s . currentSlot > beaconState . FinalizedEpoch * params . BeaconConfig ( ) . SlotsPerEpoch {
2019-03-05 17:31:19 +00:00
return
}
2019-03-29 21:51:58 +00:00
if err := s . db . SaveCurrentAndFinalizedState ( beaconState ) ; err != nil {
log . Errorf ( "Unable to set beacon state for initial sync %v" , err )
2019-03-25 15:21:21 +00:00
return
}
2019-03-29 21:51:58 +00:00
if err := s . db . SaveFinalizedBlock ( beaconState . LatestBlock ) ; err != nil {
2019-03-25 15:21:21 +00:00
log . Errorf ( "Could not save finalized block %v" , err )
return
2019-03-05 17:31:19 +00:00
}
2019-03-29 21:51:58 +00:00
if err := s . db . SaveBlock ( beaconState . LatestBlock ) ; err != nil {
log . Errorf ( "Could not save block %v" , err )
return
}
if err := s . db . UpdateChainHead ( beaconState . LatestBlock , beaconState ) ; err != nil {
log . Errorf ( "Could not update chainhead %v" , err )
return
}
if err := s . db . SaveJustifiedState ( beaconState ) ; err != nil {
2019-03-25 18:23:57 +00:00
log . Errorf ( "Could not set beacon state for initial sync %v" , err )
return
}
2019-03-29 21:51:58 +00:00
if err := s . db . SaveJustifiedBlock ( beaconState . LatestBlock ) ; err != nil {
2019-03-25 18:23:57 +00:00
log . Errorf ( "Could not save finalized block %v" , err )
return
}
2019-03-29 21:51:58 +00:00
h , err := hashutil . HashProto ( beaconState )
2019-03-05 17:31:19 +00:00
if err != nil {
log . Error ( err )
return
}
2019-03-29 21:51:58 +00:00
exists , blkNum , err := s . powchain . BlockExists ( ctx , bytesutil . ToBytes32 ( beaconState . LatestEth1Data . BlockHash32 ) )
2019-03-27 16:15:29 +00:00
if err != nil {
log . Errorf ( "Unable to get powchain block %v" , err )
}
if ! exists {
log . Error ( "Latest ETH1 block doesn't exist in the pow chain" )
return
}
s . db . PrunePendingDeposits ( ctx , blkNum )
2019-03-05 17:31:19 +00:00
if h == s . stateRootOfHighestObservedSlot {
s . reqState = false
}
// sets the current slot to the last finalized slot of the
// beacon state to begin our sync from.
2019-03-29 21:51:58 +00:00
s . currentSlot = beaconState . FinalizedEpoch * params . BeaconConfig ( ) . SlotsPerEpoch
s . beaconStateSlot = beaconState . Slot
log . Debugf ( "Successfully saved beacon state with the last finalized slot: %d" , beaconState . FinalizedEpoch * params . BeaconConfig ( ) . SlotsPerEpoch - params . BeaconConfig ( ) . GenesisSlot )
2019-03-05 17:31:19 +00:00
s . requestBatchedBlocks ( s . currentSlot + 1 , s . highestObservedSlot )
}
2019-03-29 21:51:58 +00:00
// requestStateFromPeer always requests for the last finalized slot from a peer.
2019-03-08 02:37:15 +00:00
func ( s * InitialSync ) requestStateFromPeer ( ctx context . Context , stateRoot [ ] byte , peerID peer . ID ) error {
ctx , span := trace . StartSpan ( ctx , "beacon-chain.sync.initial-sync.requestStateFromPeer" )
2019-03-05 17:31:19 +00:00
defer span . End ( )
stateReq . Inc ( )
2019-03-29 21:51:58 +00:00
log . Debugf ( "Successfully processed incoming block with state hash: %#x" , stateRoot )
2019-03-12 20:44:21 +00:00
return s . p2p . Send ( ctx , & pb . BeaconStateRequest { FinalizedStateRootHash32S : stateRoot } , peerID )
2018-08-22 01:52:03 +00:00
}
2018-11-21 18:00:36 +00:00
// requestNextBlock broadcasts a request for a block with the entered slotnumber.
2019-02-28 03:55:47 +00:00
func ( s * InitialSync ) requestNextBlockBySlot ( ctx context . Context , slotNumber uint64 ) {
2019-03-05 17:31:19 +00:00
ctx , span := trace . StartSpan ( ctx , "beacon-chain.sync.initial-sync.requestBlockBySlot" )
defer span . End ( )
2018-12-01 22:09:12 +00:00
log . Debugf ( "Requesting block %d " , slotNumber )
2019-03-05 17:31:19 +00:00
blockReqSlot . Inc ( )
2019-02-26 05:37:28 +00:00
s . mutex . Lock ( )
defer s . mutex . Unlock ( )
2018-12-26 14:16:46 +00:00
if block , ok := s . inMemoryBlocks [ slotNumber ] ; ok {
2019-03-08 02:37:15 +00:00
s . processBlock ( ctx , block , p2p . AnyPeer )
2018-11-21 18:00:36 +00:00
return
}
2019-03-17 02:56:05 +00:00
s . p2p . Broadcast ( ctx , & pb . BeaconBlockRequestBySlotNumber { SlotNumber : slotNumber } )
2018-11-21 18:00:36 +00:00
}
2018-12-26 14:16:46 +00:00
// requestBatchedBlocks sends out a request for multiple blocks till a
2018-11-21 18:00:36 +00:00
// specified bound slot number.
2019-02-26 05:37:28 +00:00
func ( s * InitialSync ) requestBatchedBlocks ( startSlot uint64 , endSlot uint64 ) {
2019-03-17 02:56:05 +00:00
ctx , span := trace . StartSpan ( context . Background ( ) , "beacon-chain.sync.initial-sync.requestBatchedBlocks" )
2019-03-05 17:31:19 +00:00
defer span . End ( )
sentBatchedBlockReq . Inc ( )
2019-03-07 22:43:48 +00:00
if startSlot > endSlot {
2019-03-29 21:51:58 +00:00
log . Debugf ( "Invalid batched request from slot %d to %d" , startSlot - params . BeaconConfig ( ) . GenesisSlot , endSlot - params . BeaconConfig ( ) . GenesisSlot )
2019-03-07 22:43:48 +00:00
return
}
2019-02-26 05:37:28 +00:00
blockLimit := params . BeaconConfig ( ) . BatchBlockLimit
if startSlot + blockLimit < endSlot {
endSlot = startSlot + blockLimit
}
2019-03-29 21:51:58 +00:00
log . Debugf ( "Requesting batched blocks from slot %d to %d" , startSlot - params . BeaconConfig ( ) . GenesisSlot , endSlot - params . BeaconConfig ( ) . GenesisSlot )
2019-03-17 02:56:05 +00:00
s . p2p . Broadcast ( ctx , & pb . BatchedBeaconBlockRequest {
2019-02-26 05:37:28 +00:00
StartSlot : startSlot ,
2018-12-26 14:16:46 +00:00
EndSlot : endSlot ,
} )
2018-08-22 01:52:03 +00:00
}
// validateAndSaveNextBlock will validate whether blocks received from the blockfetcher
// routine can be added to the chain.
2019-02-28 03:55:47 +00:00
func ( s * InitialSync ) validateAndSaveNextBlock ( ctx context . Context , block * pb . BeaconBlock ) error {
2019-03-05 17:31:19 +00:00
ctx , span := trace . StartSpan ( ctx , "beacon-chain.sync.initial-sync.validateAndSaveNextBlock" )
defer span . End ( )
2019-02-26 03:42:31 +00:00
root , err := hashutil . HashBeaconBlock ( block )
2018-11-21 18:00:36 +00:00
if err != nil {
return err
}
2019-03-08 16:09:24 +00:00
if err := s . checkBlockValidity ( ctx , block ) ; err != nil {
return err
}
log . Infof ( "Saved block with root %#x and slot %d for initial sync" , root , block . Slot )
s . currentSlot = block . Slot
2018-08-22 01:52:03 +00:00
2019-03-08 16:09:24 +00:00
s . mutex . Lock ( )
defer s . mutex . Unlock ( )
// delete block from memory.
if _ , ok := s . inMemoryBlocks [ block . Slot ] ; ok {
delete ( s . inMemoryBlocks , block . Slot )
}
// since the block will not be processed by chainservice we save
// the block and do not send it to chainservice.
2019-03-29 21:51:58 +00:00
if s . beaconStateSlot >= block . Slot {
if err := s . db . SaveBlock ( block ) ; err != nil {
return err
}
return nil
}
// Send block to main chain service to be processed.
beaconState , err := s . chainService . ReceiveBlock ( ctx , block )
if err != nil {
return fmt . Errorf ( "could not process beacon block: %v" , err )
}
if err := s . chainService . ApplyForkChoiceRule ( ctx , block , beaconState ) ; err != nil {
return fmt . Errorf ( "could not apply fork choice rule: %v" , err )
}
return nil
2018-12-26 14:16:46 +00:00
}
2019-02-28 03:55:47 +00:00
func ( s * InitialSync ) checkBlockValidity ( ctx context . Context , block * pb . BeaconBlock ) error {
2019-03-05 17:31:19 +00:00
ctx , span := trace . StartSpan ( ctx , "beacon-chain.sync.initial-sync.checkBlockValidity" )
defer span . End ( )
2019-02-26 03:42:31 +00:00
blockRoot , err := hashutil . HashBeaconBlock ( block )
2018-12-26 14:16:46 +00:00
if err != nil {
2019-02-14 20:04:47 +00:00
return fmt . Errorf ( "could not tree hash received block: %v" , err )
2018-08-22 01:52:03 +00:00
}
2018-12-26 14:16:46 +00:00
2019-02-14 20:04:47 +00:00
if s . db . HasBlock ( blockRoot ) {
2019-03-11 22:07:16 +00:00
return errors . New ( debugError + "received a block that already exists. Exiting" )
2018-12-26 14:16:46 +00:00
}
2019-02-28 03:55:47 +00:00
beaconState , err := s . db . State ( ctx )
2018-12-26 14:16:46 +00:00
if err != nil {
return fmt . Errorf ( "failed to get beacon state: %v" , err )
}
2019-02-18 16:52:16 +00:00
if block . Slot < beaconState . FinalizedEpoch * params . BeaconConfig ( ) . SlotsPerEpoch {
2019-03-11 22:07:16 +00:00
return errors . New ( debugError + "discarding received block with a slot number smaller than the last finalized slot" )
2018-12-26 14:16:46 +00:00
}
// Attestation from proposer not verified as, other nodes only store blocks not proposer
// attestations.
2018-08-22 01:52:03 +00:00
return nil
}
2019-03-05 16:59:49 +00:00
2019-03-29 21:51:58 +00:00
// isSlotDiffLarge reports whether the highest observed slot is more than
// SyncEpochLimit epochs' worth of slots ahead of the current slot.
func (s *InitialSync) isSlotDiffLarge() bool {
	cfg := params.BeaconConfig()
	maxSlotGap := cfg.SlotsPerEpoch * cfg.SyncEpochLimit
	return s.highestObservedSlot > s.currentSlot+maxSlotGap
}
2019-03-08 16:09:24 +00:00
// doesParentExist reports whether the parent of block, identified by the
// block's parent root hash, is already stored in the db.
func (s *InitialSync) doesParentExist(block *pb.BeaconBlock) bool {
	return s.db.HasBlock(bytesutil.ToBytes32(block.ParentRootHash32))
}