From 7d9d8454b169beec67d71ec5122ce26efc732b20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Kapka?= Date: Mon, 9 May 2022 15:57:34 +0200 Subject: [PATCH] Improvements to `powchain` package (#10652) Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- beacon-chain/powchain/block_reader.go | 51 +++++---------- beacon-chain/powchain/block_reader_test.go | 55 ++-------------- beacon-chain/powchain/errors.go | 2 - beacon-chain/powchain/log_processing.go | 41 ++++++------ beacon-chain/powchain/log_processing_test.go | 2 +- beacon-chain/powchain/service.go | 64 +++++++++---------- beacon-chain/powchain/service_test.go | 4 +- .../powchain/testing/mock_faulty_powchain.go | 5 -- .../powchain/testing/mock_powchain.go | 5 -- 9 files changed, 74 insertions(+), 155 deletions(-) diff --git a/beacon-chain/powchain/block_reader.go b/beacon-chain/powchain/block_reader.go index d317f815f..6ce9cda9f 100644 --- a/beacon-chain/powchain/block_reader.go +++ b/beacon-chain/powchain/block_reader.go @@ -22,7 +22,7 @@ const repeatedSearches = 2 * searchThreshold // BlockExists returns true if the block exists, its height and any possible error encountered. func (s *Service) BlockExists(ctx context.Context, hash common.Hash) (bool, *big.Int, error) { - ctx, span := trace.StartSpan(ctx, "beacon-chain.web3service.BlockExists") + ctx, span := trace.StartSpan(ctx, "powchain.BlockExists") defer span.End() if exists, hdrInfo, err := s.headerCache.HeaderInfoByHash(hash); exists || err != nil { @@ -45,24 +45,9 @@ func (s *Service) BlockExists(ctx context.Context, hash common.Hash) (bool, *big return true, new(big.Int).Set(header.Number), nil } -// BlockExistsWithCache returns true if the block exists in cache, its height and any possible error encountered. -func (s *Service) BlockExistsWithCache(ctx context.Context, hash common.Hash) (bool, *big.Int, error) { - _, span := trace.StartSpan(ctx, "beacon-chain.web3service.BlockExistsWithCache") - defer span.End() - if exists, hdrInfo, err := s.headerCache.HeaderInfoByHash(hash); exists || err != nil { - if err != nil { - return false, nil, err - } - span.AddAttributes(trace.BoolAttribute("blockCacheHit", true)) - return true, hdrInfo.Number, nil - } - span.AddAttributes(trace.BoolAttribute("blockCacheHit", false)) - return false, nil, nil -} - // BlockHashByHeight returns the block hash of the block at the given height. func (s *Service) BlockHashByHeight(ctx context.Context, height *big.Int) (common.Hash, error) { - ctx, span := trace.StartSpan(ctx, "beacon-chain.web3service.BlockHashByHeight") + ctx, span := trace.StartSpan(ctx, "powchain.BlockHashByHeight") defer span.End() if exists, hInfo, err := s.headerCache.HeaderInfoByHeight(height); exists || err != nil { @@ -90,9 +75,9 @@ func (s *Service) BlockHashByHeight(ctx context.Context, height *big.Int) (commo return header.Hash(), nil } -// BlockTimeByHeight fetches an eth1.0 block timestamp by its height. +// BlockTimeByHeight fetches an eth1 block timestamp by its height. func (s *Service) BlockTimeByHeight(ctx context.Context, height *big.Int) (uint64, error) { - ctx, span := trace.StartSpan(ctx, "beacon-chain.web3service.BlockTimeByHeight") + ctx, span := trace.StartSpan(ctx, "powchain.BlockTimeByHeight") defer span.End() if s.eth1DataFetcher == nil { err := errors.New("nil eth1DataFetcher") @@ -111,7 +96,7 @@ func (s *Service) BlockTimeByHeight(ctx context.Context, height *big.Int) (uint6 // This is an optimized version with the worst case being O(2*repeatedSearches) number of calls // while in best case search for the block is performed in O(1). func (s *Service) BlockByTimestamp(ctx context.Context, time uint64) (*types.HeaderInfo, error) { - ctx, span := trace.StartSpan(ctx, "beacon-chain.web3service.BlockByTimestamp") + ctx, span := trace.StartSpan(ctx, "powchain.BlockByTimestamp") defer span.End() s.latestEth1DataLock.RLock() @@ -122,15 +107,14 @@ func (s *Service) BlockByTimestamp(ctx context.Context, time uint64) (*types.Hea if time > latestBlkTime { return nil, errors.Errorf("provided time is later than the current eth1 head. %d > %d", time, latestBlkTime) } - // Initialize a pointer to eth1 chain's history to start our search - // from. + // Initialize a pointer to eth1 chain's history to start our search from. cursorNum := big.NewInt(0).SetUint64(latestBlkHeight) cursorTime := latestBlkTime numOfBlocks := uint64(0) estimatedBlk := cursorNum.Uint64() maxTimeBuffer := searchThreshold * params.BeaconConfig().SecondsPerETH1Block - // Terminate if we cant find an acceptable block after + // Terminate if we can't find an acceptable block after // repeated searches. for i := 0; i < repeatedSearches; i++ { if ctx.Err() != nil { @@ -157,12 +141,12 @@ func (s *Service) BlockByTimestamp(ctx context.Context, time uint64) (*types.Hea // time - buffer <= head.time <= time + buffer break } - hinfo, err := s.retrieveHeaderInfo(ctx, estimatedBlk) + hInfo, err := s.retrieveHeaderInfo(ctx, estimatedBlk) if err != nil { return nil, err } - cursorNum = hinfo.Number - cursorTime = hinfo.Time + cursorNum = hInfo.Number + cursorTime = hInfo.Time } // Exit early if we get the desired block. @@ -170,15 +154,15 @@ func (s *Service) BlockByTimestamp(ctx context.Context, time uint64) (*types.Hea return s.retrieveHeaderInfo(ctx, cursorNum.Uint64()) } if cursorTime > time { - return s.findLessTargetEth1Block(ctx, big.NewInt(0).SetUint64(estimatedBlk), time) + return s.findMaxTargetEth1Block(ctx, big.NewInt(0).SetUint64(estimatedBlk), time) } - return s.findMoreTargetEth1Block(ctx, big.NewInt(0).SetUint64(estimatedBlk), time) + return s.findMinTargetEth1Block(ctx, big.NewInt(0).SetUint64(estimatedBlk), time) } // Performs a search to find a target eth1 block which is earlier than or equal to the // target time. This method is used when head.time > targetTime -func (s *Service) findLessTargetEth1Block(ctx context.Context, startBlk *big.Int, targetTime uint64) (*types.HeaderInfo, error) { - for bn := startBlk; ; bn = big.NewInt(0).Sub(bn, big.NewInt(1)) { +func (s *Service) findMaxTargetEth1Block(ctx context.Context, upperBoundBlk *big.Int, targetTime uint64) (*types.HeaderInfo, error) { + for bn := upperBoundBlk; ; bn = big.NewInt(0).Sub(bn, big.NewInt(1)) { if ctx.Err() != nil { return nil, ctx.Err() } @@ -194,8 +178,8 @@ func (s *Service) findLessTargetEth1Block(ctx context.Context, startBlk *big.Int // Performs a search to find a target eth1 block which is just earlier than or equal to the // target time. This method is used when head.time < targetTime -func (s *Service) findMoreTargetEth1Block(ctx context.Context, startBlk *big.Int, targetTime uint64) (*types.HeaderInfo, error) { - for bn := startBlk; ; bn = big.NewInt(0).Add(bn, big.NewInt(1)) { +func (s *Service) findMinTargetEth1Block(ctx context.Context, lowerBoundBlk *big.Int, targetTime uint64) (*types.HeaderInfo, error) { + for bn := lowerBoundBlk; ; bn = big.NewInt(0).Add(bn, big.NewInt(1)) { if ctx.Err() != nil { return nil, ctx.Err() } @@ -203,8 +187,7 @@ func (s *Service) findMoreTargetEth1Block(ctx context.Context, startBlk *big.Int if err != nil { return nil, err } - // Return the last block before we hit the threshold - // time. + // Return the last block before we hit the threshold time. if info.Time > targetTime { return s.retrieveHeaderInfo(ctx, info.Number.Uint64()-1) } diff --git a/beacon-chain/powchain/block_reader_test.go b/beacon-chain/powchain/block_reader_test.go index 848e86f8b..ea1add3f2 100644 --- a/beacon-chain/powchain/block_reader_test.go +++ b/beacon-chain/powchain/block_reader_test.go @@ -61,7 +61,7 @@ func TestLatestMainchainInfo_OK(t *testing.T) { require.NoError(t, err) tickerChan := make(chan time.Time) - web3Service.headTicker = &time.Ticker{C: tickerChan} + web3Service.eth1HeadTicker = &time.Ticker{C: tickerChan} tickerChan <- time.Now() web3Service.cancel() exitRoutine <- true @@ -207,51 +207,6 @@ func TestBlockExists_UsesCachedBlockInfo(t *testing.T) { require.Equal(t, 0, height.Cmp(header.Number)) } -func TestBlockExistsWithCache_UsesCachedHeaderInfo(t *testing.T) { - beaconDB := dbutil.SetupDB(t) - server, endpoint, err := mockPOW.SetupRPCServer() - require.NoError(t, err) - t.Cleanup(func() { - server.Stop() - }) - web3Service, err := NewService(context.Background(), - WithHttpEndpoints([]string{endpoint}), - WithDatabase(beaconDB), - ) - require.NoError(t, err, "unable to setup web3 ETH1.0 chain service") - - header := &gethTypes.Header{ - Number: big.NewInt(0), - } - - err = web3Service.headerCache.AddHeader(header) - require.NoError(t, err) - - exists, height, err := web3Service.BlockExistsWithCache(context.Background(), header.Hash()) - require.NoError(t, err, "Could not get block hash with given height") - require.Equal(t, true, exists) - require.Equal(t, 0, height.Cmp(header.Number)) -} - -func TestBlockExistsWithCache_HeaderNotCached(t *testing.T) { - beaconDB := dbutil.SetupDB(t) - server, endpoint, err := mockPOW.SetupRPCServer() - require.NoError(t, err) - t.Cleanup(func() { - server.Stop() - }) - web3Service, err := NewService(context.Background(), - WithHttpEndpoints([]string{endpoint}), - WithDatabase(beaconDB), - ) - require.NoError(t, err, "unable to setup web3 ETH1.0 chain service") - - exists, height, err := web3Service.BlockExistsWithCache(context.Background(), common.BytesToHash([]byte("hash"))) - require.NoError(t, err, "Could not get block hash with given height") - require.Equal(t, false, exists) - require.Equal(t, (*big.Int)(nil), height) -} - func TestService_BlockNumberByTimestamp(t *testing.T) { beaconDB := dbutil.SetupDB(t) testAcc, err := mock.Setup() @@ -313,11 +268,11 @@ func TestService_BlockNumberByTimestampLessTargetTime(t *testing.T) { defer cancel() // Provide an unattainable target time - _, err = web3Service.findLessTargetEth1Block(ctx, hd.Number, hd.Time/2) + _, err = web3Service.findMaxTargetEth1Block(ctx, hd.Number, hd.Time/2) require.ErrorContains(t, context.DeadlineExceeded.Error(), err) // Provide an attainable target time - blk, err := web3Service.findLessTargetEth1Block(context.Background(), hd.Number, hd.Time-5) + blk, err := web3Service.findMaxTargetEth1Block(context.Background(), hd.Number, hd.Time-5) require.NoError(t, err) require.NotEqual(t, hd.Number.Uint64(), blk.Number.Uint64(), "retrieved block is not less than the head") } @@ -351,11 +306,11 @@ func TestService_BlockNumberByTimestampMoreTargetTime(t *testing.T) { defer cancel() // Provide an unattainable target time with respect to head - _, err = web3Service.findMoreTargetEth1Block(ctx, big.NewInt(0).Div(hd.Number, big.NewInt(2)), hd.Time) + _, err = web3Service.findMinTargetEth1Block(ctx, big.NewInt(0).Div(hd.Number, big.NewInt(2)), hd.Time) require.ErrorContains(t, context.DeadlineExceeded.Error(), err) // Provide an attainable target time with respect to head - blk, err := web3Service.findMoreTargetEth1Block(context.Background(), big.NewInt(0).Sub(hd.Number, big.NewInt(5)), hd.Time) + blk, err := web3Service.findMinTargetEth1Block(context.Background(), big.NewInt(0).Sub(hd.Number, big.NewInt(5)), hd.Time) require.NoError(t, err) require.Equal(t, hd.Number.Uint64(), blk.Number.Uint64(), "retrieved block is not equal to the head") } diff --git a/beacon-chain/powchain/errors.go b/beacon-chain/powchain/errors.go index f9b83929c..a83551500 100644 --- a/beacon-chain/powchain/errors.go +++ b/beacon-chain/powchain/errors.go @@ -19,8 +19,6 @@ var ( ErrUnknownPayload = errors.New("payload does not exist or is not available") // ErrUnknownPayloadStatus when the payload status is unknown. ErrUnknownPayloadStatus = errors.New("unknown payload status") - // ErrUnsupportedScheme for unsupported URL schemes. - ErrUnsupportedScheme = errors.New("unsupported url scheme, only http(s) and ipc are supported") // ErrConfigMismatch when the execution node's terminal total difficulty or // terminal block hash received via the API mismatches Prysm's configuration value. ErrConfigMismatch = errors.New("execution client configuration mismatch") diff --git a/beacon-chain/powchain/log_processing.go b/beacon-chain/powchain/log_processing.go index 160bc41c8..93d894547 100644 --- a/beacon-chain/powchain/log_processing.go +++ b/beacon-chain/powchain/log_processing.go @@ -36,7 +36,7 @@ var ( const eth1DataSavingInterval = 1000 const maxTolerableDifference = 50 const defaultEth1HeaderReqLimit = uint64(1000) -const depositlogRequestLimit = 10000 +const depositLogRequestLimit = 10000 const additiveFactorMultiplier = 0.10 const multiplicativeDecreaseDivisor = 2 @@ -57,7 +57,7 @@ func (s *Service) Eth2GenesisPowchainInfo() (uint64, *big.Int) { return s.chainStartData.GenesisTime, big.NewInt(int64(s.chainStartData.GenesisBlock)) } -// ProcessETH1Block processes the logs from the provided eth1Block. +// ProcessETH1Block processes logs from the provided eth1 block. func (s *Service) ProcessETH1Block(ctx context.Context, blkNum *big.Int) error { query := ethereum.FilterQuery{ Addresses: []common.Address{ @@ -80,7 +80,7 @@ func (s *Service) ProcessETH1Block(ctx context.Context, blkNum *big.Int) error { } } if !s.chainStartData.Chainstarted { - if err := s.checkBlockNumberForChainStart(ctx, blkNum); err != nil { + if err := s.processChainStartFromBlockNum(ctx, blkNum); err != nil { return err } } @@ -88,7 +88,7 @@ func (s *Service) ProcessETH1Block(ctx context.Context, blkNum *big.Int) error { } // ProcessLog is the main method which handles the processing of all -// logs from the deposit contract on the ETH1.0 chain. +// logs from the deposit contract on the eth1 chain. func (s *Service) ProcessLog(ctx context.Context, depositLog gethTypes.Log) error { s.processingLock.RLock() defer s.processingLock.RUnlock() @@ -107,7 +107,7 @@ func (s *Service) ProcessLog(ctx context.Context, depositLog gethTypes.Log) erro } // ProcessDepositLog processes the log which had been received from -// the ETH1.0 chain by trying to ascertain which participant deposited +// the eth1 chain by trying to ascertain which participant deposited // in the contract. func (s *Service) ProcessDepositLog(ctx context.Context, depositLog gethTypes.Log) error { pubkey, withdrawalCredentials, amount, signature, merkleTreeIndex, err := contracts.UnpackDepositLogData(depositLog.Data) @@ -215,7 +215,7 @@ func (s *Service) ProcessDepositLog(ctx context.Context, depositLog gethTypes.Lo } // ProcessChainStart processes the log which had been received from -// the ETH1.0 chain by trying to determine when to start the beacon chain. +// the eth1 chain by trying to determine when to start the beacon chain. func (s *Service) ProcessChainStart(genesisTime uint64, eth1BlockHash [32]byte, blockNumber *big.Int) { s.chainStartData.Chainstarted = true s.chainStartData.GenesisBlock = blockNumber.Uint64() @@ -247,15 +247,15 @@ func (s *Service) ProcessChainStart(genesisTime uint64, eth1BlockHash [32]byte, }, }) if err := s.savePowchainData(s.ctx); err != nil { - // continue on, if the save fails as this will get re-saved + // continue on if the save fails as this will get re-saved // in the next interval. log.Error(err) } } +// createGenesisTime adds in the genesis delay to the eth1 block time +// on which it was triggered. func createGenesisTime(timeStamp uint64) uint64 { - // adds in the genesis delay to the eth1 block time - // on which it was triggered. return timeStamp + params.BeaconConfig().GenesisDelay } @@ -292,7 +292,7 @@ func (s *Service) processPastLogs(ctx context.Context) error { } return nil } - latestFollowHeight, err := s.followBlockHeight(ctx) + latestFollowHeight, err := s.followedBlockHeight(ctx) if err != nil { return err } @@ -318,7 +318,7 @@ func (s *Service) processPastLogs(ctx context.Context) error { remainingLogs := logCount - uint64(s.lastReceivedMerkleIndex+1) // only change the end block if the remaining logs are below the required log limit. // reset our query and end block in this case. - withinLimit := remainingLogs < depositlogRequestLimit + withinLimit := remainingLogs < depositLogRequestLimit aboveFollowHeight := end >= latestFollowHeight if withinLimit && aboveFollowHeight { query.ToBlock = big.NewInt(0).SetUint64(latestFollowHeight) @@ -413,9 +413,9 @@ func (s *Service) processPastLogs(ctx context.Context) error { // logs from the period last polled to now. func (s *Service) requestBatchedHeadersAndLogs(ctx context.Context) error { // We request for the nth block behind the current head, in order to have - // stabilized logs when we retrieve it from the 1.0 chain. + // stabilized logs when we retrieve it from the eth1 chain. - requestedBlock, err := s.followBlockHeight(ctx) + requestedBlock, err := s.followedBlockHeight(ctx) if err != nil { return err } @@ -457,18 +457,17 @@ func (s *Service) retrieveBlockHashAndTime(ctx context.Context, blkNum *big.Int) return bHash, timeStamp, nil } -// checkBlockNumberForChainStart checks the given block number for if chainstart has occurred. -func (s *Service) checkBlockNumberForChainStart(ctx context.Context, blkNum *big.Int) error { +func (s *Service) processChainStartFromBlockNum(ctx context.Context, blkNum *big.Int) error { bHash, timeStamp, err := s.retrieveBlockHashAndTime(ctx, blkNum) if err != nil { return err } - s.checkForChainstart(ctx, bHash, blkNum, timeStamp) + s.processChainStartIfReady(ctx, bHash, blkNum, timeStamp) return nil } -func (s *Service) checkHeaderForChainstart(ctx context.Context, header *gethTypes.Header) { - s.checkForChainstart(ctx, header.Hash(), header.Number, header.Time) +func (s *Service) processChainStartFromHeader(ctx context.Context, header *gethTypes.Header) { + s.processChainStartIfReady(ctx, header.Hash(), header.Number, header.Time) } func (s *Service) checkHeaderRange(ctx context.Context, start, end uint64, headersMap map[uint64]*gethTypes.Header, @@ -484,7 +483,7 @@ func (s *Service) checkHeaderRange(ctx context.Context, start, end uint64, heade i-- continue } - s.checkHeaderForChainstart(ctx, h) + s.processChainStartFromHeader(ctx, h) } } return nil @@ -504,7 +503,7 @@ func (s *Service) currentCountAndTime(ctx context.Context, blockTime uint64) (ui return valCount, createGenesisTime(blockTime) } -func (s *Service) checkForChainstart(ctx context.Context, blockHash [32]byte, blockNumber *big.Int, blockTime uint64) { +func (s *Service) processChainStartIfReady(ctx context.Context, blockHash [32]byte, blockNumber *big.Int, blockTime uint64) { valCount, genesisTime := s.currentCountAndTime(ctx, blockTime) if valCount == 0 { return @@ -516,7 +515,7 @@ func (s *Service) checkForChainstart(ctx context.Context, blockHash [32]byte, bl } } -// save all powchain related metadata to disk. +// savePowchainData saves all powchain related metadata to disk. func (s *Service) savePowchainData(ctx context.Context) error { var pbState *ethpb.BeaconState var err error diff --git a/beacon-chain/powchain/log_processing_test.go b/beacon-chain/powchain/log_processing_test.go index 05f711f62..4f6c3c267 100644 --- a/beacon-chain/powchain/log_processing_test.go +++ b/beacon-chain/powchain/log_processing_test.go @@ -589,7 +589,7 @@ func TestCheckForChainstart_NoValidator(t *testing.T) { require.NoError(t, err, "Unable to set up simulated backend") beaconDB := testDB.SetupDB(t) s := newPowchainService(t, testAcc, beaconDB) - s.checkForChainstart(context.Background(), [32]byte{}, nil, 0) + s.processChainStartIfReady(context.Background(), [32]byte{}, nil, 0) require.LogsDoNotContain(t, hook, "Could not determine active validator count from pre genesis state") } diff --git a/beacon-chain/powchain/service.go b/beacon-chain/powchain/service.go index 4ea77d2e3..1c62c9a7f 100644 --- a/beacon-chain/powchain/service.go +++ b/beacon-chain/powchain/service.go @@ -13,7 +13,6 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -98,7 +97,6 @@ type POWBlockFetcher interface { BlockByTimestamp(ctx context.Context, time uint64) (*types.HeaderInfo, error) BlockHashByHeight(ctx context.Context, height *big.Int) (common.Hash, error) BlockExists(ctx context.Context, hash common.Hash) (bool, *big.Int, error) - BlockExistsWithCache(ctx context.Context, hash common.Hash) (bool, *big.Int, error) } // Chain defines a standard interface for the powchain service in Prysm. @@ -114,7 +112,6 @@ type RPCDataFetcher interface { Close() HeaderByNumber(ctx context.Context, number *big.Int) (*gethTypes.Header, error) HeaderByHash(ctx context.Context, hash common.Hash) (*gethTypes.Header, error) - SyncProgress(ctx context.Context) (*ethereum.SyncProgress, error) } // RPCClient defines the rpc methods required to interact with the eth1 node. @@ -139,10 +136,10 @@ type config struct { } // Service fetches important information about the canonical -// Ethereum ETH1.0 chain via a web3 endpoint using an ethclient. The Random -// Beacon Chain requires synchronization with the ETH1.0 chain's current -// blockhash, block number, and access to logs within the -// Validator Registration Contract on the ETH1.0 chain to kick off the beacon +// eth1 chain via a web3 endpoint using an ethclient. +// The beacon chain requires synchronization with the eth1 chain's current +// block hash, block number, and access to logs within the +// Validator Registration Contract on the eth1 chain to kick off the beacon // chain's validator registration process. type Service struct { connectedETH1 bool @@ -152,7 +149,7 @@ type Service struct { cfg *config ctx context.Context cancel context.CancelFunc - headTicker *time.Ticker + eth1HeadTicker *time.Ticker httpLogger bind.ContractFilterer eth1DataFetcher RPCDataFetcher rpcClient RPCClient @@ -173,11 +170,11 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) { depositTrie, err := trie.NewTrie(params.BeaconConfig().DepositContractTreeDepth) if err != nil { cancel() - return nil, errors.Wrap(err, "could not setup deposit trie") + return nil, errors.Wrap(err, "could not set up deposit trie") } genState, err := transition.EmptyGenesisState() if err != nil { - return nil, errors.Wrap(err, "could not setup genesis state") + return nil, errors.Wrap(err, "could not set up genesis state") } s := &Service{ @@ -201,7 +198,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) { }, lastReceivedMerkleIndex: -1, preGenesisState: genState, - headTicker: time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerETH1Block) * time.Second), + eth1HeadTicker: time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerETH1Block) * time.Second), } for _, opt := range opts { @@ -225,7 +222,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) { return s, nil } -// Start a web3 service's main event loop. +// Start the powchain service's main event loop. func (s *Service) Start() { if err := s.setupExecutionClientConnections(s.ctx, s.cfg.currHttpEndpoint); err != nil { log.WithError(err).Error("Could not connect to execution endpoint") @@ -372,7 +369,7 @@ func (s *Service) ETH1ConnectionErrors() []error { // refers to the latest eth1 block which follows the condition: eth1_timestamp + // SECONDS_PER_ETH1_BLOCK * ETH1_FOLLOW_DISTANCE <= current_unix_time -func (s *Service) followBlockHeight(_ context.Context) (uint64, error) { +func (s *Service) followedBlockHeight(_ context.Context) (uint64, error) { latestValidBlock := uint64(0) if s.latestEth1Data.BlockHeight > params.BeaconConfig().Eth1FollowDistance { latestValidBlock = s.latestEth1Data.BlockHeight - params.BeaconConfig().Eth1FollowDistance @@ -386,8 +383,7 @@ func (s *Service) initDepositCaches(ctx context.Context, ctrs []*ethpb.DepositCo } s.cfg.depositCache.InsertDepositContainers(ctx, ctrs) if !s.chainStartData.Chainstarted { - // do not add to pending cache - // if no genesis state exists. + // Do not add to pending cache if no genesis state exists. validDepositsCount.Add(float64(s.preGenesisState.Eth1DepositIndex())) return nil } @@ -395,7 +391,7 @@ func (s *Service) initDepositCaches(ctx context.Context, ctrs []*ethpb.DepositCo if err != nil { return err } - // Default to all deposits post-genesis deposits in + // Default to all post-genesis deposits in // the event we cannot find a finalized state. currIndex := genesisState.Eth1DepositIndex() chkPt, err := s.cfg.beaconDB.FinalizedCheckpoint(ctx) @@ -411,17 +407,17 @@ func (s *Service) initDepositCaches(ctx context.Context, ctrs []*ethpb.DepositCo // Set deposit index to the one in the current archived state. currIndex = fState.Eth1DepositIndex() - // when a node pauses for some time and starts again, the deposits to finalize - // accumulates. we finalize them here before we are ready to receive a block. + // When a node pauses for some time and starts again, the deposits to finalize + // accumulates. We finalize them here before we are ready to receive a block. // Otherwise, the first few blocks will be slower to compute as we will // hold the lock and be busy finalizing the deposits. // The deposit index in the state is always the index of the next deposit - // to be included(rather than the last one to be processed). This was most likely + // to be included (rather than the last one to be processed). This was most likely // done as the state cannot represent signed integers. actualIndex := int64(currIndex) - 1 // lint:ignore uintcast -- deposit index will not exceed int64 in your lifetime. s.cfg.depositCache.InsertFinalizedDeposits(ctx, actualIndex) - // Deposit proofs are only used during state transition and can be safely removed to save space. + // Deposit proofs are only used during state transition and can be safely removed to save space. if err = s.cfg.depositCache.PruneProofs(ctx, actualIndex); err != nil { return errors.Wrap(err, "could not prune deposit proofs") } @@ -498,8 +494,7 @@ func (s *Service) batchRequestHeaders(startBlock, endBlock uint64) ([]*gethTypes return headers, nil } -// safelyHandleHeader will recover and log any panic that occurs from the -// block +// safelyHandleHeader will recover and log any panic that occurs from the block func safelyHandlePanic() { if r := recover(); r != nil { log.WithFields(logrus.Fields{ @@ -522,7 +517,7 @@ func (s *Service) handleETH1FollowDistance() { log.Warn("Execution client is not syncing") } if !s.chainStartData.Chainstarted { - if err := s.checkBlockNumberForChainStart(ctx, big.NewInt(int64(s.latestEth1Data.LastRequestedBlock))); err != nil { + if err := s.processChainStartFromBlockNum(ctx, big.NewInt(int64(s.latestEth1Data.LastRequestedBlock))); err != nil { s.runError = err log.Error(err) return @@ -530,9 +525,8 @@ func (s *Service) handleETH1FollowDistance() { } // If the last requested block has not changed, // we do not request batched logs as this means there are no new - // logs for the powchain service to process. Also is a potential - // failure condition as would mean we have not respected the protocol - // threshold. + // logs for the powchain service to process. Also it is a potential + // failure condition as would mean we have not respected the protocol threshold. if s.latestEth1Data.LastRequestedBlock == s.latestEth1Data.BlockHeight { log.Error("Beacon node is not respecting the follow distance") return @@ -595,7 +589,7 @@ func (s *Service) initPOWService() { if s.chainStartData.Chainstarted && s.chainStartData.GenesisBlock == 0 { genHash := common.BytesToHash(s.chainStartData.Eth1Data.BlockHash) genBlock := s.chainStartData.GenesisBlock - // In the event our provided chainstart data references a non-existent blockhash + // In the event our provided chainstart data references a non-existent block hash, // we assume the genesis block to be 0. if genHash != [32]byte{} { genHeader, err := s.eth1DataFetcher.HeaderByHash(ctx, genHash) @@ -618,7 +612,7 @@ func (s *Service) initPOWService() { } } -// run subscribes to all the services for the ETH1.0 chain. +// run subscribes to all the services for the eth1 chain. func (s *Service) run(done <-chan struct{}) { s.runError = nil @@ -636,7 +630,7 @@ func (s *Service) run(done <-chan struct{}) { s.updateConnectedETH1(false) log.Debug("Context closed, exiting goroutine") return - case <-s.headTicker.C: + case <-s.eth1HeadTicker.C: head, err := s.eth1DataFetcher.HeaderByNumber(s.ctx, nil) if err != nil { s.pollConnectionStatus(s.ctx) @@ -692,10 +686,10 @@ func (s *Service) logTillChainStart(ctx context.Context) { } // cacheHeadersForEth1DataVote makes sure that voting for eth1data after startup utilizes cached headers -// instead of making multiple RPC requests to the ETH1 endpoint. +// instead of making multiple RPC requests to the eth1 endpoint. func (s *Service) cacheHeadersForEth1DataVote(ctx context.Context) error { // Find the end block to request from. - end, err := s.followBlockHeight(ctx) + end, err := s.followedBlockHeight(ctx) if err != nil { return err } @@ -739,12 +733,12 @@ func (s *Service) cacheBlockHeaders(start, end uint64) error { return nil } -// determines the earliest voting block from which to start caching all our previous headers from. +// Determines the earliest voting block from which to start caching all our previous headers from. func (s *Service) determineEarliestVotingBlock(ctx context.Context, followBlock uint64) (uint64, error) { genesisTime := s.chainStartData.GenesisTime currSlot := slots.CurrentSlot(genesisTime) - // In the event genesis has not occurred yet, we just request go back follow_distance blocks. + // In the event genesis has not occurred yet, we just request to go back follow_distance blocks. if genesisTime == 0 || currSlot == 0 { earliestBlk := uint64(0) if followBlock > params.BeaconConfig().Eth1FollowDistance { @@ -791,7 +785,7 @@ func (s *Service) initializeEth1Data(ctx context.Context, eth1DataInDB *ethpb.ET return nil } -// validates that all deposit containers are valid and have their relevant indices +// Validates that all deposit containers are valid and have their relevant indices // in order. func validateDepositContainers(ctrs []*ethpb.DepositContainer) bool { ctrLen := len(ctrs) @@ -814,7 +808,7 @@ func validateDepositContainers(ctrs []*ethpb.DepositContainer) bool { return true } -// validates the current powchain data saved and makes sure that any +// Validates the current powchain data is saved and makes sure that any // embedded genesis state is correctly accounted for. func (s *Service) ensureValidPowchainData(ctx context.Context) error { genState, err := s.cfg.beaconDB.GenesisState(ctx) diff --git a/beacon-chain/powchain/service_test.go b/beacon-chain/powchain/service_test.go index 9734cdf1c..47cac478d 100644 --- a/beacon-chain/powchain/service_test.go +++ b/beacon-chain/powchain/service_test.go @@ -267,7 +267,7 @@ func TestFollowBlock_OK(t *testing.T) { web3Service.latestEth1Data.BlockHeight = testAcc.Backend.Blockchain().CurrentBlock().NumberU64() web3Service.latestEth1Data.BlockTime = testAcc.Backend.Blockchain().CurrentBlock().Time() - h, err := web3Service.followBlockHeight(context.Background()) + h, err := web3Service.followedBlockHeight(context.Background()) require.NoError(t, err) assert.Equal(t, baseHeight, h, "Unexpected block height") numToForward := uint64(2) @@ -280,7 +280,7 @@ func TestFollowBlock_OK(t *testing.T) { web3Service.latestEth1Data.BlockHeight = testAcc.Backend.Blockchain().CurrentBlock().NumberU64() web3Service.latestEth1Data.BlockTime = testAcc.Backend.Blockchain().CurrentBlock().Time() - h, err = web3Service.followBlockHeight(context.Background()) + h, err = web3Service.followedBlockHeight(context.Background()) require.NoError(t, err) assert.Equal(t, expectedHeight, h, "Unexpected block height") } diff --git a/beacon-chain/powchain/testing/mock_faulty_powchain.go b/beacon-chain/powchain/testing/mock_faulty_powchain.go index ea111fa65..aaad9c3f2 100644 --- a/beacon-chain/powchain/testing/mock_faulty_powchain.go +++ b/beacon-chain/powchain/testing/mock_faulty_powchain.go @@ -71,8 +71,3 @@ func (_ *FaultyMockPOWChain) ClearPreGenesisData() { func (_ *FaultyMockPOWChain) IsConnectedToETH1() bool { return true } - -// BlockExistsWithCache -- -func (f *FaultyMockPOWChain) BlockExistsWithCache(ctx context.Context, hash common.Hash) (bool, *big.Int, error) { - return f.BlockExists(ctx, hash) -} diff --git a/beacon-chain/powchain/testing/mock_powchain.go b/beacon-chain/powchain/testing/mock_powchain.go index 25643525f..79006ba39 100644 --- a/beacon-chain/powchain/testing/mock_powchain.go +++ b/beacon-chain/powchain/testing/mock_powchain.go @@ -179,11 +179,6 @@ func (m *POWChain) InsertBlock(height int, time uint64, hash []byte) *POWChain { return m } -// BlockExistsWithCache -- -func (m *POWChain) BlockExistsWithCache(ctx context.Context, hash common.Hash) (bool, *big.Int, error) { - return m.BlockExists(ctx, hash) -} - func SetupRPCServer() (*rpc.Server, string, error) { srv := rpc.NewServer() if err := srv.RegisterName("eth", &testETHRPC{}); err != nil {