diff --git a/.bazelrc b/.bazelrc index 81b5f178c..7eee06e4e 100644 --- a/.bazelrc +++ b/.bazelrc @@ -5,6 +5,9 @@ test --test_verbose_timeout_warnings test --build_tests_only test --test_output=errors +# E2E run with debug gotag +test:e2e --define gotags=debug + # Clearly indicate that coverage is enabled to disable certain nogo checks. coverage --define=coverage_enabled=1 diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go index 9738e2d3f..69e81b254 100644 --- a/beacon-chain/blockchain/process_block_test.go +++ b/beacon-chain/blockchain/process_block_test.go @@ -319,7 +319,6 @@ func TestUpdateJustified_CouldUpdateBest(t *testing.T) { service.justifiedCheckpt = ðpb.Checkpoint{Root: []byte{'A'}} service.bestJustifiedCheckpt = ðpb.Checkpoint{Root: []byte{'A'}} st := testutil.NewBeaconState() - service.initSyncState[r] = st.Copy() require.NoError(t, db.SaveState(ctx, st.Copy(), r)) // Could update diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index ee1217bec..aa971b1c0 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -62,7 +62,6 @@ type Service struct { finalizedCheckpt *ethpb.Checkpoint prevFinalizedCheckpt *ethpb.Checkpoint nextEpochBoundarySlot uint64 - initSyncState map[[32]byte]*stateTrie.BeaconState boundaryRoots [][32]byte checkpointState *cache.CheckpointStateCache checkpointStateLock sync.Mutex @@ -113,7 +112,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { maxRoutines: cfg.MaxRoutines, stateNotifier: cfg.StateNotifier, forkChoiceStore: cfg.ForkChoiceStore, - initSyncState: make(map[[32]byte]*stateTrie.BeaconState), boundaryRoots: [][32]byte{}, checkpointState: cache.NewCheckpointStateCache(), opsService: cfg.OpsService, @@ -337,12 +335,6 @@ func (s *Service) Status() error { return nil } -// ClearCachedStates removes all stored caches states. This is done after the node -// is synced. -func (s *Service) ClearCachedStates() { - s.initSyncState = map[[32]byte]*stateTrie.BeaconState{} -} - // This gets called when beacon chain is first initialized to save genesis data (state, block, and more) in db. func (s *Service) saveGenesisData(ctx context.Context, genesisState *stateTrie.BeaconState) error { stateRoot, err := genesisState.HashTreeRoot(ctx) diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go index b28b33b2e..6a380a070 100644 --- a/beacon-chain/blockchain/testing/mock.go +++ b/beacon-chain/blockchain/testing/mock.go @@ -347,9 +347,6 @@ func (ms *ChainService) IsCanonical(_ context.Context, r [32]byte) (bool, error) return true, nil } -// ClearCachedStates does nothing. -func (ms *ChainService) ClearCachedStates() {} - // HasInitSyncBlock mocks the same method in the chain service. func (ms *ChainService) HasInitSyncBlock(_ [32]byte) bool { return false diff --git a/beacon-chain/cache/depositcache/deposits_cache.go b/beacon-chain/cache/depositcache/deposits_cache.go index 590f41855..39cb4a83d 100644 --- a/beacon-chain/cache/depositcache/deposits_cache.go +++ b/beacon-chain/cache/depositcache/deposits_cache.go @@ -50,12 +50,12 @@ type FinalizedDeposits struct { // stores all the deposit related data that is required by the beacon-node. type DepositCache struct { // Beacon chain deposits in memory. - pendingDeposits []*dbpb.DepositContainer - deposits []*dbpb.DepositContainer - finalizedDeposits *FinalizedDeposits - depositsLock sync.RWMutex - chainStartDeposits []*ethpb.Deposit - chainStartPubkeys map[string]bool + pendingDeposits []*dbpb.DepositContainer + deposits []*dbpb.DepositContainer + finalizedDeposits *FinalizedDeposits + depositsLock sync.RWMutex + chainStartPubkeys map[string]bool + chainStartPubkeysLock sync.RWMutex } // New instantiates a new deposit cache @@ -68,11 +68,10 @@ func New() (*DepositCache, error) { // finalizedDeposits.MerkleTrieIndex is initialized to -1 because it represents the index of the last trie item. // Inserting the first item into the trie will set the value of the index to 0. return &DepositCache{ - pendingDeposits: []*dbpb.DepositContainer{}, - deposits: []*dbpb.DepositContainer{}, - finalizedDeposits: &FinalizedDeposits{Deposits: finalizedDepositsTrie, MerkleTrieIndex: -1}, - chainStartPubkeys: make(map[string]bool), - chainStartDeposits: make([]*ethpb.Deposit, 0), + pendingDeposits: []*dbpb.DepositContainer{}, + deposits: []*dbpb.DepositContainer{}, + finalizedDeposits: &FinalizedDeposits{Deposits: finalizedDepositsTrie, MerkleTrieIndex: -1}, + chainStartPubkeys: make(map[string]bool), }, nil } @@ -156,6 +155,8 @@ func (dc *DepositCache) AllDepositContainers(ctx context.Context) []*dbpb.Deposi func (dc *DepositCache) MarkPubkeyForChainstart(ctx context.Context, pubkey string) { ctx, span := trace.StartSpan(ctx, "DepositsCache.MarkPubkeyForChainstart") defer span.End() + dc.chainStartPubkeysLock.Lock() + defer dc.chainStartPubkeysLock.Unlock() dc.chainStartPubkeys[pubkey] = true } @@ -163,6 +164,8 @@ func (dc *DepositCache) MarkPubkeyForChainstart(ctx context.Context, pubkey stri func (dc *DepositCache) PubkeyInChainstart(ctx context.Context, pubkey string) bool { ctx, span := trace.StartSpan(ctx, "DepositsCache.PubkeyInChainstart") defer span.End() + dc.chainStartPubkeysLock.RLock() + defer dc.chainStartPubkeysLock.RUnlock() if dc.chainStartPubkeys != nil { return dc.chainStartPubkeys[pubkey] } diff --git a/beacon-chain/cache/state_summary.go b/beacon-chain/cache/state_summary.go index a7cda4f07..580cb73ee 100644 --- a/beacon-chain/cache/state_summary.go +++ b/beacon-chain/cache/state_summary.go @@ -50,11 +50,11 @@ func (s *StateSummaryCache) GetAll() []*pb.StateSummary { s.initSyncStateSummariesLock.RLock() defer s.initSyncStateSummariesLock.RUnlock() - blks := make([]*pb.StateSummary, 0, len(s.initSyncStateSummaries)) + summaries := make([]*pb.StateSummary, 0, len(s.initSyncStateSummaries)) for _, b := range s.initSyncStateSummaries { - blks = append(blks, b) + summaries = append(summaries, b) } - return blks + return summaries } // Clear clears out the initial sync state summaries cache. diff --git a/beacon-chain/operations/attestations/kv/block.go b/beacon-chain/operations/attestations/kv/block.go index 9a4038c4f..7313ee386 100644 --- a/beacon-chain/operations/attestations/kv/block.go +++ b/beacon-chain/operations/attestations/kv/block.go @@ -20,7 +20,7 @@ func (p *AttCaches) SaveBlockAttestation(att *ethpb.Attestation) error { defer p.blockAttLock.Unlock() atts, ok := p.blockAtt[r] if !ok { - atts = make([]*ethpb.Attestation, 0) + atts = make([]*ethpb.Attestation, 0, 1) } // Ensure that this attestation is not already fully contained in an existing attestation. diff --git a/beacon-chain/operations/attestations/kv/forkchoice.go b/beacon-chain/operations/attestations/kv/forkchoice.go index 0928a708e..2c7e754be 100644 --- a/beacon-chain/operations/attestations/kv/forkchoice.go +++ b/beacon-chain/operations/attestations/kv/forkchoice.go @@ -16,9 +16,10 @@ func (p *AttCaches) SaveForkchoiceAttestation(att *ethpb.Attestation) error { return errors.Wrap(err, "could not tree hash attestation") } + att = stateTrie.CopyAttestation(att) p.forkchoiceAttLock.Lock() defer p.forkchoiceAttLock.Unlock() - p.forkchoiceAtt[r] = stateTrie.CopyAttestation(att) // Copied. + p.forkchoiceAtt[r] = att return nil } diff --git a/beacon-chain/operations/attestations/kv/unaggregated.go b/beacon-chain/operations/attestations/kv/unaggregated.go index 3b8d8807a..bac3ee708 100644 --- a/beacon-chain/operations/attestations/kv/unaggregated.go +++ b/beacon-chain/operations/attestations/kv/unaggregated.go @@ -28,9 +28,10 @@ func (p *AttCaches) SaveUnaggregatedAttestation(att *ethpb.Attestation) error { if err != nil { return errors.Wrap(err, "could not tree hash attestation") } + att = stateTrie.CopyAttestation(att) // Copied. p.unAggregateAttLock.Lock() defer p.unAggregateAttLock.Unlock() - p.unAggregatedAtt[r] = stateTrie.CopyAttestation(att) // Copied. + p.unAggregatedAtt[r] = att return nil } diff --git a/beacon-chain/operations/slashings/BUILD.bazel b/beacon-chain/operations/slashings/BUILD.bazel index ba1f84090..295b1799f 100644 --- a/beacon-chain/operations/slashings/BUILD.bazel +++ b/beacon-chain/operations/slashings/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", + "@com_github_trailofbits_go_mutexasserts//:go_default_library", "@io_opencensus_go//trace:go_default_library", ], ) @@ -36,6 +37,7 @@ go_test( srcs = [ "service_attester_test.go", "service_proposer_test.go", + "service_test.go", ], embed = [":go_default_library"], deps = [ diff --git a/beacon-chain/operations/slashings/service.go b/beacon-chain/operations/slashings/service.go index 79901827d..4b7041ce4 100644 --- a/beacon-chain/operations/slashings/service.go +++ b/beacon-chain/operations/slashings/service.go @@ -12,6 +12,7 @@ import ( beaconstate "github.com/prysmaticlabs/prysm/beacon-chain/state" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/sliceutil" + "github.com/trailofbits/go-mutexasserts" "go.opencensus.io/trace" ) @@ -259,10 +260,15 @@ func (p *Pool) MarkIncludedProposerSlashing(ps *ethpb.ProposerSlashing) { // this function checks a few items about a validator before proceeding with inserting // a proposer/attester slashing into the pool. First, it checks if the validator // has been recently included in the pool, then it checks if the validator is slashable. +// Note: this method requires caller to hold the lock. func (p *Pool) validatorSlashingPreconditionCheck( state *beaconstate.BeaconState, valIdx uint64, ) (bool, error) { + if !mutexasserts.RWMutexLocked(&p.lock) && !mutexasserts.RWMutexRLocked(&p.lock) { + return false, errors.New("pool.validatorSlashingPreconditionCheck: caller must hold read/write lock") + } + // Check if the validator index has been included recently. if p.included[valIdx] { return false, nil diff --git a/beacon-chain/operations/slashings/service_test.go b/beacon-chain/operations/slashings/service_test.go new file mode 100644 index 000000000..2dff1ed13 --- /dev/null +++ b/beacon-chain/operations/slashings/service_test.go @@ -0,0 +1,13 @@ +package slashings + +import ( + "testing" + + "github.com/prysmaticlabs/prysm/shared/testutil/require" +) + +func TestPool_validatorSlashingPreconditionCheck_requiresLock(t *testing.T) { + p := &Pool{} + _, err := p.validatorSlashingPreconditionCheck(nil, 0) + require.ErrorContains(t, "caller must hold read/write lock", err) +} diff --git a/beacon-chain/operations/voluntaryexits/service.go b/beacon-chain/operations/voluntaryexits/service.go index 25e167dc6..b4d53829b 100644 --- a/beacon-chain/operations/voluntaryexits/service.go +++ b/beacon-chain/operations/voluntaryexits/service.go @@ -110,5 +110,7 @@ func (p *Pool) MarkIncluded(exit *ethpb.SignedVoluntaryExit) { // HasBeenIncluded returns true if the pool has recorded that a validator index has been recorded. func (p *Pool) HasBeenIncluded(bIdx uint64) bool { + p.lock.RLock() + defer p.lock.RUnlock() return p.included[bIdx] } diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 421345f6a..ee7b48a50 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -93,6 +93,7 @@ go_library( "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", "@com_github_prysmaticlabs_go_ssz//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", + "@com_github_trailofbits_go_mutexasserts//:go_default_library", "@io_opencensus_go//trace:go_default_library", ], ) diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 7087393cd..c5d0ed4be 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -40,7 +40,6 @@ type batchBlockReceiverFn func(ctx context.Context, blks []*eth.SignedBeaconBloc func (s *Service) roundRobinSync(genesis time.Time) error { ctx, cancel := context.WithCancel(s.ctx) defer cancel() - defer s.chain.ClearCachedStates() state.SkipSlotCache.Disable() defer state.SkipSlotCache.Enable() diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index ce2a78ecf..ed6fb676b 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -29,7 +29,6 @@ var _ shared.Service = (*Service)(nil) type blockchainService interface { blockchain.BlockReceiver blockchain.HeadFetcher - ClearCachedStates() blockchain.FinalizationFetcher } diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index 6fde77991..ecd06b6cc 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -47,8 +47,8 @@ func (s *Service) processPendingAtts(ctx context.Context) error { // be deleted from the queue if invalid (ie. getting staled from falling too many slots behind). s.validatePendingAtts(ctx, s.chain.CurrentSlot()) - roots := make([][32]byte, 0, len(s.blkRootToPendingAtts)) s.pendingAttsLock.RLock() + roots := make([][32]byte, 0, len(s.blkRootToPendingAtts)) for br := range s.blkRootToPendingAtts { roots = append(roots, br) } diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index 1d200add7..7cdecd9ce 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -18,6 +18,7 @@ import ( "github.com/prysmaticlabs/prysm/shared/slotutil" "github.com/prysmaticlabs/prysm/shared/traceutil" "github.com/sirupsen/logrus" + "github.com/trailofbits/go-mutexasserts" "go.opencensus.io/trace" ) @@ -255,6 +256,8 @@ func (s *Service) clearPendingSlots() { // Delete block from the list from the pending queue using the slot as key. // Note: this helper is not thread safe. func (s *Service) deleteBlockFromPendingQueue(slot uint64, b *ethpb.SignedBeaconBlock, r [32]byte) { + mutexasserts.AssertRWMutexLocked(&s.pendingQueueLock) + blks, ok := s.slotToPendingBlocks[slot] if !ok { return @@ -277,6 +280,8 @@ func (s *Service) deleteBlockFromPendingQueue(slot uint64, b *ethpb.SignedBeacon // Insert block to the list in the pending queue using the slot as key. // Note: this helper is not thread safe. func (s *Service) insertBlockToPendingQueue(slot uint64, b *ethpb.SignedBeaconBlock, r [32]byte) { + mutexasserts.AssertRWMutexLocked(&s.pendingQueueLock) + if s.seenPendingBlocks[r] { return } diff --git a/beacon-chain/sync/rate_limiter.go b/beacon-chain/sync/rate_limiter.go index 81db9fc9b..a09440abf 100644 --- a/beacon-chain/sync/rate_limiter.go +++ b/beacon-chain/sync/rate_limiter.go @@ -10,6 +10,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/flags" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/sirupsen/logrus" + "github.com/trailofbits/go-mutexasserts" ) const defaultBurstLimit = 5 @@ -137,6 +138,9 @@ func (l *limiter) free() { // not to be used outside the rate limiter file as it is unsafe for concurrent usage // and is protected by a lock on all of its usages here. func (l *limiter) retrieveCollector(topic string) (*leakybucket.Collector, error) { + if !mutexasserts.RWMutexLocked(&l.RWMutex) && !mutexasserts.RWMutexRLocked(&l.RWMutex) { + return nil, errors.New("limiter.retrieveCollector: caller must hold read/write lock") + } collector, ok := l.limiterMap[topic] if !ok { return nil, errors.Errorf("collector does not exist for topic %s", topic) diff --git a/beacon-chain/sync/rate_limiter_test.go b/beacon-chain/sync/rate_limiter_test.go index 18333ff13..06b31d679 100644 --- a/beacon-chain/sync/rate_limiter_test.go +++ b/beacon-chain/sync/rate_limiter_test.go @@ -62,3 +62,9 @@ func TestRateLimiter_ExceedCapacity(t *testing.T) { t.Fatal("Did not receive stream within 1 sec") } } + +func Test_limiter_retrieveCollector_requiresLock(t *testing.T) { + l := limiter{} + _, err := l.retrieveCollector("") + require.ErrorContains(t, "caller must hold read/write lock", err) +} diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go index 795559eab..d5511cb14 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go @@ -168,6 +168,8 @@ func TestRecentBeaconBlocksRPCHandler_HandleZeroBlocks(t *testing.T) { t.Fatal("Did not receive stream within 1 sec") } + r.rateLimiter.RLock() // retrieveCollector requires a lock to be held. + defer r.rateLimiter.RUnlock() lter, err := r.rateLimiter.retrieveCollector(topic) require.NoError(t, err) assert.Equal(t, 1, int(lter.Count(stream1.Conn().RemotePeer().String()))) diff --git a/deps.bzl b/deps.bzl index d49de0f0b..560e7bc06 100644 --- a/deps.bzl +++ b/deps.bzl @@ -3654,3 +3654,9 @@ def prysm_deps(): sum = "h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=", version = "v1.5.1", ) + go_repository( + name = "com_github_trailofbits_go_mutexasserts", + importpath = "github.com/trailofbits/go-mutexasserts", + sum = "h1:8LRP+2JK8piIUU16ZDgWDXwjJcuJNTtCzadjTZj8Jf0=", + version = "v0.0.0-20200708152505-19999e7d3cef", + ) diff --git a/go.mod b/go.mod index c0da0e3c8..d486297ac 100644 --- a/go.mod +++ b/go.mod @@ -98,6 +98,7 @@ require ( github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969 // indirect github.com/stretchr/testify v1.6.1 github.com/supranational/blst v0.1.2-alpha.1.0.20200917144033-cd0847a7580b + github.com/trailofbits/go-mutexasserts v0.0.0-20200708152505-19999e7d3cef github.com/tyler-smith/go-bip39 v1.0.2 github.com/urfave/cli/v2 v2.2.0 github.com/wealdtech/go-bytesutil v1.1.1 diff --git a/go.sum b/go.sum index f84118608..52b865a19 100644 --- a/go.sum +++ b/go.sum @@ -1008,6 +1008,8 @@ github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161/go.mod h1:wM7WEv github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b/go.mod h1:5XA7W9S6mni3h5uvOC75dA3m9CCCaS83lltmc0ukdi4= github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tjfoc/gmsm v1.3.0/go.mod h1:HaUcFuY0auTiaHB9MHFGCPx5IaLhTUd2atbCFBQXn9w= +github.com/trailofbits/go-mutexasserts v0.0.0-20200708152505-19999e7d3cef h1:8LRP+2JK8piIUU16ZDgWDXwjJcuJNTtCzadjTZj8Jf0= +github.com/trailofbits/go-mutexasserts v0.0.0-20200708152505-19999e7d3cef/go.mod h1:+SV/613m53DNAmlXPTWGZhIyt4E/qDvn9g/lOPRiy0A= github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs= github.com/tyler-smith/go-bip39 v1.0.2 h1:+t3w+KwLXO6154GNJY+qUtIxLTmFjfUmpguQT1OlOT8= github.com/tyler-smith/go-bip39 v1.0.2/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs= diff --git a/validator/db/kv/attestation_history_new.go b/validator/db/kv/attestation_history_new.go index df382d2ed..25e944889 100644 --- a/validator/db/kv/attestation_history_new.go +++ b/validator/db/kv/attestation_history_new.go @@ -22,13 +22,6 @@ const ( minimalSize = latestEpochWrittenSize ) -// AttestationHistoryNew stores the historical attestation data needed -// for protection of validators. -type AttestationHistoryNew struct { - TargetToSource map[uint64]*HistoryData - LatestEpochWritten uint64 -} - // HistoryData stores the needed data to confirm if an attestation is slashable // or repeated. type HistoryData struct {