diff --git a/beacon-chain/blockchain/forkchoice/BUILD.bazel b/beacon-chain/blockchain/forkchoice/BUILD.bazel index be4ce0edd..b0cca6eec 100644 --- a/beacon-chain/blockchain/forkchoice/BUILD.bazel +++ b/beacon-chain/blockchain/forkchoice/BUILD.bazel @@ -32,6 +32,7 @@ go_library( go_test( name = "go_default_test", srcs = [ + "benchmark_test.go", "process_attestation_test.go", "process_block_test.go", "service_test.go", @@ -42,7 +43,6 @@ go_test( "//beacon-chain/core/blocks:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/filters:go_default_library", - "//beacon-chain/db/kv:go_default_library", "//beacon-chain/db/testing:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//proto/eth/v1alpha1:go_default_library", diff --git a/beacon-chain/blockchain/forkchoice/benchmark_test.go b/beacon-chain/blockchain/forkchoice/benchmark_test.go new file mode 100644 index 000000000..efbb565f4 --- /dev/null +++ b/beacon-chain/blockchain/forkchoice/benchmark_test.go @@ -0,0 +1,174 @@ +package forkchoice + +import ( + "context" + "testing" + + testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/shared/bytesutil" + "github.com/prysmaticlabs/prysm/shared/hashutil" +) + +func BenchmarkForkChoiceTree1(b *testing.B) { + ctx := context.Background() + db := testDB.SetupDB(b) + defer testDB.TeardownDB(b, db) + + store := NewForkChoiceService(ctx, db) + + roots, err := blockTree1(db) + if err != nil { + b.Fatal(err) + } + + // Benchmark fork choice with 1024 validators + validators := make([]*ethpb.Validator, 1024) + for i := 0; i < len(validators); i++ { + validators[i] = ðpb.Validator{ExitEpoch: 2, EffectiveBalance: 1e9} + } + s := &pb.BeaconState{Validators: validators} + if err := store.GenesisStore(ctx, s); err != nil { + b.Fatal(err) + } + + store.justifiedCheckpt.Root = roots[0] + if err := store.db.SaveState(ctx, s, bytesutil.ToBytes32(roots[0])); err != nil { + b.Fatal(err) + } + + h, err := hashutil.HashProto(store.justifiedCheckpt) + if err != nil { + log.Fatal(err) + } + store.checkptBlkRoot[h] = bytesutil.ToBytes32(roots[0]) + + // Spread out the votes evenly for all 3 leaf nodes + for i := 0; i < len(validators); i++ { + switch { + case i < 256: + if err := store.db.SaveValidatorLatestVote(ctx, uint64(i), &pb.ValidatorLatestVote{Root: roots[1]}); err != nil { + b.Fatal(err) + } + case i > 768: + if err := store.db.SaveValidatorLatestVote(ctx, uint64(i), &pb.ValidatorLatestVote{Root: roots[7]}); err != nil { + b.Fatal(err) + } + default: + if err := store.db.SaveValidatorLatestVote(ctx, uint64(i), &pb.ValidatorLatestVote{Root: roots[8]}); err != nil { + b.Fatal(err) + } + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := store.Head(ctx) + if err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkForkChoiceTree2(b *testing.B) { + ctx := context.Background() + db := testDB.SetupDB(b) + defer testDB.TeardownDB(b, db) + + store := NewForkChoiceService(ctx, db) + + roots, err := blockTree2(db) + if err != nil { + b.Fatal(err) + } + + // Benchmark fork choice with 1024 validators + validators := make([]*ethpb.Validator, 1024) + for i := 0; i < len(validators); i++ { + validators[i] = ðpb.Validator{ExitEpoch: 2, EffectiveBalance: 1e9} + } + s := &pb.BeaconState{Validators: validators} + if err := store.GenesisStore(ctx, s); err != nil { + b.Fatal(err) + } + + store.justifiedCheckpt.Root = roots[0] + if err := store.db.SaveState(ctx, s, bytesutil.ToBytes32(roots[0])); err != nil { + b.Fatal(err) + } + + h, err := hashutil.HashProto(store.justifiedCheckpt) + if err != nil { + log.Fatal(err) + } + store.checkptBlkRoot[h] = bytesutil.ToBytes32(roots[0]) + + // Spread out the votes evenly for all the leaf nodes. 8 to 15 + nodeIndex := 8 + for i := 0; i < len(validators); i++ { + if err := store.db.SaveValidatorLatestVote(ctx, uint64(i), &pb.ValidatorLatestVote{Root: roots[nodeIndex]}); err != nil { + b.Fatal(err) + } + if i%155 == 0 { + nodeIndex++ + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := store.Head(ctx) + if err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkForkChoiceTree3(b *testing.B) { + ctx := context.Background() + db := testDB.SetupDB(b) + defer testDB.TeardownDB(b, db) + + store := NewForkChoiceService(ctx, db) + + roots, err := blockTree3(db) + if err != nil { + b.Fatal(err) + } + + // Benchmark fork choice with 1024 validators + validators := make([]*ethpb.Validator, 1024) + for i := 0; i < len(validators); i++ { + validators[i] = ðpb.Validator{ExitEpoch: 2, EffectiveBalance: 1e9} + } + s := &pb.BeaconState{Validators: validators} + if err := store.GenesisStore(ctx, s); err != nil { + b.Fatal(err) + } + + store.justifiedCheckpt.Root = roots[0] + if err := store.db.SaveState(ctx, s, bytesutil.ToBytes32(roots[0])); err != nil { + b.Fatal(err) + } + + h, err := hashutil.HashProto(store.justifiedCheckpt) + if err != nil { + log.Fatal(err) + } + store.checkptBlkRoot[h] = bytesutil.ToBytes32(roots[0]) + + // All validators vote on the same head + for i := 0; i < len(validators); i++ { + if err := store.db.SaveValidatorLatestVote(ctx, uint64(i), &pb.ValidatorLatestVote{Root: roots[len(roots)-1]}); err != nil { + b.Fatal(err) + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := store.Head(ctx) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/beacon-chain/blockchain/forkchoice/process_attestation.go b/beacon-chain/blockchain/forkchoice/process_attestation.go index 5fc1a5afd..7c9637932 100644 --- a/beacon-chain/blockchain/forkchoice/process_attestation.go +++ b/beacon-chain/blockchain/forkchoice/process_attestation.go @@ -157,7 +157,6 @@ func (s *Store) updateAttVotes( tgtRoot []byte, tgtEpoch uint64) error { for _, i := range append(indexedAtt.CustodyBit_0Indices, indexedAtt.CustodyBit_1Indices...) { - s.db.HasValidatorLatestVote(ctx, i) vote, err := s.db.ValidatorLatestVote(ctx, i) if err != nil { return errors.Wrapf(err, "could not get latest vote for validator %d", i) diff --git a/beacon-chain/blockchain/forkchoice/process_attestation_test.go b/beacon-chain/blockchain/forkchoice/process_attestation_test.go index f353d3b69..fcbacc2b4 100644 --- a/beacon-chain/blockchain/forkchoice/process_attestation_test.go +++ b/beacon-chain/blockchain/forkchoice/process_attestation_test.go @@ -6,7 +6,6 @@ import ( "testing" "github.com/prysmaticlabs/go-ssz" - "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1" @@ -16,10 +15,9 @@ import ( func TestStore_OnAttestation(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) - kv := db.(*kv.Store) - defer testDB.TeardownDB(t, kv) + defer testDB.TeardownDB(t, db) - store := NewForkChoiceService(ctx, kv) + store := NewForkChoiceService(ctx, db) _, err := blockTree1(db) if err != nil { diff --git a/beacon-chain/blockchain/forkchoice/process_block_test.go b/beacon-chain/blockchain/forkchoice/process_block_test.go index c2895e9c4..7df382dfd 100644 --- a/beacon-chain/blockchain/forkchoice/process_block_test.go +++ b/beacon-chain/blockchain/forkchoice/process_block_test.go @@ -5,7 +5,6 @@ import ( "strings" "testing" - "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1" @@ -16,10 +15,9 @@ import ( func TestStore_OnBlock(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) - kv := db.(*kv.Store) - defer testDB.TeardownDB(t, kv) + defer testDB.TeardownDB(t, db) - store := NewForkChoiceService(ctx, kv) + store := NewForkChoiceService(ctx, db) roots, err := blockTree1(db) if err != nil { diff --git a/beacon-chain/blockchain/forkchoice/service.go b/beacon-chain/blockchain/forkchoice/service.go index 5c5511807..264b591ba 100644 --- a/beacon-chain/blockchain/forkchoice/service.go +++ b/beacon-chain/blockchain/forkchoice/service.go @@ -160,13 +160,13 @@ func (s *Store) latestAttestingBalance(ctx context.Context, root []byte) (uint64 balances := uint64(0) for _, i := range activeIndices { - if !s.db.HasValidatorLatestVote(ctx, i) { - continue - } vote, err := s.db.ValidatorLatestVote(ctx, i) if err != nil { return 0, errors.Wrapf(err, "could not get validator %d's latest vote", i) } + if vote == nil { + continue + } wantedRoot, err := s.ancestor(ctx, vote.Root, wantedBlk.Slot) if err != nil { diff --git a/beacon-chain/blockchain/forkchoice/service_test.go b/beacon-chain/blockchain/forkchoice/service_test.go index f413527df..b5259ddec 100644 --- a/beacon-chain/blockchain/forkchoice/service_test.go +++ b/beacon-chain/blockchain/forkchoice/service_test.go @@ -10,7 +10,6 @@ import ( "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" - "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1" @@ -21,10 +20,9 @@ import ( func TestStore_GensisStoreOk(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) - kv := db.(*kv.Store) - defer testDB.TeardownDB(t, kv) + defer testDB.TeardownDB(t, db) - store := NewForkChoiceService(ctx, kv) + store := NewForkChoiceService(ctx, db) genesisTime := time.Unix(9999, 0) genesisState := &pb.BeaconState{GenesisTime: uint64(genesisTime.Unix())} @@ -71,10 +69,9 @@ func TestStore_GensisStoreOk(t *testing.T) { func TestStore_AncestorOk(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) - kv := db.(*kv.Store) - defer testDB.TeardownDB(t, kv) + defer testDB.TeardownDB(t, db) - store := NewForkChoiceService(ctx, kv) + store := NewForkChoiceService(ctx, db) roots, err := blockTree1(db) if err != nil { @@ -112,10 +109,9 @@ func TestStore_AncestorOk(t *testing.T) { func TestStore_AncestorNotPartOfTheChain(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) - kv := db.(*kv.Store) - defer testDB.TeardownDB(t, kv) + defer testDB.TeardownDB(t, db) - store := NewForkChoiceService(ctx, kv) + store := NewForkChoiceService(ctx, db) roots, err := blockTree1(db) if err != nil { @@ -144,10 +140,9 @@ func TestStore_AncestorNotPartOfTheChain(t *testing.T) { func TestStore_LatestAttestingBalance(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) - kv := db.(*kv.Store) - defer testDB.TeardownDB(t, kv) + defer testDB.TeardownDB(t, db) - store := NewForkChoiceService(ctx, kv) + store := NewForkChoiceService(ctx, db) roots, err := blockTree1(db) if err != nil { @@ -210,10 +205,9 @@ func TestStore_LatestAttestingBalance(t *testing.T) { func TestStore_ChildrenBlocksFromParentRoot(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) - kv := db.(*kv.Store) - defer testDB.TeardownDB(t, kv) + defer testDB.TeardownDB(t, db) - store := NewForkChoiceService(ctx, kv) + store := NewForkChoiceService(ctx, db) roots, err := blockTree1(db) if err != nil { @@ -242,10 +236,9 @@ func TestStore_ChildrenBlocksFromParentRoot(t *testing.T) { func TestStore_GetHead(t *testing.T) { ctx := context.Background() db := testDB.SetupDB(t) - kv := db.(*kv.Store) - defer testDB.TeardownDB(t, kv) + defer testDB.TeardownDB(t, db) - store := NewForkChoiceService(ctx, kv) + store := NewForkChoiceService(ctx, db) roots, err := blockTree1(db) if err != nil { diff --git a/beacon-chain/db/kv/BUILD.bazel b/beacon-chain/db/kv/BUILD.bazel index 601bd7037..c1aff6699 100644 --- a/beacon-chain/db/kv/BUILD.bazel +++ b/beacon-chain/db/kv/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//beacon-chain/db/filters:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//proto/eth/v1alpha1:go_default_library", + "//shared/bytesutil:go_default_library", "//shared/sliceutil:go_default_library", "@com_github_boltdb_bolt//:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", diff --git a/beacon-chain/db/kv/blocks.go b/beacon-chain/db/kv/blocks.go index 71ff55e1e..53f2edf0e 100644 --- a/beacon-chain/db/kv/blocks.go +++ b/beacon-chain/db/kv/blocks.go @@ -11,6 +11,7 @@ import ( "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/sliceutil" "go.opencensus.io/trace" ) @@ -19,6 +20,13 @@ import ( func (k *Store) Block(ctx context.Context, blockRoot [32]byte) (*ethpb.BeaconBlock, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.Block") defer span.End() + k.blocksLock.RLock() + // Return block from cache if it exists. + if blk, exists := k.blocks[blockRoot]; exists && blk != nil { + k.blocksLock.RUnlock() + return blk, nil + } + k.blocksLock.RUnlock() var block *ethpb.BeaconBlock err := k.db.View(func(tx *bolt.Tx) error { bkt := tx.Bucket(blocksBucket) @@ -145,6 +153,12 @@ func (k *Store) BlockRoots(ctx context.Context, f *filters.QueryFilter) ([][]byt func (k *Store) HasBlock(ctx context.Context, blockRoot [32]byte) bool { ctx, span := trace.StartSpan(ctx, "BeaconDB.HasBlock") defer span.End() + k.blocksLock.RLock() + if blk, exists := k.blocks[blockRoot]; exists && blk != nil { + k.blocksLock.RUnlock() + return true + } + k.blocksLock.RUnlock() exists := false // #nosec G104. Always returns nil. k.db.View(func(tx *bolt.Tx) error { @@ -174,6 +188,9 @@ func (k *Store) DeleteBlock(ctx context.Context, blockRoot [32]byte) error { if err := deleteValueForIndices(indicesByBucket, blockRoot[:], tx); err != nil { return errors.Wrap(err, "could not delete root for DB indices") } + k.blocksLock.Lock() + delete(k.blocks, blockRoot) + k.blocksLock.Unlock() return bkt.Delete(blockRoot[:]) }) } @@ -186,6 +203,14 @@ func (k *Store) SaveBlock(ctx context.Context, block *ethpb.BeaconBlock) error { if err != nil { return err } + + k.blocksLock.RLock() + // Skip saving block to DB if it exists in the cache. + if blk, exists := k.blocks[blockRoot]; exists && blk != nil { + k.blocksLock.RUnlock() + return nil + } + k.blocksLock.RUnlock() enc, err := proto.Marshal(block) if err != nil { return err @@ -196,6 +221,9 @@ func (k *Store) SaveBlock(ctx context.Context, block *ethpb.BeaconBlock) error { if err := updateValueForIndices(indicesByBucket, blockRoot[:], tx); err != nil { return errors.Wrap(err, "could not update DB indices") } + k.blocksLock.Lock() + k.blocks[blockRoot] = block + k.blocksLock.Unlock() return bkt.Put(blockRoot[:], enc) }) } @@ -225,6 +253,9 @@ func (k *Store) SaveBlocks(ctx context.Context, blocks []*ethpb.BeaconBlock) err if err := updateValueForIndices(indicesByBucket, keys[i], tx); err != nil { return errors.Wrap(err, "could not update DB indices") } + k.blocksLock.Lock() + k.blocks[bytesutil.ToBytes32(keys[i])] = blocks[i] + k.blocksLock.Unlock() if err := bucket.Put(keys[i], encodedValues[i]); err != nil { return err } diff --git a/beacon-chain/db/kv/kv.go b/beacon-chain/db/kv/kv.go index d31d68cab..9702ce960 100644 --- a/beacon-chain/db/kv/kv.go +++ b/beacon-chain/db/kv/kv.go @@ -3,10 +3,13 @@ package kv import ( "os" "path" + "sync" "time" "github.com/boltdb/bolt" "github.com/pkg/errors" + pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1" ) // Store defines an implementation of the Prysm Database interface @@ -14,6 +17,12 @@ import ( type Store struct { db *bolt.DB databasePath string + + // Caching layer properties. + blocksLock sync.RWMutex + votesLock sync.RWMutex + blocks map[[32]byte]*ethpb.BeaconBlock + latestVotes map[uint64]*pb.ValidatorLatestVote } // NewKVStore initializes a new boltDB key-value store at the directory @@ -32,7 +41,12 @@ func NewKVStore(dirPath string) (*Store, error) { return nil, err } - kv := &Store{db: boltDB, databasePath: dirPath} + kv := &Store{ + db: boltDB, + databasePath: dirPath, + blocks: make(map[[32]byte]*ethpb.BeaconBlock), + latestVotes: make(map[uint64]*pb.ValidatorLatestVote), + } if err := kv.db.Update(func(tx *bolt.Tx) error { return createBuckets( diff --git a/beacon-chain/db/kv/validators.go b/beacon-chain/db/kv/validators.go index 927661954..c6c2ac7d4 100644 --- a/beacon-chain/db/kv/validators.go +++ b/beacon-chain/db/kv/validators.go @@ -15,6 +15,15 @@ import ( func (k *Store) ValidatorLatestVote(ctx context.Context, validatorIdx uint64) (*pb.ValidatorLatestVote, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.ValidatorLatestVote") defer span.End() + + k.votesLock.RLock() + // Return latest vote from cache if it exists. + if vote, exists := k.latestVotes[validatorIdx]; exists && vote != nil { + k.votesLock.RUnlock() + return vote, nil + } + k.votesLock.RUnlock() + buf := uint64ToBytes(validatorIdx) var latestVote *pb.ValidatorLatestVote err := k.db.View(func(tx *bolt.Tx) error { @@ -33,6 +42,14 @@ func (k *Store) ValidatorLatestVote(ctx context.Context, validatorIdx uint64) (* func (k *Store) HasValidatorLatestVote(ctx context.Context, validatorIdx uint64) bool { ctx, span := trace.StartSpan(ctx, "BeaconDB.HasValidatorLatestVote") defer span.End() + + k.votesLock.RLock() + if vote, exists := k.latestVotes[validatorIdx]; exists && vote != nil { + k.votesLock.RUnlock() + return true + } + k.votesLock.RUnlock() + buf := uint64ToBytes(validatorIdx) exists := false // #nosec G104. Always returns nil. @@ -55,6 +72,9 @@ func (k *Store) SaveValidatorLatestVote(ctx context.Context, validatorIdx uint64 } return k.db.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket(validatorsBucket) + k.votesLock.Lock() + k.latestVotes[validatorIdx] = vote + k.votesLock.Unlock() return bucket.Put(buf, enc) }) }