mirror of
https://gitlab.com/pulsechaincom/prysm-pulse.git
synced 2024-12-25 12:57:18 +00:00
Implement flat spans cache into DB (#6248)
* Implement flat spans cache DB * Fix interface * Fix db func * Gaz * Update slasher/detection/attestations/types/epoch_store_test.go * Fix tests * Fix * Fix for comments * Fix test Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
This commit is contained in:
parent
0bfa1ecd03
commit
cd439adfc7
@ -31,7 +31,7 @@ type ReadOnlyDatabase interface {
|
||||
LatestIndexedAttestationsTargetEpoch(ctx context.Context) (uint64, error)
|
||||
|
||||
// MinMaxSpan related methods.
|
||||
EpochSpans(ctx context.Context, epoch uint64) (*detectionTypes.EpochStore, error)
|
||||
EpochSpans(ctx context.Context, epoch uint64, fromCache bool) (*detectionTypes.EpochStore, error)
|
||||
EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]detectionTypes.Span, bool, error)
|
||||
EpochSpanByValidatorIndex(ctx context.Context, validatorIdx uint64, epoch uint64) (detectionTypes.Span, error)
|
||||
EpochsSpanByValidatorsIndices(ctx context.Context, validatorIndices []uint64, maxEpoch uint64) (map[uint64]map[uint64]detectionTypes.Span, error)
|
||||
@ -69,7 +69,7 @@ type WriteAccessDatabase interface {
|
||||
PruneAttHistory(ctx context.Context, currentEpoch uint64, pruningEpochAge uint64) error
|
||||
|
||||
// MinMaxSpan related methods.
|
||||
SaveEpochSpans(ctx context.Context, epoch uint64, spans *detectionTypes.EpochStore) error
|
||||
SaveEpochSpans(ctx context.Context, epoch uint64, spans *detectionTypes.EpochStore, toCache bool) error
|
||||
SaveEpochSpansMap(ctx context.Context, epoch uint64, spanMap map[uint64]detectionTypes.Span) error
|
||||
SaveValidatorEpochSpan(ctx context.Context, validatorIdx uint64, epoch uint64, spans detectionTypes.Span) error
|
||||
SaveCachedSpansMaps(ctx context.Context) error
|
||||
|
@ -34,7 +34,7 @@ func BenchmarkStore_SaveEpochSpans(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
err := db.SaveEpochSpans(ctx, uint64(i%54000), es)
|
||||
err := db.SaveEpochSpans(ctx, uint64(i%54000), es, false)
|
||||
if err != nil {
|
||||
b.Fatalf("Save validator span map failed: %v", err)
|
||||
}
|
||||
@ -60,7 +60,7 @@ func BenchmarkStore_EpochSpans(b *testing.B) {
|
||||
}
|
||||
b.Log(len(es.Bytes()))
|
||||
for i := 0; i < 200; i++ {
|
||||
err := db.SaveEpochSpans(ctx, uint64(i), es)
|
||||
err := db.SaveEpochSpans(ctx, uint64(i), es, false)
|
||||
if err != nil {
|
||||
b.Fatalf("Save validator span map failed: %v", err)
|
||||
}
|
||||
@ -69,7 +69,7 @@ func BenchmarkStore_EpochSpans(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err := db.EpochSpans(ctx, uint64(i%200))
|
||||
_, err := db.EpochSpans(ctx, uint64(i%200), false)
|
||||
if err != nil {
|
||||
b.Fatalf("Read validator span map failed: %v", err)
|
||||
}
|
||||
|
@ -45,10 +45,16 @@ func persistFlatSpanMapsOnEviction(db *Store) func(key interface{}, value interf
|
||||
// for slashing detection.
|
||||
// Returns span byte array, and error in case of db error.
|
||||
// returns empty byte array if no entry for this epoch exists in db.
|
||||
func (db *Store) EpochSpans(ctx context.Context, epoch uint64) (*types.EpochStore, error) {
|
||||
func (db *Store) EpochSpans(ctx context.Context, epoch uint64, fromCache bool) (*types.EpochStore, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "slasherDB.EpochSpans")
|
||||
defer span.End()
|
||||
|
||||
// Get from the cache if it exists or is requested, if not, go to DB.
|
||||
if fromCache && db.flatSpanCache.Has(epoch) || db.flatSpanCache.Has(epoch) {
|
||||
spans, _ := db.flatSpanCache.Get(epoch)
|
||||
return spans, nil
|
||||
}
|
||||
|
||||
var copiedSpans []byte
|
||||
err := db.view(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(validatorsMinMaxSpanBucketNew)
|
||||
@ -70,7 +76,7 @@ func (db *Store) EpochSpans(ctx context.Context, epoch uint64) (*types.EpochStor
|
||||
}
|
||||
|
||||
// SaveEpochSpans accepts a epoch and span byte array and writes it to disk.
|
||||
func (db *Store) SaveEpochSpans(ctx context.Context, epoch uint64, es *types.EpochStore) error {
|
||||
func (db *Store) SaveEpochSpans(ctx context.Context, epoch uint64, es *types.EpochStore, toCache bool) error {
|
||||
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveEpochSpans")
|
||||
defer span.End()
|
||||
|
||||
@ -78,6 +84,14 @@ func (db *Store) SaveEpochSpans(ctx context.Context, epoch uint64, es *types.Epo
|
||||
return types.ErrWrongSize
|
||||
}
|
||||
|
||||
// Saving to the cache if it exists so cache and DB never conflict.
|
||||
if toCache || db.flatSpanCache.Has(epoch) {
|
||||
db.flatSpanCache.Set(epoch, es)
|
||||
}
|
||||
if toCache {
|
||||
return nil
|
||||
}
|
||||
|
||||
return db.update(func(tx *bolt.Tx) error {
|
||||
b, err := tx.CreateBucketIfNotExists(validatorsMinMaxSpanBucketNew)
|
||||
if err != nil {
|
||||
|
@ -1,12 +1,14 @@
|
||||
package kv
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"flag"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
dbTypes "github.com/prysmaticlabs/prysm/slasher/db/types"
|
||||
"github.com/prysmaticlabs/prysm/slasher/detection/attestations/types"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
@ -59,7 +61,7 @@ func TestValidatorSpans_NilDB(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
validatorIdx := uint64(1)
|
||||
es, err := db.EpochSpans(ctx, validatorIdx)
|
||||
es, err := db.EpochSpans(ctx, validatorIdx, false)
|
||||
if err != nil {
|
||||
t.Fatalf("Nil EpochSpansMap should not return error: %v", err)
|
||||
}
|
||||
@ -88,10 +90,10 @@ func TestStore_SaveReadEpochSpans(t *testing.T) {
|
||||
if err != tt.err {
|
||||
t.Fatalf("Failed to get the right error expected: %v got: %v", tt.err, err)
|
||||
}
|
||||
if err = db.SaveEpochSpans(ctx, tt.epoch, es); err != nil {
|
||||
if err = db.SaveEpochSpans(ctx, tt.epoch, es, false); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
sm, err := db.EpochSpans(ctx, tt.epoch)
|
||||
sm, err := db.EpochSpans(ctx, tt.epoch, false)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get validator spans: %v", err)
|
||||
}
|
||||
@ -117,3 +119,86 @@ func TestStore_SaveReadEpochSpans(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_SaveEpochSpans_ToCache(t *testing.T) {
|
||||
app := cli.App{}
|
||||
set := flag.NewFlagSet("test", 0)
|
||||
db := setupDB(t, cli.NewContext(&app, set, nil))
|
||||
ctx := context.Background()
|
||||
|
||||
spansToSave := map[uint64]types.Span{
|
||||
0: {MinSpan: 5, MaxSpan: 69, SigBytes: [2]byte{40, 219}, HasAttested: false},
|
||||
10: {MinSpan: 43, MaxSpan: 32, SigBytes: [2]byte{10, 13}, HasAttested: true},
|
||||
1000: {MinSpan: 40, MaxSpan: 36, SigBytes: [2]byte{61, 151}, HasAttested: false},
|
||||
10000: {MinSpan: 40, MaxSpan: 64, SigBytes: [2]byte{190, 215}, HasAttested: true},
|
||||
50000: {MinSpan: 40, MaxSpan: 64, SigBytes: [2]byte{190, 215}, HasAttested: true},
|
||||
100: {MinSpan: 49, MaxSpan: 96, SigBytes: [2]byte{11, 98}, HasAttested: true},
|
||||
}
|
||||
epochStore, err := types.EpochStoreFromMap(spansToSave)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
epoch := uint64(9)
|
||||
if err := db.SaveEpochSpans(ctx, epoch, epochStore, dbTypes.UseCache); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
esFromCache, err := db.EpochSpans(ctx, epoch, dbTypes.UseCache)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(epochStore.Bytes(), esFromCache.Bytes()) {
|
||||
t.Fatalf("Expected store from DB to be %#x, received %#x", epochStore.Bytes(), esFromCache.Bytes())
|
||||
}
|
||||
|
||||
esFromDB, err := db.EpochSpans(ctx, epoch, dbTypes.UseDB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(esFromDB.Bytes(), esFromCache.Bytes()) {
|
||||
t.Fatalf("Expected store asked from DB to use cache, \nreceived %#x, \nexpected %#x", esFromDB.Bytes(), esFromCache.Bytes())
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_SaveEpochSpans_ToDB(t *testing.T) {
|
||||
app := cli.App{}
|
||||
set := flag.NewFlagSet("test", 0)
|
||||
db := setupDB(t, cli.NewContext(&app, set, nil))
|
||||
ctx := context.Background()
|
||||
|
||||
spansToSave := map[uint64]types.Span{
|
||||
0: {MinSpan: 5, MaxSpan: 69, SigBytes: [2]byte{40, 219}, HasAttested: false},
|
||||
10: {MinSpan: 43, MaxSpan: 32, SigBytes: [2]byte{10, 13}, HasAttested: true},
|
||||
1000: {MinSpan: 40, MaxSpan: 36, SigBytes: [2]byte{61, 151}, HasAttested: false},
|
||||
10000: {MinSpan: 40, MaxSpan: 64, SigBytes: [2]byte{190, 215}, HasAttested: true},
|
||||
100000: {MinSpan: 20, MaxSpan: 64, SigBytes: [2]byte{170, 215}, HasAttested: false},
|
||||
100: {MinSpan: 49, MaxSpan: 96, SigBytes: [2]byte{11, 98}, HasAttested: true},
|
||||
}
|
||||
epochStore, err := types.EpochStoreFromMap(spansToSave)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
epoch := uint64(9)
|
||||
if err := db.SaveEpochSpans(ctx, epoch, epochStore, dbTypes.UseDB); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Expect cache to retrieve from DB if its not in cache.
|
||||
esFromCache, err := db.EpochSpans(ctx, epoch, dbTypes.UseCache)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(esFromCache.Bytes(), epochStore.Bytes()) {
|
||||
t.Fatalf("Expected cache request to be %#x, expected %#x", epochStore.Bytes(), esFromCache.Bytes())
|
||||
}
|
||||
|
||||
esFromDB, err := db.EpochSpans(ctx, epoch, dbTypes.UseDB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !bytes.Equal(epochStore.Bytes(), esFromDB.Bytes()) {
|
||||
t.Fatalf("Expected store from DB to be %#x, received %#x", epochStore.Bytes(), esFromDB.Bytes())
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,13 @@ const (
|
||||
Reverted //relevant again
|
||||
)
|
||||
|
||||
const (
|
||||
// UseCache is used to mark when calling a DB function, to save it to the cache.
|
||||
UseCache = true
|
||||
// UseDB is used to mark when calling a DB function, to save it to the DB.
|
||||
UseDB = false
|
||||
)
|
||||
|
||||
func (status SlashingStatus) String() string {
|
||||
names := [...]string{
|
||||
"Unknown",
|
||||
|
@ -19,5 +19,8 @@ go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["epoch_store_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = ["//slasher/db/testing:go_default_library"],
|
||||
deps = [
|
||||
"//slasher/db/testing:go_default_library",
|
||||
"//slasher/db/types:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -93,7 +93,7 @@ func (es *EpochStore) Bytes() []byte {
|
||||
|
||||
// ToMap is a helper function to convert an epoch store to a map, mainly used for testing.
|
||||
func (es *EpochStore) ToMap() (map[uint64]Span, error) {
|
||||
spanMap := make(map[uint64]Span)
|
||||
spanMap := make(map[uint64]Span, es.highestObservedIdx)
|
||||
var err error
|
||||
for i := uint64(0); i < es.highestObservedIdx; i++ {
|
||||
spanMap[i], err = es.GetValidatorSpan(i)
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"testing"
|
||||
|
||||
testDB "github.com/prysmaticlabs/prysm/slasher/db/testing"
|
||||
dbTypes "github.com/prysmaticlabs/prysm/slasher/db/types"
|
||||
"github.com/prysmaticlabs/prysm/slasher/detection/attestations/types"
|
||||
)
|
||||
|
||||
@ -256,7 +257,7 @@ func BenchmarkEpochStore_Save(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
if err := db.SaveEpochSpans(context.Background(), 1, store); err != nil {
|
||||
if err := db.SaveEpochSpans(context.Background(), 1, store, dbTypes.UseDB); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user