Add Ethereum 1 block->timestamp cache (#4924)

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
This commit is contained in:
Jim McDonald 2020-02-24 16:53:35 +00:00 committed by GitHub
parent 0470d37072
commit a951c4f6ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 88 additions and 34 deletions

View File

@ -44,6 +44,8 @@ go_library(
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_patrickmn_go_cache//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",

View File

@ -10,6 +10,8 @@ import (
"time"
cache "github.com/patrickmn/go-cache"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
@ -29,20 +31,22 @@ import (
// infostream is a struct for each instance of the infostream created by a client connection.
type infostream struct {
ctx context.Context
headFetcher blockchain.HeadFetcher
depositFetcher depositcache.DepositFetcher
blockFetcher powchain.POWBlockFetcher
beaconDB db.ReadOnlyDatabase
pubKeys [][]byte
pubKeysMutex *sync.RWMutex
stateChannel chan *feed.Event
stateSub event.Subscription
eth1Deposits *cache.Cache
eth1DepositsMutex *sync.RWMutex
currentEpoch uint64
stream ethpb.BeaconChain_StreamValidatorsInfoServer
genesisTime uint64
ctx context.Context
headFetcher blockchain.HeadFetcher
depositFetcher depositcache.DepositFetcher
blockFetcher powchain.POWBlockFetcher
beaconDB db.ReadOnlyDatabase
pubKeys [][]byte
pubKeysMutex *sync.RWMutex
stateChannel chan *feed.Event
stateSub event.Subscription
eth1Deposits *cache.Cache
eth1DepositsMutex *sync.RWMutex
eth1Blocktimes *cache.Cache
eth1BlocktimesMutex *sync.RWMutex
currentEpoch uint64
stream ethpb.BeaconChain_StreamValidatorsInfoServer
genesisTime uint64
}
// eth1Deposit contains information about a deposit made on the Ethereum 1 chain.
@ -51,6 +55,33 @@ type eth1Deposit struct {
data *ethpb.Deposit_Data
}
var (
eth1DepositCacheHits = promauto.NewCounter(
prometheus.CounterOpts{
Name: "infostream_eth1_deposit_cache_hits",
Help: "The number of times the infostream Ethereum 1 deposit cache is hit.",
},
)
eth1DepositCacheMisses = promauto.NewCounter(
prometheus.CounterOpts{
Name: "infostream_eth1_deposit_cache_misses",
Help: "The number of times the infostream Ethereum 1 deposit cache is missed.",
},
)
eth1BlocktimeCacheHits = promauto.NewCounter(
prometheus.CounterOpts{
Name: "infostream_eth1_blocktime_cache_hits",
Help: "The number of times the infostream Ethereum 1 block time cache is hit.",
},
)
eth1BlocktimeCacheMisses = promauto.NewCounter(
prometheus.CounterOpts{
Name: "infostream_eth1_blocktime_cache_misses",
Help: "The number of times the infostream Ethereum 1 block time cache is missed.",
},
)
)
// StreamValidatorsInfo returns a stream of information for given validators.
// Validators are supplied dynamically by the client, and can be added, removed and reset at any time.
// Information about the current set of validators is supplied as soon as the end-of-epoch accounting has been processed,
@ -59,7 +90,7 @@ type eth1Deposit struct {
// over time. If this is not required then the client can either wait until the beacon node is synced, or filter results
// based on the epoch value in the returned validator info.
func (bs *Server) StreamValidatorsInfo(stream ethpb.BeaconChain_StreamValidatorsInfoServer) error {
stateChannel := make(chan *feed.Event, 1)
stateChannel := make(chan *feed.Event, params.BeaconConfig().SlotsPerEpoch)
epochDuration := time.Duration(params.BeaconConfig().SecondsPerSlot*params.BeaconConfig().SlotsPerEpoch) * time.Second
// Fetch our current epoch.
@ -73,20 +104,22 @@ func (bs *Server) StreamValidatorsInfo(stream ethpb.BeaconChain_StreamValidators
// Create an infostream struct. This will track relevant state for the stream.
infostream := &infostream{
ctx: bs.Ctx,
headFetcher: bs.HeadFetcher,
depositFetcher: bs.DepositFetcher,
blockFetcher: bs.BlockFetcher,
beaconDB: bs.BeaconDB,
pubKeys: make([][]byte, 0),
pubKeysMutex: &sync.RWMutex{},
stateChannel: stateChannel,
stateSub: bs.StateNotifier.StateFeed().Subscribe(stateChannel),
eth1Deposits: cache.New(epochDuration, epochDuration*2),
eth1DepositsMutex: &sync.RWMutex{},
currentEpoch: headState.Slot() / params.BeaconConfig().SlotsPerEpoch,
stream: stream,
genesisTime: headState.GenesisTime(),
ctx: bs.Ctx,
headFetcher: bs.HeadFetcher,
depositFetcher: bs.DepositFetcher,
blockFetcher: bs.BlockFetcher,
beaconDB: bs.BeaconDB,
pubKeys: make([][]byte, 0),
pubKeysMutex: &sync.RWMutex{},
stateChannel: stateChannel,
stateSub: bs.StateNotifier.StateFeed().Subscribe(stateChannel),
eth1Deposits: cache.New(epochDuration, epochDuration*2),
eth1DepositsMutex: &sync.RWMutex{},
eth1Blocktimes: cache.New(epochDuration*12, epochDuration*24),
eth1BlocktimesMutex: &sync.RWMutex{},
currentEpoch: headState.Slot() / params.BeaconConfig().SlotsPerEpoch,
stream: stream,
genesisTime: headState.GenesisTime(),
}
defer infostream.stateSub.Unsubscribe()
@ -124,7 +157,7 @@ func (is *infostream) handleConnection() error {
select {
case event := <-is.stateChannel:
if event.Type == statefeed.BlockProcessed {
go is.handleBlockProcessed()
is.handleBlockProcessed()
}
case <-is.stateSub.Err():
return status.Error(codes.Aborted, "Subscriber closed")
@ -274,6 +307,9 @@ func (is *infostream) generateValidatorInfo(pubKey []byte, validators []*state.R
// We don't know of this validator; it's either a pending deposit or totally unknown.
return is.generatePendingValidatorInfo(info)
}
if info.Index >= uint64(len(validators)) {
return nil, status.Error(codes.Internal, "Unknown validator index")
}
validator := validators[info.Index]
// Status and progression timestamp
@ -298,8 +334,10 @@ func (is *infostream) generatePendingValidatorInfo(info *ethpb.ValidatorInfo) (*
var deposit *eth1Deposit
is.eth1DepositsMutex.Lock()
if fetchedDeposit, exists := is.eth1Deposits.Get(key); exists {
eth1DepositCacheHits.Inc()
deposit = fetchedDeposit.(*eth1Deposit)
} else {
eth1DepositCacheMisses.Inc()
fetchedDeposit, eth1BlockNumber := is.depositFetcher.DepositByPubkey(is.ctx, info.PublicKey)
if fetchedDeposit == nil {
deposit = &eth1Deposit{}
@ -452,12 +490,26 @@ func (is *infostream) epochToTimestamp(epoch uint64) uint64 {
// depositQueueTimestamp calculates the timestamp for exit of the validator from the deposit queue.
func (is *infostream) depositQueueTimestamp(eth1BlockNumber *big.Int) (uint64, error) {
blockTimeStamp, err := is.blockFetcher.BlockTimeByHeight(is.ctx, eth1BlockNumber)
if err != nil {
return 0, err
var blockTimestamp uint64
key := fmt.Sprintf("%v", eth1BlockNumber)
is.eth1BlocktimesMutex.Lock()
if cachedTimestamp, exists := is.eth1Blocktimes.Get(key); exists {
eth1BlocktimeCacheHits.Inc()
blockTimestamp = cachedTimestamp.(uint64)
} else {
eth1BlocktimeCacheMisses.Inc()
var err error
blockTimestamp, err = is.blockFetcher.BlockTimeByHeight(is.ctx, eth1BlockNumber)
if err != nil {
is.eth1BlocktimesMutex.Unlock()
return 0, err
}
is.eth1Blocktimes.Set(key, blockTimestamp, cache.DefaultExpiration)
}
is.eth1BlocktimesMutex.Unlock()
followTime := time.Duration(params.BeaconConfig().Eth1FollowDistance*params.BeaconConfig().GoerliBlockTime) * time.Second
eth1UnixTime := time.Unix(int64(blockTimeStamp), 0).Add(followTime)
eth1UnixTime := time.Unix(int64(blockTimestamp), 0).Add(followTime)
votingPeriod := time.Duration(params.BeaconConfig().SlotsPerEth1VotingPeriod*params.BeaconConfig().SecondsPerSlot) * time.Second
activationTime := eth1UnixTime.Add(votingPeriod)