mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-22 19:40:37 +00:00
Improvements to powchain
package (#10652)
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
parent
21bdbd548a
commit
7d9d8454b1
@ -22,7 +22,7 @@ const repeatedSearches = 2 * searchThreshold
|
||||
|
||||
// BlockExists returns true if the block exists, its height and any possible error encountered.
|
||||
func (s *Service) BlockExists(ctx context.Context, hash common.Hash) (bool, *big.Int, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "beacon-chain.web3service.BlockExists")
|
||||
ctx, span := trace.StartSpan(ctx, "powchain.BlockExists")
|
||||
defer span.End()
|
||||
|
||||
if exists, hdrInfo, err := s.headerCache.HeaderInfoByHash(hash); exists || err != nil {
|
||||
@ -45,24 +45,9 @@ func (s *Service) BlockExists(ctx context.Context, hash common.Hash) (bool, *big
|
||||
return true, new(big.Int).Set(header.Number), nil
|
||||
}
|
||||
|
||||
// BlockExistsWithCache returns true if the block exists in cache, its height and any possible error encountered.
|
||||
func (s *Service) BlockExistsWithCache(ctx context.Context, hash common.Hash) (bool, *big.Int, error) {
|
||||
_, span := trace.StartSpan(ctx, "beacon-chain.web3service.BlockExistsWithCache")
|
||||
defer span.End()
|
||||
if exists, hdrInfo, err := s.headerCache.HeaderInfoByHash(hash); exists || err != nil {
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
span.AddAttributes(trace.BoolAttribute("blockCacheHit", true))
|
||||
return true, hdrInfo.Number, nil
|
||||
}
|
||||
span.AddAttributes(trace.BoolAttribute("blockCacheHit", false))
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
// BlockHashByHeight returns the block hash of the block at the given height.
|
||||
func (s *Service) BlockHashByHeight(ctx context.Context, height *big.Int) (common.Hash, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "beacon-chain.web3service.BlockHashByHeight")
|
||||
ctx, span := trace.StartSpan(ctx, "powchain.BlockHashByHeight")
|
||||
defer span.End()
|
||||
|
||||
if exists, hInfo, err := s.headerCache.HeaderInfoByHeight(height); exists || err != nil {
|
||||
@ -90,9 +75,9 @@ func (s *Service) BlockHashByHeight(ctx context.Context, height *big.Int) (commo
|
||||
return header.Hash(), nil
|
||||
}
|
||||
|
||||
// BlockTimeByHeight fetches an eth1.0 block timestamp by its height.
|
||||
// BlockTimeByHeight fetches an eth1 block timestamp by its height.
|
||||
func (s *Service) BlockTimeByHeight(ctx context.Context, height *big.Int) (uint64, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "beacon-chain.web3service.BlockTimeByHeight")
|
||||
ctx, span := trace.StartSpan(ctx, "powchain.BlockTimeByHeight")
|
||||
defer span.End()
|
||||
if s.eth1DataFetcher == nil {
|
||||
err := errors.New("nil eth1DataFetcher")
|
||||
@ -111,7 +96,7 @@ func (s *Service) BlockTimeByHeight(ctx context.Context, height *big.Int) (uint6
|
||||
// This is an optimized version with the worst case being O(2*repeatedSearches) number of calls
|
||||
// while in best case search for the block is performed in O(1).
|
||||
func (s *Service) BlockByTimestamp(ctx context.Context, time uint64) (*types.HeaderInfo, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "beacon-chain.web3service.BlockByTimestamp")
|
||||
ctx, span := trace.StartSpan(ctx, "powchain.BlockByTimestamp")
|
||||
defer span.End()
|
||||
|
||||
s.latestEth1DataLock.RLock()
|
||||
@ -122,15 +107,14 @@ func (s *Service) BlockByTimestamp(ctx context.Context, time uint64) (*types.Hea
|
||||
if time > latestBlkTime {
|
||||
return nil, errors.Errorf("provided time is later than the current eth1 head. %d > %d", time, latestBlkTime)
|
||||
}
|
||||
// Initialize a pointer to eth1 chain's history to start our search
|
||||
// from.
|
||||
// Initialize a pointer to eth1 chain's history to start our search from.
|
||||
cursorNum := big.NewInt(0).SetUint64(latestBlkHeight)
|
||||
cursorTime := latestBlkTime
|
||||
|
||||
numOfBlocks := uint64(0)
|
||||
estimatedBlk := cursorNum.Uint64()
|
||||
maxTimeBuffer := searchThreshold * params.BeaconConfig().SecondsPerETH1Block
|
||||
// Terminate if we cant find an acceptable block after
|
||||
// Terminate if we can't find an acceptable block after
|
||||
// repeated searches.
|
||||
for i := 0; i < repeatedSearches; i++ {
|
||||
if ctx.Err() != nil {
|
||||
@ -157,12 +141,12 @@ func (s *Service) BlockByTimestamp(ctx context.Context, time uint64) (*types.Hea
|
||||
// time - buffer <= head.time <= time + buffer
|
||||
break
|
||||
}
|
||||
hinfo, err := s.retrieveHeaderInfo(ctx, estimatedBlk)
|
||||
hInfo, err := s.retrieveHeaderInfo(ctx, estimatedBlk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cursorNum = hinfo.Number
|
||||
cursorTime = hinfo.Time
|
||||
cursorNum = hInfo.Number
|
||||
cursorTime = hInfo.Time
|
||||
}
|
||||
|
||||
// Exit early if we get the desired block.
|
||||
@ -170,15 +154,15 @@ func (s *Service) BlockByTimestamp(ctx context.Context, time uint64) (*types.Hea
|
||||
return s.retrieveHeaderInfo(ctx, cursorNum.Uint64())
|
||||
}
|
||||
if cursorTime > time {
|
||||
return s.findLessTargetEth1Block(ctx, big.NewInt(0).SetUint64(estimatedBlk), time)
|
||||
return s.findMaxTargetEth1Block(ctx, big.NewInt(0).SetUint64(estimatedBlk), time)
|
||||
}
|
||||
return s.findMoreTargetEth1Block(ctx, big.NewInt(0).SetUint64(estimatedBlk), time)
|
||||
return s.findMinTargetEth1Block(ctx, big.NewInt(0).SetUint64(estimatedBlk), time)
|
||||
}
|
||||
|
||||
// Performs a search to find a target eth1 block which is earlier than or equal to the
|
||||
// target time. This method is used when head.time > targetTime
|
||||
func (s *Service) findLessTargetEth1Block(ctx context.Context, startBlk *big.Int, targetTime uint64) (*types.HeaderInfo, error) {
|
||||
for bn := startBlk; ; bn = big.NewInt(0).Sub(bn, big.NewInt(1)) {
|
||||
func (s *Service) findMaxTargetEth1Block(ctx context.Context, upperBoundBlk *big.Int, targetTime uint64) (*types.HeaderInfo, error) {
|
||||
for bn := upperBoundBlk; ; bn = big.NewInt(0).Sub(bn, big.NewInt(1)) {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
@ -194,8 +178,8 @@ func (s *Service) findLessTargetEth1Block(ctx context.Context, startBlk *big.Int
|
||||
|
||||
// Performs a search to find a target eth1 block which is just earlier than or equal to the
|
||||
// target time. This method is used when head.time < targetTime
|
||||
func (s *Service) findMoreTargetEth1Block(ctx context.Context, startBlk *big.Int, targetTime uint64) (*types.HeaderInfo, error) {
|
||||
for bn := startBlk; ; bn = big.NewInt(0).Add(bn, big.NewInt(1)) {
|
||||
func (s *Service) findMinTargetEth1Block(ctx context.Context, lowerBoundBlk *big.Int, targetTime uint64) (*types.HeaderInfo, error) {
|
||||
for bn := lowerBoundBlk; ; bn = big.NewInt(0).Add(bn, big.NewInt(1)) {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
@ -203,8 +187,7 @@ func (s *Service) findMoreTargetEth1Block(ctx context.Context, startBlk *big.Int
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Return the last block before we hit the threshold
|
||||
// time.
|
||||
// Return the last block before we hit the threshold time.
|
||||
if info.Time > targetTime {
|
||||
return s.retrieveHeaderInfo(ctx, info.Number.Uint64()-1)
|
||||
}
|
||||
|
@ -61,7 +61,7 @@ func TestLatestMainchainInfo_OK(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
tickerChan := make(chan time.Time)
|
||||
web3Service.headTicker = &time.Ticker{C: tickerChan}
|
||||
web3Service.eth1HeadTicker = &time.Ticker{C: tickerChan}
|
||||
tickerChan <- time.Now()
|
||||
web3Service.cancel()
|
||||
exitRoutine <- true
|
||||
@ -207,51 +207,6 @@ func TestBlockExists_UsesCachedBlockInfo(t *testing.T) {
|
||||
require.Equal(t, 0, height.Cmp(header.Number))
|
||||
}
|
||||
|
||||
func TestBlockExistsWithCache_UsesCachedHeaderInfo(t *testing.T) {
|
||||
beaconDB := dbutil.SetupDB(t)
|
||||
server, endpoint, err := mockPOW.SetupRPCServer()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
server.Stop()
|
||||
})
|
||||
web3Service, err := NewService(context.Background(),
|
||||
WithHttpEndpoints([]string{endpoint}),
|
||||
WithDatabase(beaconDB),
|
||||
)
|
||||
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
|
||||
|
||||
header := &gethTypes.Header{
|
||||
Number: big.NewInt(0),
|
||||
}
|
||||
|
||||
err = web3Service.headerCache.AddHeader(header)
|
||||
require.NoError(t, err)
|
||||
|
||||
exists, height, err := web3Service.BlockExistsWithCache(context.Background(), header.Hash())
|
||||
require.NoError(t, err, "Could not get block hash with given height")
|
||||
require.Equal(t, true, exists)
|
||||
require.Equal(t, 0, height.Cmp(header.Number))
|
||||
}
|
||||
|
||||
func TestBlockExistsWithCache_HeaderNotCached(t *testing.T) {
|
||||
beaconDB := dbutil.SetupDB(t)
|
||||
server, endpoint, err := mockPOW.SetupRPCServer()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
server.Stop()
|
||||
})
|
||||
web3Service, err := NewService(context.Background(),
|
||||
WithHttpEndpoints([]string{endpoint}),
|
||||
WithDatabase(beaconDB),
|
||||
)
|
||||
require.NoError(t, err, "unable to setup web3 ETH1.0 chain service")
|
||||
|
||||
exists, height, err := web3Service.BlockExistsWithCache(context.Background(), common.BytesToHash([]byte("hash")))
|
||||
require.NoError(t, err, "Could not get block hash with given height")
|
||||
require.Equal(t, false, exists)
|
||||
require.Equal(t, (*big.Int)(nil), height)
|
||||
}
|
||||
|
||||
func TestService_BlockNumberByTimestamp(t *testing.T) {
|
||||
beaconDB := dbutil.SetupDB(t)
|
||||
testAcc, err := mock.Setup()
|
||||
@ -313,11 +268,11 @@ func TestService_BlockNumberByTimestampLessTargetTime(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
// Provide an unattainable target time
|
||||
_, err = web3Service.findLessTargetEth1Block(ctx, hd.Number, hd.Time/2)
|
||||
_, err = web3Service.findMaxTargetEth1Block(ctx, hd.Number, hd.Time/2)
|
||||
require.ErrorContains(t, context.DeadlineExceeded.Error(), err)
|
||||
|
||||
// Provide an attainable target time
|
||||
blk, err := web3Service.findLessTargetEth1Block(context.Background(), hd.Number, hd.Time-5)
|
||||
blk, err := web3Service.findMaxTargetEth1Block(context.Background(), hd.Number, hd.Time-5)
|
||||
require.NoError(t, err)
|
||||
require.NotEqual(t, hd.Number.Uint64(), blk.Number.Uint64(), "retrieved block is not less than the head")
|
||||
}
|
||||
@ -351,11 +306,11 @@ func TestService_BlockNumberByTimestampMoreTargetTime(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
// Provide an unattainable target time with respect to head
|
||||
_, err = web3Service.findMoreTargetEth1Block(ctx, big.NewInt(0).Div(hd.Number, big.NewInt(2)), hd.Time)
|
||||
_, err = web3Service.findMinTargetEth1Block(ctx, big.NewInt(0).Div(hd.Number, big.NewInt(2)), hd.Time)
|
||||
require.ErrorContains(t, context.DeadlineExceeded.Error(), err)
|
||||
|
||||
// Provide an attainable target time with respect to head
|
||||
blk, err := web3Service.findMoreTargetEth1Block(context.Background(), big.NewInt(0).Sub(hd.Number, big.NewInt(5)), hd.Time)
|
||||
blk, err := web3Service.findMinTargetEth1Block(context.Background(), big.NewInt(0).Sub(hd.Number, big.NewInt(5)), hd.Time)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, hd.Number.Uint64(), blk.Number.Uint64(), "retrieved block is not equal to the head")
|
||||
}
|
||||
|
@ -19,8 +19,6 @@ var (
|
||||
ErrUnknownPayload = errors.New("payload does not exist or is not available")
|
||||
// ErrUnknownPayloadStatus when the payload status is unknown.
|
||||
ErrUnknownPayloadStatus = errors.New("unknown payload status")
|
||||
// ErrUnsupportedScheme for unsupported URL schemes.
|
||||
ErrUnsupportedScheme = errors.New("unsupported url scheme, only http(s) and ipc are supported")
|
||||
// ErrConfigMismatch when the execution node's terminal total difficulty or
|
||||
// terminal block hash received via the API mismatches Prysm's configuration value.
|
||||
ErrConfigMismatch = errors.New("execution client configuration mismatch")
|
||||
|
@ -36,7 +36,7 @@ var (
|
||||
const eth1DataSavingInterval = 1000
|
||||
const maxTolerableDifference = 50
|
||||
const defaultEth1HeaderReqLimit = uint64(1000)
|
||||
const depositlogRequestLimit = 10000
|
||||
const depositLogRequestLimit = 10000
|
||||
const additiveFactorMultiplier = 0.10
|
||||
const multiplicativeDecreaseDivisor = 2
|
||||
|
||||
@ -57,7 +57,7 @@ func (s *Service) Eth2GenesisPowchainInfo() (uint64, *big.Int) {
|
||||
return s.chainStartData.GenesisTime, big.NewInt(int64(s.chainStartData.GenesisBlock))
|
||||
}
|
||||
|
||||
// ProcessETH1Block processes the logs from the provided eth1Block.
|
||||
// ProcessETH1Block processes logs from the provided eth1 block.
|
||||
func (s *Service) ProcessETH1Block(ctx context.Context, blkNum *big.Int) error {
|
||||
query := ethereum.FilterQuery{
|
||||
Addresses: []common.Address{
|
||||
@ -80,7 +80,7 @@ func (s *Service) ProcessETH1Block(ctx context.Context, blkNum *big.Int) error {
|
||||
}
|
||||
}
|
||||
if !s.chainStartData.Chainstarted {
|
||||
if err := s.checkBlockNumberForChainStart(ctx, blkNum); err != nil {
|
||||
if err := s.processChainStartFromBlockNum(ctx, blkNum); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -88,7 +88,7 @@ func (s *Service) ProcessETH1Block(ctx context.Context, blkNum *big.Int) error {
|
||||
}
|
||||
|
||||
// ProcessLog is the main method which handles the processing of all
|
||||
// logs from the deposit contract on the ETH1.0 chain.
|
||||
// logs from the deposit contract on the eth1 chain.
|
||||
func (s *Service) ProcessLog(ctx context.Context, depositLog gethTypes.Log) error {
|
||||
s.processingLock.RLock()
|
||||
defer s.processingLock.RUnlock()
|
||||
@ -107,7 +107,7 @@ func (s *Service) ProcessLog(ctx context.Context, depositLog gethTypes.Log) erro
|
||||
}
|
||||
|
||||
// ProcessDepositLog processes the log which had been received from
|
||||
// the ETH1.0 chain by trying to ascertain which participant deposited
|
||||
// the eth1 chain by trying to ascertain which participant deposited
|
||||
// in the contract.
|
||||
func (s *Service) ProcessDepositLog(ctx context.Context, depositLog gethTypes.Log) error {
|
||||
pubkey, withdrawalCredentials, amount, signature, merkleTreeIndex, err := contracts.UnpackDepositLogData(depositLog.Data)
|
||||
@ -215,7 +215,7 @@ func (s *Service) ProcessDepositLog(ctx context.Context, depositLog gethTypes.Lo
|
||||
}
|
||||
|
||||
// ProcessChainStart processes the log which had been received from
|
||||
// the ETH1.0 chain by trying to determine when to start the beacon chain.
|
||||
// the eth1 chain by trying to determine when to start the beacon chain.
|
||||
func (s *Service) ProcessChainStart(genesisTime uint64, eth1BlockHash [32]byte, blockNumber *big.Int) {
|
||||
s.chainStartData.Chainstarted = true
|
||||
s.chainStartData.GenesisBlock = blockNumber.Uint64()
|
||||
@ -247,15 +247,15 @@ func (s *Service) ProcessChainStart(genesisTime uint64, eth1BlockHash [32]byte,
|
||||
},
|
||||
})
|
||||
if err := s.savePowchainData(s.ctx); err != nil {
|
||||
// continue on, if the save fails as this will get re-saved
|
||||
// continue on if the save fails as this will get re-saved
|
||||
// in the next interval.
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// createGenesisTime adds in the genesis delay to the eth1 block time
|
||||
// on which it was triggered.
|
||||
func createGenesisTime(timeStamp uint64) uint64 {
|
||||
// adds in the genesis delay to the eth1 block time
|
||||
// on which it was triggered.
|
||||
return timeStamp + params.BeaconConfig().GenesisDelay
|
||||
}
|
||||
|
||||
@ -292,7 +292,7 @@ func (s *Service) processPastLogs(ctx context.Context) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
latestFollowHeight, err := s.followBlockHeight(ctx)
|
||||
latestFollowHeight, err := s.followedBlockHeight(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -318,7 +318,7 @@ func (s *Service) processPastLogs(ctx context.Context) error {
|
||||
remainingLogs := logCount - uint64(s.lastReceivedMerkleIndex+1)
|
||||
// only change the end block if the remaining logs are below the required log limit.
|
||||
// reset our query and end block in this case.
|
||||
withinLimit := remainingLogs < depositlogRequestLimit
|
||||
withinLimit := remainingLogs < depositLogRequestLimit
|
||||
aboveFollowHeight := end >= latestFollowHeight
|
||||
if withinLimit && aboveFollowHeight {
|
||||
query.ToBlock = big.NewInt(0).SetUint64(latestFollowHeight)
|
||||
@ -413,9 +413,9 @@ func (s *Service) processPastLogs(ctx context.Context) error {
|
||||
// logs from the period last polled to now.
|
||||
func (s *Service) requestBatchedHeadersAndLogs(ctx context.Context) error {
|
||||
// We request for the nth block behind the current head, in order to have
|
||||
// stabilized logs when we retrieve it from the 1.0 chain.
|
||||
// stabilized logs when we retrieve it from the eth1 chain.
|
||||
|
||||
requestedBlock, err := s.followBlockHeight(ctx)
|
||||
requestedBlock, err := s.followedBlockHeight(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -457,18 +457,17 @@ func (s *Service) retrieveBlockHashAndTime(ctx context.Context, blkNum *big.Int)
|
||||
return bHash, timeStamp, nil
|
||||
}
|
||||
|
||||
// checkBlockNumberForChainStart checks the given block number for if chainstart has occurred.
|
||||
func (s *Service) checkBlockNumberForChainStart(ctx context.Context, blkNum *big.Int) error {
|
||||
func (s *Service) processChainStartFromBlockNum(ctx context.Context, blkNum *big.Int) error {
|
||||
bHash, timeStamp, err := s.retrieveBlockHashAndTime(ctx, blkNum)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.checkForChainstart(ctx, bHash, blkNum, timeStamp)
|
||||
s.processChainStartIfReady(ctx, bHash, blkNum, timeStamp)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) checkHeaderForChainstart(ctx context.Context, header *gethTypes.Header) {
|
||||
s.checkForChainstart(ctx, header.Hash(), header.Number, header.Time)
|
||||
func (s *Service) processChainStartFromHeader(ctx context.Context, header *gethTypes.Header) {
|
||||
s.processChainStartIfReady(ctx, header.Hash(), header.Number, header.Time)
|
||||
}
|
||||
|
||||
func (s *Service) checkHeaderRange(ctx context.Context, start, end uint64, headersMap map[uint64]*gethTypes.Header,
|
||||
@ -484,7 +483,7 @@ func (s *Service) checkHeaderRange(ctx context.Context, start, end uint64, heade
|
||||
i--
|
||||
continue
|
||||
}
|
||||
s.checkHeaderForChainstart(ctx, h)
|
||||
s.processChainStartFromHeader(ctx, h)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@ -504,7 +503,7 @@ func (s *Service) currentCountAndTime(ctx context.Context, blockTime uint64) (ui
|
||||
return valCount, createGenesisTime(blockTime)
|
||||
}
|
||||
|
||||
func (s *Service) checkForChainstart(ctx context.Context, blockHash [32]byte, blockNumber *big.Int, blockTime uint64) {
|
||||
func (s *Service) processChainStartIfReady(ctx context.Context, blockHash [32]byte, blockNumber *big.Int, blockTime uint64) {
|
||||
valCount, genesisTime := s.currentCountAndTime(ctx, blockTime)
|
||||
if valCount == 0 {
|
||||
return
|
||||
@ -516,7 +515,7 @@ func (s *Service) checkForChainstart(ctx context.Context, blockHash [32]byte, bl
|
||||
}
|
||||
}
|
||||
|
||||
// save all powchain related metadata to disk.
|
||||
// savePowchainData saves all powchain related metadata to disk.
|
||||
func (s *Service) savePowchainData(ctx context.Context) error {
|
||||
var pbState *ethpb.BeaconState
|
||||
var err error
|
||||
|
@ -589,7 +589,7 @@ func TestCheckForChainstart_NoValidator(t *testing.T) {
|
||||
require.NoError(t, err, "Unable to set up simulated backend")
|
||||
beaconDB := testDB.SetupDB(t)
|
||||
s := newPowchainService(t, testAcc, beaconDB)
|
||||
s.checkForChainstart(context.Background(), [32]byte{}, nil, 0)
|
||||
s.processChainStartIfReady(context.Background(), [32]byte{}, nil, 0)
|
||||
require.LogsDoNotContain(t, hook, "Could not determine active validator count from pre genesis state")
|
||||
}
|
||||
|
||||
|
@ -13,7 +13,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum"
|
||||
"github.com/ethereum/go-ethereum/accounts/abi/bind"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
@ -98,7 +97,6 @@ type POWBlockFetcher interface {
|
||||
BlockByTimestamp(ctx context.Context, time uint64) (*types.HeaderInfo, error)
|
||||
BlockHashByHeight(ctx context.Context, height *big.Int) (common.Hash, error)
|
||||
BlockExists(ctx context.Context, hash common.Hash) (bool, *big.Int, error)
|
||||
BlockExistsWithCache(ctx context.Context, hash common.Hash) (bool, *big.Int, error)
|
||||
}
|
||||
|
||||
// Chain defines a standard interface for the powchain service in Prysm.
|
||||
@ -114,7 +112,6 @@ type RPCDataFetcher interface {
|
||||
Close()
|
||||
HeaderByNumber(ctx context.Context, number *big.Int) (*gethTypes.Header, error)
|
||||
HeaderByHash(ctx context.Context, hash common.Hash) (*gethTypes.Header, error)
|
||||
SyncProgress(ctx context.Context) (*ethereum.SyncProgress, error)
|
||||
}
|
||||
|
||||
// RPCClient defines the rpc methods required to interact with the eth1 node.
|
||||
@ -139,10 +136,10 @@ type config struct {
|
||||
}
|
||||
|
||||
// Service fetches important information about the canonical
|
||||
// Ethereum ETH1.0 chain via a web3 endpoint using an ethclient. The Random
|
||||
// Beacon Chain requires synchronization with the ETH1.0 chain's current
|
||||
// blockhash, block number, and access to logs within the
|
||||
// Validator Registration Contract on the ETH1.0 chain to kick off the beacon
|
||||
// eth1 chain via a web3 endpoint using an ethclient.
|
||||
// The beacon chain requires synchronization with the eth1 chain's current
|
||||
// block hash, block number, and access to logs within the
|
||||
// Validator Registration Contract on the eth1 chain to kick off the beacon
|
||||
// chain's validator registration process.
|
||||
type Service struct {
|
||||
connectedETH1 bool
|
||||
@ -152,7 +149,7 @@ type Service struct {
|
||||
cfg *config
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
headTicker *time.Ticker
|
||||
eth1HeadTicker *time.Ticker
|
||||
httpLogger bind.ContractFilterer
|
||||
eth1DataFetcher RPCDataFetcher
|
||||
rpcClient RPCClient
|
||||
@ -173,11 +170,11 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
|
||||
depositTrie, err := trie.NewTrie(params.BeaconConfig().DepositContractTreeDepth)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, errors.Wrap(err, "could not setup deposit trie")
|
||||
return nil, errors.Wrap(err, "could not set up deposit trie")
|
||||
}
|
||||
genState, err := transition.EmptyGenesisState()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not setup genesis state")
|
||||
return nil, errors.Wrap(err, "could not set up genesis state")
|
||||
}
|
||||
|
||||
s := &Service{
|
||||
@ -201,7 +198,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
|
||||
},
|
||||
lastReceivedMerkleIndex: -1,
|
||||
preGenesisState: genState,
|
||||
headTicker: time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerETH1Block) * time.Second),
|
||||
eth1HeadTicker: time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerETH1Block) * time.Second),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
@ -225,7 +222,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Start a web3 service's main event loop.
|
||||
// Start the powchain service's main event loop.
|
||||
func (s *Service) Start() {
|
||||
if err := s.setupExecutionClientConnections(s.ctx, s.cfg.currHttpEndpoint); err != nil {
|
||||
log.WithError(err).Error("Could not connect to execution endpoint")
|
||||
@ -372,7 +369,7 @@ func (s *Service) ETH1ConnectionErrors() []error {
|
||||
|
||||
// refers to the latest eth1 block which follows the condition: eth1_timestamp +
|
||||
// SECONDS_PER_ETH1_BLOCK * ETH1_FOLLOW_DISTANCE <= current_unix_time
|
||||
func (s *Service) followBlockHeight(_ context.Context) (uint64, error) {
|
||||
func (s *Service) followedBlockHeight(_ context.Context) (uint64, error) {
|
||||
latestValidBlock := uint64(0)
|
||||
if s.latestEth1Data.BlockHeight > params.BeaconConfig().Eth1FollowDistance {
|
||||
latestValidBlock = s.latestEth1Data.BlockHeight - params.BeaconConfig().Eth1FollowDistance
|
||||
@ -386,8 +383,7 @@ func (s *Service) initDepositCaches(ctx context.Context, ctrs []*ethpb.DepositCo
|
||||
}
|
||||
s.cfg.depositCache.InsertDepositContainers(ctx, ctrs)
|
||||
if !s.chainStartData.Chainstarted {
|
||||
// do not add to pending cache
|
||||
// if no genesis state exists.
|
||||
// Do not add to pending cache if no genesis state exists.
|
||||
validDepositsCount.Add(float64(s.preGenesisState.Eth1DepositIndex()))
|
||||
return nil
|
||||
}
|
||||
@ -395,7 +391,7 @@ func (s *Service) initDepositCaches(ctx context.Context, ctrs []*ethpb.DepositCo
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Default to all deposits post-genesis deposits in
|
||||
// Default to all post-genesis deposits in
|
||||
// the event we cannot find a finalized state.
|
||||
currIndex := genesisState.Eth1DepositIndex()
|
||||
chkPt, err := s.cfg.beaconDB.FinalizedCheckpoint(ctx)
|
||||
@ -411,17 +407,17 @@ func (s *Service) initDepositCaches(ctx context.Context, ctrs []*ethpb.DepositCo
|
||||
// Set deposit index to the one in the current archived state.
|
||||
currIndex = fState.Eth1DepositIndex()
|
||||
|
||||
// when a node pauses for some time and starts again, the deposits to finalize
|
||||
// accumulates. we finalize them here before we are ready to receive a block.
|
||||
// When a node pauses for some time and starts again, the deposits to finalize
|
||||
// accumulates. We finalize them here before we are ready to receive a block.
|
||||
// Otherwise, the first few blocks will be slower to compute as we will
|
||||
// hold the lock and be busy finalizing the deposits.
|
||||
// The deposit index in the state is always the index of the next deposit
|
||||
// to be included(rather than the last one to be processed). This was most likely
|
||||
// to be included (rather than the last one to be processed). This was most likely
|
||||
// done as the state cannot represent signed integers.
|
||||
actualIndex := int64(currIndex) - 1 // lint:ignore uintcast -- deposit index will not exceed int64 in your lifetime.
|
||||
s.cfg.depositCache.InsertFinalizedDeposits(ctx, actualIndex)
|
||||
// Deposit proofs are only used during state transition and can be safely removed to save space.
|
||||
|
||||
// Deposit proofs are only used during state transition and can be safely removed to save space.
|
||||
if err = s.cfg.depositCache.PruneProofs(ctx, actualIndex); err != nil {
|
||||
return errors.Wrap(err, "could not prune deposit proofs")
|
||||
}
|
||||
@ -498,8 +494,7 @@ func (s *Service) batchRequestHeaders(startBlock, endBlock uint64) ([]*gethTypes
|
||||
return headers, nil
|
||||
}
|
||||
|
||||
// safelyHandleHeader will recover and log any panic that occurs from the
|
||||
// block
|
||||
// safelyHandleHeader will recover and log any panic that occurs from the block
|
||||
func safelyHandlePanic() {
|
||||
if r := recover(); r != nil {
|
||||
log.WithFields(logrus.Fields{
|
||||
@ -522,7 +517,7 @@ func (s *Service) handleETH1FollowDistance() {
|
||||
log.Warn("Execution client is not syncing")
|
||||
}
|
||||
if !s.chainStartData.Chainstarted {
|
||||
if err := s.checkBlockNumberForChainStart(ctx, big.NewInt(int64(s.latestEth1Data.LastRequestedBlock))); err != nil {
|
||||
if err := s.processChainStartFromBlockNum(ctx, big.NewInt(int64(s.latestEth1Data.LastRequestedBlock))); err != nil {
|
||||
s.runError = err
|
||||
log.Error(err)
|
||||
return
|
||||
@ -530,9 +525,8 @@ func (s *Service) handleETH1FollowDistance() {
|
||||
}
|
||||
// If the last requested block has not changed,
|
||||
// we do not request batched logs as this means there are no new
|
||||
// logs for the powchain service to process. Also is a potential
|
||||
// failure condition as would mean we have not respected the protocol
|
||||
// threshold.
|
||||
// logs for the powchain service to process. Also it is a potential
|
||||
// failure condition as would mean we have not respected the protocol threshold.
|
||||
if s.latestEth1Data.LastRequestedBlock == s.latestEth1Data.BlockHeight {
|
||||
log.Error("Beacon node is not respecting the follow distance")
|
||||
return
|
||||
@ -595,7 +589,7 @@ func (s *Service) initPOWService() {
|
||||
if s.chainStartData.Chainstarted && s.chainStartData.GenesisBlock == 0 {
|
||||
genHash := common.BytesToHash(s.chainStartData.Eth1Data.BlockHash)
|
||||
genBlock := s.chainStartData.GenesisBlock
|
||||
// In the event our provided chainstart data references a non-existent blockhash
|
||||
// In the event our provided chainstart data references a non-existent block hash,
|
||||
// we assume the genesis block to be 0.
|
||||
if genHash != [32]byte{} {
|
||||
genHeader, err := s.eth1DataFetcher.HeaderByHash(ctx, genHash)
|
||||
@ -618,7 +612,7 @@ func (s *Service) initPOWService() {
|
||||
}
|
||||
}
|
||||
|
||||
// run subscribes to all the services for the ETH1.0 chain.
|
||||
// run subscribes to all the services for the eth1 chain.
|
||||
func (s *Service) run(done <-chan struct{}) {
|
||||
s.runError = nil
|
||||
|
||||
@ -636,7 +630,7 @@ func (s *Service) run(done <-chan struct{}) {
|
||||
s.updateConnectedETH1(false)
|
||||
log.Debug("Context closed, exiting goroutine")
|
||||
return
|
||||
case <-s.headTicker.C:
|
||||
case <-s.eth1HeadTicker.C:
|
||||
head, err := s.eth1DataFetcher.HeaderByNumber(s.ctx, nil)
|
||||
if err != nil {
|
||||
s.pollConnectionStatus(s.ctx)
|
||||
@ -692,10 +686,10 @@ func (s *Service) logTillChainStart(ctx context.Context) {
|
||||
}
|
||||
|
||||
// cacheHeadersForEth1DataVote makes sure that voting for eth1data after startup utilizes cached headers
|
||||
// instead of making multiple RPC requests to the ETH1 endpoint.
|
||||
// instead of making multiple RPC requests to the eth1 endpoint.
|
||||
func (s *Service) cacheHeadersForEth1DataVote(ctx context.Context) error {
|
||||
// Find the end block to request from.
|
||||
end, err := s.followBlockHeight(ctx)
|
||||
end, err := s.followedBlockHeight(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -739,12 +733,12 @@ func (s *Service) cacheBlockHeaders(start, end uint64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// determines the earliest voting block from which to start caching all our previous headers from.
|
||||
// Determines the earliest voting block from which to start caching all our previous headers from.
|
||||
func (s *Service) determineEarliestVotingBlock(ctx context.Context, followBlock uint64) (uint64, error) {
|
||||
genesisTime := s.chainStartData.GenesisTime
|
||||
currSlot := slots.CurrentSlot(genesisTime)
|
||||
|
||||
// In the event genesis has not occurred yet, we just request go back follow_distance blocks.
|
||||
// In the event genesis has not occurred yet, we just request to go back follow_distance blocks.
|
||||
if genesisTime == 0 || currSlot == 0 {
|
||||
earliestBlk := uint64(0)
|
||||
if followBlock > params.BeaconConfig().Eth1FollowDistance {
|
||||
@ -791,7 +785,7 @@ func (s *Service) initializeEth1Data(ctx context.Context, eth1DataInDB *ethpb.ET
|
||||
return nil
|
||||
}
|
||||
|
||||
// validates that all deposit containers are valid and have their relevant indices
|
||||
// Validates that all deposit containers are valid and have their relevant indices
|
||||
// in order.
|
||||
func validateDepositContainers(ctrs []*ethpb.DepositContainer) bool {
|
||||
ctrLen := len(ctrs)
|
||||
@ -814,7 +808,7 @@ func validateDepositContainers(ctrs []*ethpb.DepositContainer) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// validates the current powchain data saved and makes sure that any
|
||||
// Validates the current powchain data is saved and makes sure that any
|
||||
// embedded genesis state is correctly accounted for.
|
||||
func (s *Service) ensureValidPowchainData(ctx context.Context) error {
|
||||
genState, err := s.cfg.beaconDB.GenesisState(ctx)
|
||||
|
@ -267,7 +267,7 @@ func TestFollowBlock_OK(t *testing.T) {
|
||||
web3Service.latestEth1Data.BlockHeight = testAcc.Backend.Blockchain().CurrentBlock().NumberU64()
|
||||
web3Service.latestEth1Data.BlockTime = testAcc.Backend.Blockchain().CurrentBlock().Time()
|
||||
|
||||
h, err := web3Service.followBlockHeight(context.Background())
|
||||
h, err := web3Service.followedBlockHeight(context.Background())
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, baseHeight, h, "Unexpected block height")
|
||||
numToForward := uint64(2)
|
||||
@ -280,7 +280,7 @@ func TestFollowBlock_OK(t *testing.T) {
|
||||
web3Service.latestEth1Data.BlockHeight = testAcc.Backend.Blockchain().CurrentBlock().NumberU64()
|
||||
web3Service.latestEth1Data.BlockTime = testAcc.Backend.Blockchain().CurrentBlock().Time()
|
||||
|
||||
h, err = web3Service.followBlockHeight(context.Background())
|
||||
h, err = web3Service.followedBlockHeight(context.Background())
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, expectedHeight, h, "Unexpected block height")
|
||||
}
|
||||
|
@ -71,8 +71,3 @@ func (_ *FaultyMockPOWChain) ClearPreGenesisData() {
|
||||
func (_ *FaultyMockPOWChain) IsConnectedToETH1() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// BlockExistsWithCache --
|
||||
func (f *FaultyMockPOWChain) BlockExistsWithCache(ctx context.Context, hash common.Hash) (bool, *big.Int, error) {
|
||||
return f.BlockExists(ctx, hash)
|
||||
}
|
||||
|
@ -179,11 +179,6 @@ func (m *POWChain) InsertBlock(height int, time uint64, hash []byte) *POWChain {
|
||||
return m
|
||||
}
|
||||
|
||||
// BlockExistsWithCache --
|
||||
func (m *POWChain) BlockExistsWithCache(ctx context.Context, hash common.Hash) (bool, *big.Int, error) {
|
||||
return m.BlockExists(ctx, hash)
|
||||
}
|
||||
|
||||
func SetupRPCServer() (*rpc.Server, string, error) {
|
||||
srv := rpc.NewServer()
|
||||
if err := srv.RegisterName("eth", &testETHRPC{}); err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user