Code health: review map usage (#7635)

* remove unused cache states map

* correct typo

* Remove unused array

* Add lock around deposits cache chainstart pubkeys

* Copy attestation before grabbing lock. This may reduce lock contention time as other callers wanting the lock do not need to wait as long for the lock to become available.

* Copy attestation before grabbing lock. This may reduce lock contention time as other callers wanting the lock do not need to wait as long for the lock to become available.

* Set capacity to 1 since it is known that the slice will be 1 after insertion

* require validatorSlashingPreconditionCheck caller to hold lock

* Add lock for voluntary exits pool HasBeenIncluded

* Require rate limiter retrieveCollector to hold lock

* Add lock requirement assertions in sync

* Remove unused struct

* remove ClearCachedStates API

* field initSyncState is unused

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
This commit is contained in:
Preston Van Loon 2020-10-26 16:17:07 -05:00 committed by GitHub
parent f1bce1001d
commit fcbb168c76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 76 additions and 39 deletions

View File

@ -5,6 +5,9 @@ test --test_verbose_timeout_warnings
test --build_tests_only
test --test_output=errors
# E2E run with debug gotag
test:e2e --define gotags=debug
# Clearly indicate that coverage is enabled to disable certain nogo checks.
coverage --define=coverage_enabled=1

View File

@ -319,7 +319,6 @@ func TestUpdateJustified_CouldUpdateBest(t *testing.T) {
service.justifiedCheckpt = &ethpb.Checkpoint{Root: []byte{'A'}}
service.bestJustifiedCheckpt = &ethpb.Checkpoint{Root: []byte{'A'}}
st := testutil.NewBeaconState()
service.initSyncState[r] = st.Copy()
require.NoError(t, db.SaveState(ctx, st.Copy(), r))
// Could update

View File

@ -62,7 +62,6 @@ type Service struct {
finalizedCheckpt *ethpb.Checkpoint
prevFinalizedCheckpt *ethpb.Checkpoint
nextEpochBoundarySlot uint64
initSyncState map[[32]byte]*stateTrie.BeaconState
boundaryRoots [][32]byte
checkpointState *cache.CheckpointStateCache
checkpointStateLock sync.Mutex
@ -113,7 +112,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
maxRoutines: cfg.MaxRoutines,
stateNotifier: cfg.StateNotifier,
forkChoiceStore: cfg.ForkChoiceStore,
initSyncState: make(map[[32]byte]*stateTrie.BeaconState),
boundaryRoots: [][32]byte{},
checkpointState: cache.NewCheckpointStateCache(),
opsService: cfg.OpsService,
@ -337,12 +335,6 @@ func (s *Service) Status() error {
return nil
}
// ClearCachedStates removes all stored caches states. This is done after the node
// is synced.
func (s *Service) ClearCachedStates() {
s.initSyncState = map[[32]byte]*stateTrie.BeaconState{}
}
// This gets called when beacon chain is first initialized to save genesis data (state, block, and more) in db.
func (s *Service) saveGenesisData(ctx context.Context, genesisState *stateTrie.BeaconState) error {
stateRoot, err := genesisState.HashTreeRoot(ctx)

View File

@ -347,9 +347,6 @@ func (ms *ChainService) IsCanonical(_ context.Context, r [32]byte) (bool, error)
return true, nil
}
// ClearCachedStates does nothing.
func (ms *ChainService) ClearCachedStates() {}
// HasInitSyncBlock mocks the same method in the chain service.
func (ms *ChainService) HasInitSyncBlock(_ [32]byte) bool {
return false

View File

@ -54,8 +54,8 @@ type DepositCache struct {
deposits []*dbpb.DepositContainer
finalizedDeposits *FinalizedDeposits
depositsLock sync.RWMutex
chainStartDeposits []*ethpb.Deposit
chainStartPubkeys map[string]bool
chainStartPubkeysLock sync.RWMutex
}
// New instantiates a new deposit cache
@ -72,7 +72,6 @@ func New() (*DepositCache, error) {
deposits: []*dbpb.DepositContainer{},
finalizedDeposits: &FinalizedDeposits{Deposits: finalizedDepositsTrie, MerkleTrieIndex: -1},
chainStartPubkeys: make(map[string]bool),
chainStartDeposits: make([]*ethpb.Deposit, 0),
}, nil
}
@ -156,6 +155,8 @@ func (dc *DepositCache) AllDepositContainers(ctx context.Context) []*dbpb.Deposi
func (dc *DepositCache) MarkPubkeyForChainstart(ctx context.Context, pubkey string) {
ctx, span := trace.StartSpan(ctx, "DepositsCache.MarkPubkeyForChainstart")
defer span.End()
dc.chainStartPubkeysLock.Lock()
defer dc.chainStartPubkeysLock.Unlock()
dc.chainStartPubkeys[pubkey] = true
}
@ -163,6 +164,8 @@ func (dc *DepositCache) MarkPubkeyForChainstart(ctx context.Context, pubkey stri
func (dc *DepositCache) PubkeyInChainstart(ctx context.Context, pubkey string) bool {
ctx, span := trace.StartSpan(ctx, "DepositsCache.PubkeyInChainstart")
defer span.End()
dc.chainStartPubkeysLock.RLock()
defer dc.chainStartPubkeysLock.RUnlock()
if dc.chainStartPubkeys != nil {
return dc.chainStartPubkeys[pubkey]
}

View File

@ -50,11 +50,11 @@ func (s *StateSummaryCache) GetAll() []*pb.StateSummary {
s.initSyncStateSummariesLock.RLock()
defer s.initSyncStateSummariesLock.RUnlock()
blks := make([]*pb.StateSummary, 0, len(s.initSyncStateSummaries))
summaries := make([]*pb.StateSummary, 0, len(s.initSyncStateSummaries))
for _, b := range s.initSyncStateSummaries {
blks = append(blks, b)
summaries = append(summaries, b)
}
return blks
return summaries
}
// Clear clears out the initial sync state summaries cache.

View File

@ -20,7 +20,7 @@ func (p *AttCaches) SaveBlockAttestation(att *ethpb.Attestation) error {
defer p.blockAttLock.Unlock()
atts, ok := p.blockAtt[r]
if !ok {
atts = make([]*ethpb.Attestation, 0)
atts = make([]*ethpb.Attestation, 0, 1)
}
// Ensure that this attestation is not already fully contained in an existing attestation.

View File

@ -16,9 +16,10 @@ func (p *AttCaches) SaveForkchoiceAttestation(att *ethpb.Attestation) error {
return errors.Wrap(err, "could not tree hash attestation")
}
att = stateTrie.CopyAttestation(att)
p.forkchoiceAttLock.Lock()
defer p.forkchoiceAttLock.Unlock()
p.forkchoiceAtt[r] = stateTrie.CopyAttestation(att) // Copied.
p.forkchoiceAtt[r] = att
return nil
}

View File

@ -28,9 +28,10 @@ func (p *AttCaches) SaveUnaggregatedAttestation(att *ethpb.Attestation) error {
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
}
att = stateTrie.CopyAttestation(att) // Copied.
p.unAggregateAttLock.Lock()
defer p.unAggregateAttLock.Unlock()
p.unAggregatedAtt[r] = stateTrie.CopyAttestation(att) // Copied.
p.unAggregatedAtt[r] = att
return nil
}

View File

@ -26,6 +26,7 @@ go_library(
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_trailofbits_go_mutexasserts//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)
@ -36,6 +37,7 @@ go_test(
srcs = [
"service_attester_test.go",
"service_proposer_test.go",
"service_test.go",
],
embed = [":go_default_library"],
deps = [

View File

@ -12,6 +12,7 @@ import (
beaconstate "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
"github.com/trailofbits/go-mutexasserts"
"go.opencensus.io/trace"
)
@ -259,10 +260,15 @@ func (p *Pool) MarkIncludedProposerSlashing(ps *ethpb.ProposerSlashing) {
// this function checks a few items about a validator before proceeding with inserting
// a proposer/attester slashing into the pool. First, it checks if the validator
// has been recently included in the pool, then it checks if the validator is slashable.
// Note: this method requires caller to hold the lock.
func (p *Pool) validatorSlashingPreconditionCheck(
state *beaconstate.BeaconState,
valIdx uint64,
) (bool, error) {
if !mutexasserts.RWMutexLocked(&p.lock) && !mutexasserts.RWMutexRLocked(&p.lock) {
return false, errors.New("pool.validatorSlashingPreconditionCheck: caller must hold read/write lock")
}
// Check if the validator index has been included recently.
if p.included[valIdx] {
return false, nil

View File

@ -0,0 +1,13 @@
package slashings
import (
"testing"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestPool_validatorSlashingPreconditionCheck_requiresLock(t *testing.T) {
p := &Pool{}
_, err := p.validatorSlashingPreconditionCheck(nil, 0)
require.ErrorContains(t, "caller must hold read/write lock", err)
}

View File

@ -110,5 +110,7 @@ func (p *Pool) MarkIncluded(exit *ethpb.SignedVoluntaryExit) {
// HasBeenIncluded returns true if the pool has recorded that a validator index has been recorded.
func (p *Pool) HasBeenIncluded(bIdx uint64) bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.included[bIdx]
}

View File

@ -93,6 +93,7 @@ go_library(
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_trailofbits_go_mutexasserts//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)

View File

@ -40,7 +40,6 @@ type batchBlockReceiverFn func(ctx context.Context, blks []*eth.SignedBeaconBloc
func (s *Service) roundRobinSync(genesis time.Time) error {
ctx, cancel := context.WithCancel(s.ctx)
defer cancel()
defer s.chain.ClearCachedStates()
state.SkipSlotCache.Disable()
defer state.SkipSlotCache.Enable()

View File

@ -29,7 +29,6 @@ var _ shared.Service = (*Service)(nil)
type blockchainService interface {
blockchain.BlockReceiver
blockchain.HeadFetcher
ClearCachedStates()
blockchain.FinalizationFetcher
}

View File

@ -47,8 +47,8 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
// be deleted from the queue if invalid (ie. getting staled from falling too many slots behind).
s.validatePendingAtts(ctx, s.chain.CurrentSlot())
roots := make([][32]byte, 0, len(s.blkRootToPendingAtts))
s.pendingAttsLock.RLock()
roots := make([][32]byte, 0, len(s.blkRootToPendingAtts))
for br := range s.blkRootToPendingAtts {
roots = append(roots, br)
}

View File

@ -18,6 +18,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"
"go.opencensus.io/trace"
)
@ -255,6 +256,8 @@ func (s *Service) clearPendingSlots() {
// Delete block from the list from the pending queue using the slot as key.
// Note: this helper is not thread safe.
func (s *Service) deleteBlockFromPendingQueue(slot uint64, b *ethpb.SignedBeaconBlock, r [32]byte) {
mutexasserts.AssertRWMutexLocked(&s.pendingQueueLock)
blks, ok := s.slotToPendingBlocks[slot]
if !ok {
return
@ -277,6 +280,8 @@ func (s *Service) deleteBlockFromPendingQueue(slot uint64, b *ethpb.SignedBeacon
// Insert block to the list in the pending queue using the slot as key.
// Note: this helper is not thread safe.
func (s *Service) insertBlockToPendingQueue(slot uint64, b *ethpb.SignedBeaconBlock, r [32]byte) {
mutexasserts.AssertRWMutexLocked(&s.pendingQueueLock)
if s.seenPendingBlocks[r] {
return
}

View File

@ -10,6 +10,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/sirupsen/logrus"
"github.com/trailofbits/go-mutexasserts"
)
const defaultBurstLimit = 5
@ -137,6 +138,9 @@ func (l *limiter) free() {
// not to be used outside the rate limiter file as it is unsafe for concurrent usage
// and is protected by a lock on all of its usages here.
func (l *limiter) retrieveCollector(topic string) (*leakybucket.Collector, error) {
if !mutexasserts.RWMutexLocked(&l.RWMutex) && !mutexasserts.RWMutexRLocked(&l.RWMutex) {
return nil, errors.New("limiter.retrieveCollector: caller must hold read/write lock")
}
collector, ok := l.limiterMap[topic]
if !ok {
return nil, errors.Errorf("collector does not exist for topic %s", topic)

View File

@ -62,3 +62,9 @@ func TestRateLimiter_ExceedCapacity(t *testing.T) {
t.Fatal("Did not receive stream within 1 sec")
}
}
func Test_limiter_retrieveCollector_requiresLock(t *testing.T) {
l := limiter{}
_, err := l.retrieveCollector("")
require.ErrorContains(t, "caller must hold read/write lock", err)
}

View File

@ -168,6 +168,8 @@ func TestRecentBeaconBlocksRPCHandler_HandleZeroBlocks(t *testing.T) {
t.Fatal("Did not receive stream within 1 sec")
}
r.rateLimiter.RLock() // retrieveCollector requires a lock to be held.
defer r.rateLimiter.RUnlock()
lter, err := r.rateLimiter.retrieveCollector(topic)
require.NoError(t, err)
assert.Equal(t, 1, int(lter.Count(stream1.Conn().RemotePeer().String())))

View File

@ -3654,3 +3654,9 @@ def prysm_deps():
sum = "h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=",
version = "v1.5.1",
)
go_repository(
name = "com_github_trailofbits_go_mutexasserts",
importpath = "github.com/trailofbits/go-mutexasserts",
sum = "h1:8LRP+2JK8piIUU16ZDgWDXwjJcuJNTtCzadjTZj8Jf0=",
version = "v0.0.0-20200708152505-19999e7d3cef",
)

1
go.mod
View File

@ -98,6 +98,7 @@ require (
github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969 // indirect
github.com/stretchr/testify v1.6.1
github.com/supranational/blst v0.1.2-alpha.1.0.20200917144033-cd0847a7580b
github.com/trailofbits/go-mutexasserts v0.0.0-20200708152505-19999e7d3cef
github.com/tyler-smith/go-bip39 v1.0.2
github.com/urfave/cli/v2 v2.2.0
github.com/wealdtech/go-bytesutil v1.1.1

2
go.sum
View File

@ -1008,6 +1008,8 @@ github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161/go.mod h1:wM7WEv
github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b/go.mod h1:5XA7W9S6mni3h5uvOC75dA3m9CCCaS83lltmc0ukdi4=
github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tjfoc/gmsm v1.3.0/go.mod h1:HaUcFuY0auTiaHB9MHFGCPx5IaLhTUd2atbCFBQXn9w=
github.com/trailofbits/go-mutexasserts v0.0.0-20200708152505-19999e7d3cef h1:8LRP+2JK8piIUU16ZDgWDXwjJcuJNTtCzadjTZj8Jf0=
github.com/trailofbits/go-mutexasserts v0.0.0-20200708152505-19999e7d3cef/go.mod h1:+SV/613m53DNAmlXPTWGZhIyt4E/qDvn9g/lOPRiy0A=
github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=
github.com/tyler-smith/go-bip39 v1.0.2 h1:+t3w+KwLXO6154GNJY+qUtIxLTmFjfUmpguQT1OlOT8=
github.com/tyler-smith/go-bip39 v1.0.2/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=

View File

@ -22,13 +22,6 @@ const (
minimalSize = latestEpochWrittenSize
)
// AttestationHistoryNew stores the historical attestation data needed
// for protection of validators.
type AttestationHistoryNew struct {
TargetToSource map[uint64]*HistoryData
LatestEpochWritten uint64
}
// HistoryData stores the needed data to confirm if an attestation is slashable
// or repeated.
type HistoryData struct {