diff --git a/kv/kvcache/cache.go b/kv/kvcache/cache.go index 0e9416948..66d83031c 100644 --- a/kv/kvcache/cache.go +++ b/kv/kvcache/cache.go @@ -20,6 +20,7 @@ import ( "context" "encoding/binary" "fmt" + "hash" "sort" "sync" "time" @@ -30,7 +31,9 @@ import ( "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/log/v3" "go.uber.org/atomic" + "golang.org/x/crypto/sha3" ) type Cache interface { @@ -41,6 +44,7 @@ type Cache interface { } type CacheView interface { Get(k []byte) ([]byte, error) + GetCode(k []byte) ([]byte, error) } // Coherent works on top of Database Transaction and pair Coherent+ReadTransaction must @@ -78,29 +82,32 @@ type CacheView interface { // - only OnNewBlock method may do parent.Clone() and apply StateChanges to create coherent view of kv.Tx // - parent.Clone() can't be caled if parent.isCanonical=false // - only OnNewBlock method can set view.isCanonical=true -// Rules of filling cache.evictList: -// - changes in Canonical View SHOULD reflect in evictList -// - changes in Non-Canonical View SHOULD NOT reflect in evictList +// Rules of filling cache.stateEvict: +// - changes in Canonical View SHOULD reflect in stateEvict +// - changes in Non-Canonical View SHOULD NOT reflect in stateEvict type Coherent struct { - hits, miss, timeout *metrics.Counter - keys, evict *metrics.Counter - latestView *CoherentRoot - evictList *List - roots map[ViewID]*CoherentRoot - lock sync.RWMutex - cfg CoherentConfig - latestViewID ViewID + hits, miss, timeout *metrics.Counter + keys, evict *metrics.Counter + codeHits, codeMiss, codeKeys *metrics.Counter + latestStateView *CoherentRoot + roots map[ViewID]*CoherentRoot + stateEvict, codeEvict *ThreadSafeEvictionList + lock sync.RWMutex + cfg CoherentConfig + latestViewID ViewID + hasher hash.Hash } type CoherentRoot struct { cache *btree.BTree + codeCache *btree.BTree ready chan struct{} // close when ready readyChanClosed atomic.Bool // protecting `ready` field from double-close (on unwind). Consumers don't need check this field. // Views marked as `Canonical` if it received onNewBlock message // we may drop `Non-Canonical` views even if they had fresh keys - // keys added to `Non-Canonical` views SHOULD NOT be added to evictList - // cache.latestView is always `Canonical` + // keys added to `Non-Canonical` views SHOULD NOT be added to stateEvict + // cache.latestStateView is always `Canonical` isCanonical bool } @@ -112,7 +119,8 @@ type CoherentView struct { tx kv.Tx } -func (c *CoherentView) Get(k []byte) ([]byte, error) { return c.cache.Get(k, c.tx, c.viewID) } +func (c *CoherentView) Get(k []byte) ([]byte, error) { return c.cache.Get(k, c.tx, c.viewID) } +func (c *CoherentView) GetCode(k []byte) ([]byte, error) { return c.cache.GetCode(k, c.tx, c.viewID) } var _ Cache = (*Coherent)(nil) // compile-time interface check var _ CacheView = (*CoherentView)(nil) // compile-time interface check @@ -120,31 +128,41 @@ var _ CacheView = (*CoherentView)(nil) // compile-time interface check const DEGREE = 32 type CoherentConfig struct { - KeepViews uint64 // keep in memory up to this amount of views, evict older - NewBlockWait time.Duration // how long wait - MetricsLabel string - WithStorage bool - KeysLimit int + KeepViews uint64 // keep in memory up to this amount of views, evict older + NewBlockWait time.Duration // how long wait + MetricsLabel string + WithStorage bool + KeysLimit int + CodeKeysLimit int } var DefaultCoherentConfig = CoherentConfig{ - KeepViews: 50, - NewBlockWait: 50 * time.Millisecond, - KeysLimit: 1_000_000, - MetricsLabel: "default", - WithStorage: true, + KeepViews: 50, + NewBlockWait: 50 * time.Millisecond, + KeysLimit: 1_000_000, + CodeKeysLimit: 10_000, + MetricsLabel: "default", + WithStorage: true, } func New(cfg CoherentConfig) *Coherent { + if cfg.KeepViews == 0 { + panic("empty config passed") + } return &Coherent{ - roots: map[ViewID]*CoherentRoot{}, - evictList: NewList(), - cfg: cfg, - miss: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="miss",name="%s"}`, cfg.MetricsLabel)), - hits: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="hit",name="%s"}`, cfg.MetricsLabel)), - timeout: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_timeout_total{name="%s"}`, cfg.MetricsLabel)), - keys: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_keys_total{name="%s"}`, cfg.MetricsLabel)), - evict: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_list_total{name="%s"}`, cfg.MetricsLabel)), + roots: map[ViewID]*CoherentRoot{}, + stateEvict: &ThreadSafeEvictionList{l: NewList()}, + codeEvict: &ThreadSafeEvictionList{l: NewList()}, + hasher: sha3.NewLegacyKeccak256(), + cfg: cfg, + miss: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="miss",name="%s"}`, cfg.MetricsLabel)), + hits: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="hit",name="%s"}`, cfg.MetricsLabel)), + timeout: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_timeout_total{name="%s"}`, cfg.MetricsLabel)), + keys: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_keys_total{name="%s"}`, cfg.MetricsLabel)), + evict: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_list_total{name="%s"}`, cfg.MetricsLabel)), + codeMiss: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_code_total{result="miss",name="%s"}`, cfg.MetricsLabel)), + codeHits: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_code_total{result="hit",name="%s"}`, cfg.MetricsLabel)), + codeKeys: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_code_keys_total{name="%s"}`, cfg.MetricsLabel)), } } @@ -156,14 +174,11 @@ func (c *Coherent) selectOrCreateRoot(viewID ViewID) *CoherentRoot { if ok { return r } - r = &CoherentRoot{ready: make(chan struct{})} - if prevView, ok := c.roots[viewID-1]; ok { - //log.Info("advance: clone", "from", viewID-1, "to", viewID) - r.cache = prevView.cache.Clone() - r.isCanonical = prevView.isCanonical - } else { - //log.Info("advance: new", "to", viewID) - r.cache = btree.New(DEGREE) + + r = &CoherentRoot{ + ready: make(chan struct{}), + cache: btree.New(DEGREE), + codeCache: btree.New(DEGREE), } c.roots[viewID] = r return r @@ -176,17 +191,25 @@ func (c *Coherent) advanceRoot(viewID ViewID) (r *CoherentRoot) { r = &CoherentRoot{ready: make(chan struct{})} c.roots[viewID] = r } + if prevView, ok := c.roots[viewID-1]; ok && prevView.isCanonical { //log.Info("advance: clone", "from", viewID-1, "to", viewID) r.cache = prevView.cache.Clone() + r.codeCache = prevView.codeCache.Clone() } else { - c.evictList.Init() + c.stateEvict.Init() + c.codeEvict.Init() if r.cache == nil { //log.Info("advance: new", "to", viewID) r.cache = btree.New(DEGREE) + r.codeCache = btree.New(DEGREE) } else { r.cache.Ascend(func(i btree.Item) bool { - c.evictList.PushFront(i.(*Element)) + c.stateEvict.PushFront(i.(*Element)) + return true + }) + r.codeCache.Ascend(func(i btree.Item) bool { + c.codeEvict.PushFront(i.(*Element)) return true }) } @@ -195,10 +218,11 @@ func (c *Coherent) advanceRoot(viewID ViewID) (r *CoherentRoot) { c.evictRoots() c.latestViewID = viewID - c.latestView = r + c.latestStateView = r - c.keys.Set(uint64(c.latestView.cache.Len())) - c.evict.Set(uint64(c.evictList.Len())) + c.keys.Set(uint64(c.latestStateView.cache.Len())) + c.codeKeys.Set(uint64(c.latestStateView.codeCache.Len())) + c.evict.Set(uint64(c.stateEvict.Len())) return r } @@ -210,16 +234,31 @@ func (c *Coherent) OnNewBlock(stateChanges *remote.StateChangeBatch) { for _, sc := range stateChanges.ChangeBatch { for i := range sc.Changes { switch sc.Changes[i].Action { - case remote.Action_UPSERT, remote.Action_UPSERT_CODE: + case remote.Action_UPSERT: addr := gointerfaces.ConvertH160toAddress(sc.Changes[i].Address) v := sc.Changes[i].Data //fmt.Printf("set: %x,%x\n", addr, v) c.add(addr[:], v, r, id) + case remote.Action_UPSERT_CODE: + addr := gointerfaces.ConvertH160toAddress(sc.Changes[i].Address) + v := sc.Changes[i].Data + c.add(addr[:], v, r, id) + c.hasher.Reset() + c.hasher.Write(sc.Changes[i].Code) + k := make([]byte, 32) + c.hasher.Sum(k) + c.addCode(k, sc.Changes[i].Code, r, id) case remote.Action_DELETE: addr := gointerfaces.ConvertH160toAddress(sc.Changes[i].Address) c.add(addr[:], nil, r, id) - case remote.Action_CODE, remote.Action_STORAGE: - //skip + case remote.Action_STORAGE: + //skip, will check later + case remote.Action_CODE: + c.hasher.Reset() + c.hasher.Write(sc.Changes[i].Code) + k := make([]byte, 32) + c.hasher.Sum(k) + c.addCode(k, sc.Changes[i].Code, r, id) default: panic("not implemented yet") } @@ -268,33 +307,38 @@ func (c *Coherent) View(ctx context.Context, tx kv.Tx) (CacheView, error) { return &CoherentView{viewID: ViewID(tx.ViewID()), tx: tx, cache: c}, nil } -func (c *Coherent) getFromCache(k []byte, id ViewID) (btree.Item, *CoherentRoot, bool, error) { +func (c *Coherent) getFromCache(k []byte, id ViewID, code bool) (btree.Item, *CoherentRoot, error) { c.lock.RLock() defer c.lock.RUnlock() - isLatest := c.latestViewID == id + //TODO: create thread-safe list wrapper r, ok := c.roots[id] if !ok { - return nil, r, isLatest, fmt.Errorf("too old ViewID: %d, latestViewID=%d", id, c.latestViewID) + return nil, r, fmt.Errorf("too old ViewID: %d, latestViewID=%d", id, c.latestViewID) + } + isLatest := c.latestViewID == id + + var it btree.Item + if code { + it = r.codeCache.Get(&Element{K: k}) + } else { + it = r.cache.Get(&Element{K: k}) + } + if it != nil && isLatest { + c.stateEvict.MoveToFront(it.(*Element)) } - it := r.cache.Get(&Element{K: k}) - return it, r, isLatest, nil + return it, r, nil } - func (c *Coherent) Get(k []byte, tx kv.Tx, id ViewID) ([]byte, error) { - it, r, isLatest, err := c.getFromCache(k, id) + it, r, err := c.getFromCache(k, id, false) if err != nil { return nil, err } if it != nil { c.hits.Inc() - - if isLatest { - c.evictList.MoveToFront(it.(*Element)) - } //fmt.Printf("from cache: %#x,%x\n", k, it.(*Element).V) return it.(*Element).V, nil } @@ -312,13 +356,50 @@ func (c *Coherent) Get(k []byte, tx kv.Tx, id ViewID) ([]byte, error) { return v, nil } +func (c *Coherent) GetCode(k []byte, tx kv.Tx, id ViewID) ([]byte, error) { + it, r, err := c.getFromCache(k, id, true) + if err != nil { + return nil, err + } + + if it != nil { + c.codeHits.Inc() + //fmt.Printf("from cache: %#x,%x\n", k, it.(*Element).V) + return it.(*Element).V, nil + } + + //{ + // r, ok := c.roots[id] + // if ok { + // fmt.Printf("miss: %x,%d,%d\n", k, id, r.codeCache.Len()) + // } + //} + c.codeMiss.Inc() + v, err := tx.GetOne(kv.Code, k) + if err != nil { + return nil, err + } + //fmt.Printf("from db: %#x,%x\n", k, v) + + c.lock.Lock() + defer c.lock.Unlock() + v = c.addCode(common.Copy(k), common.Copy(v), r, id).V + return v, nil +} func (c *Coherent) removeOldest(r *CoherentRoot) { - e := c.evictList.Back() + e := c.stateEvict.Oldest() if e != nil { - c.evictList.Remove(e) + c.stateEvict.Remove(e) r.cache.Delete(e) } } +func (c *Coherent) removeOldestCode(r *CoherentRoot) { + e := c.codeEvict.Oldest() + if e != nil { + c.codeEvict.Remove(e) + r.codeCache.Delete(e) + } +} func (c *Coherent) add(k, v []byte, r *CoherentRoot, id ViewID) *Element { it := &Element{K: k, V: v} replaced := r.cache.ReplaceOrInsert(it) @@ -327,16 +408,34 @@ func (c *Coherent) add(k, v []byte, r *CoherentRoot, id ViewID) *Element { return it } if replaced != nil { - c.evictList.Remove(replaced.(*Element)) + c.stateEvict.Remove(replaced.(*Element)) } - c.evictList.PushFront(it) - evict := c.evictList.Len() > c.cfg.KeysLimit + c.stateEvict.PushFront(it) + evict := c.stateEvict.Len() > c.cfg.KeysLimit // Verify size not exceeded if evict { c.removeOldest(r) } return it } +func (c *Coherent) addCode(k, v []byte, r *CoherentRoot, id ViewID) *Element { + it := &Element{K: k, V: v} + replaced := r.codeCache.ReplaceOrInsert(it) + if c.latestViewID != id { + //fmt.Printf("add to non-last viewID: %d<%d\n", c.latestViewID, id) + return it + } + if replaced != nil { + c.codeEvict.Remove(replaced.(*Element)) + } + c.codeEvict.PushFront(it) + evict := c.codeEvict.Len() > c.cfg.CodeKeysLimit + // Verify size not exceeded + if evict { + c.removeOldestCode(r) + } + return it +} type Stat struct { BlockNum uint64 @@ -407,6 +506,7 @@ func (c *Coherent) evictRoots() { return } to := c.latestViewID - ViewID(c.cfg.KeepViews) + fmt.Printf("evict: %d,%d,%d\n", c.cfg.KeepViews, c.latestViewID, to) //fmt.Printf("collecting: %d\n", to) var toDel []ViewID for txId := range c.roots { @@ -415,7 +515,7 @@ func (c *Coherent) evictRoots() { } toDel = append(toDel, txId) } - //log.Info("forget old roots", "list", fmt.Sprintf("%d", toDel)) + log.Info("forget old roots", "list", fmt.Sprintf("%d", toDel)) for _, txId := range toDel { delete(c.roots, txId) } @@ -423,10 +523,10 @@ func (c *Coherent) evictRoots() { func (c *Coherent) Len() int { c.lock.RLock() defer c.lock.RUnlock() - if c.latestView == nil { + if c.latestStateView == nil { return 0 } - return c.latestView.cache.Len() //todo: is it same with cache.len()? + return c.latestStateView.cache.Len() //todo: is it same with cache.len()? } // Element is an element of a linked list. @@ -449,6 +549,48 @@ func (e *Element) Less(than btree.Item) bool { return bytes.Compare(e.K, than.(*Element).K) < 0 } +type ThreadSafeEvictionList struct { + l *List + lock sync.RWMutex +} + +func (l *ThreadSafeEvictionList) Init() { + l.lock.Lock() + l.l.Init() + l.lock.Unlock() +} +func (l *ThreadSafeEvictionList) PushFront(e *Element) { + l.lock.Lock() + l.l.PushFront(e) + l.lock.Unlock() +} + +func (l *ThreadSafeEvictionList) MoveToFront(e *Element) { + l.lock.Lock() + l.l.MoveToFront(e) + l.lock.Unlock() +} + +func (l *ThreadSafeEvictionList) Remove(e *Element) { + l.lock.Lock() + l.l.Remove(e) + l.lock.Unlock() +} + +func (l *ThreadSafeEvictionList) Oldest() *Element { + l.lock.Lock() + e := l.l.Back() + l.lock.Unlock() + return e +} + +func (l *ThreadSafeEvictionList) Len() int { + l.lock.RLock() + length := l.l.Len() + l.lock.RUnlock() + return length +} + // ========= copypaste of List implementation from stdlib ======== // Next returns the next list element or nil. diff --git a/kv/kvcache/cache_test.go b/kv/kvcache/cache_test.go index f7561a891..37d97aa4a 100644 --- a/kv/kvcache/cache_test.go +++ b/kv/kvcache/cache_test.go @@ -42,7 +42,7 @@ func TestEvictionInUnexpectedOrder(t *testing.T) { require.False(c.roots[2].isCanonical) c.add([]byte{1}, nil, c.roots[2], 2) - require.Equal(0, c.evictList.Len()) + require.Equal(0, c.stateEvict.Len()) c.advanceRoot(2) require.Equal(1, len(c.roots)) @@ -50,7 +50,7 @@ func TestEvictionInUnexpectedOrder(t *testing.T) { require.True(c.roots[2].isCanonical) c.add([]byte{1}, nil, c.roots[2], 2) - require.Equal(1, c.evictList.Len()) + require.Equal(1, c.stateEvict.Len()) c.selectOrCreateRoot(5) require.Equal(2, len(c.roots)) @@ -58,9 +58,9 @@ func TestEvictionInUnexpectedOrder(t *testing.T) { require.False(c.roots[5].isCanonical) c.add([]byte{2}, nil, c.roots[5], 5) // not added to evict list - require.Equal(1, c.evictList.Len()) + require.Equal(1, c.stateEvict.Len()) c.add([]byte{2}, nil, c.roots[2], 2) // added to evict list, because it's latest view - require.Equal(2, c.evictList.Len()) + require.Equal(2, c.stateEvict.Len()) c.selectOrCreateRoot(6) require.Equal(3, len(c.roots)) @@ -93,8 +93,8 @@ func TestEvictionInUnexpectedOrder(t *testing.T) { require.True(c.roots[100].isCanonical) //c.add([]byte{1}, nil, c.roots[2], 2) - require.Equal(0, c.latestView.cache.Len()) - require.Equal(0, c.evictList.Len()) + require.Equal(0, c.latestStateView.cache.Len()) + require.Equal(0, c.stateEvict.Len()) } func TestEviction(t *testing.T) { @@ -116,11 +116,11 @@ func TestEviction(t *testing.T) { _, _ = c.Get([]byte{1}, tx, view.viewID) _, _ = c.Get([]byte{2}, tx, view.viewID) _, _ = c.Get([]byte{3}, tx, view.viewID) - //require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len()) + //require.Equal(c.roots[c.latestViewID].cache.Len(), c.stateEvict.Len()) return nil }) - require.Equal(0, c.evictList.Len()) - //require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len()) + require.Equal(0, c.stateEvict.Len()) + //require.Equal(c.roots[c.latestViewID].cache.Len(), c.stateEvict.Len()) c.OnNewBlock(&remote.StateChangeBatch{ DatabaseViewID: id + 1, ChangeBatch: []*remote.StateChange{ @@ -134,8 +134,8 @@ func TestEviction(t *testing.T) { }, }, }) - require.Equal(1, c.evictList.Len()) - require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len()) + require.Equal(1, c.stateEvict.Len()) + require.Equal(c.roots[c.latestViewID].cache.Len(), c.stateEvict.Len()) _ = db.Update(ctx, func(tx kv.RwTx) error { _ = tx.Put(kv.PlainState, k1[:], []byte{1}) cacheView, _ := c.View(ctx, tx) @@ -147,8 +147,8 @@ func TestEviction(t *testing.T) { _, _ = c.Get([]byte{6}, tx, view.viewID) return nil }) - require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len()) - require.Equal(cfg.KeysLimit, c.evictList.Len()) + require.Equal(c.roots[c.latestViewID].cache.Len(), c.stateEvict.Len()) + require.Equal(cfg.KeysLimit, c.stateEvict.Len()) } func TestAPI(t *testing.T) { @@ -337,3 +337,27 @@ func TestAPI(t *testing.T) { wg.Wait() } + +func TestCode(t *testing.T) { + require, ctx := require.New(t), context.Background() + c := New(DefaultCoherentConfig) + db := memdb.NewTestDB(t) + k1, k2 := [20]byte{1}, [20]byte{2} + + _ = db.Update(ctx, func(tx kv.RwTx) error { + _ = tx.Put(kv.Code, k1[:], k2[:]) + cacheView, _ := c.View(ctx, tx) + view := cacheView.(*CoherentView) + + v, err := c.GetCode(k1[:], tx, view.viewID) + require.NoError(err) + require.Equal(k2[:], v) + + v, err = c.GetCode(k1[:], tx, view.viewID) + require.NoError(err) + require.Equal(k2[:], v) + + //require.Equal(c.roots[c.latestViewID].cache.Len(), c.stateEvict.Len()) + return nil + }) +} diff --git a/kv/kvcache/dummy.go b/kv/kvcache/dummy.go index a8b39461d..fd6587a78 100644 --- a/kv/kvcache/dummy.go +++ b/kv/kvcache/dummy.go @@ -38,10 +38,14 @@ func (c *DummyCache) Len() int { return 0 } func (c *DummyCache) Get(k []byte, tx kv.Tx, id ViewID) ([]byte, error) { return tx.GetOne(kv.PlainState, k) } +func (c *DummyCache) GetCode(k []byte, tx kv.Tx, id ViewID) ([]byte, error) { + return tx.GetOne(kv.Code, k) +} type DummyView struct { cache *DummyCache tx kv.Tx } -func (c *DummyView) Get(k []byte) ([]byte, error) { return c.cache.Get(k, c.tx, 0) } +func (c *DummyView) Get(k []byte) ([]byte, error) { return c.cache.Get(k, c.tx, 0) } +func (c *DummyView) GetCode(k []byte) ([]byte, error) { return c.cache.GetCode(k, c.tx, 0) }