Code cache (#108)

This commit is contained in:
Alex Sharov 2021-10-11 21:25:22 +07:00 committed by GitHub
parent bbcf1f0cff
commit 6a77e30374
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 251 additions and 81 deletions

View File

@ -20,6 +20,7 @@ import (
"context"
"encoding/binary"
"fmt"
"hash"
"sort"
"sync"
"time"
@ -30,7 +31,9 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic"
"golang.org/x/crypto/sha3"
)
type Cache interface {
@ -41,6 +44,7 @@ type Cache interface {
}
type CacheView interface {
Get(k []byte) ([]byte, error)
GetCode(k []byte) ([]byte, error)
}
// Coherent works on top of Database Transaction and pair Coherent+ReadTransaction must
@ -78,29 +82,32 @@ type CacheView interface {
// - only OnNewBlock method may do parent.Clone() and apply StateChanges to create coherent view of kv.Tx
// - parent.Clone() can't be caled if parent.isCanonical=false
// - only OnNewBlock method can set view.isCanonical=true
// Rules of filling cache.evictList:
// - changes in Canonical View SHOULD reflect in evictList
// - changes in Non-Canonical View SHOULD NOT reflect in evictList
// Rules of filling cache.stateEvict:
// - changes in Canonical View SHOULD reflect in stateEvict
// - changes in Non-Canonical View SHOULD NOT reflect in stateEvict
type Coherent struct {
hits, miss, timeout *metrics.Counter
keys, evict *metrics.Counter
latestView *CoherentRoot
evictList *List
roots map[ViewID]*CoherentRoot
lock sync.RWMutex
cfg CoherentConfig
latestViewID ViewID
hits, miss, timeout *metrics.Counter
keys, evict *metrics.Counter
codeHits, codeMiss, codeKeys *metrics.Counter
latestStateView *CoherentRoot
roots map[ViewID]*CoherentRoot
stateEvict, codeEvict *ThreadSafeEvictionList
lock sync.RWMutex
cfg CoherentConfig
latestViewID ViewID
hasher hash.Hash
}
type CoherentRoot struct {
cache *btree.BTree
codeCache *btree.BTree
ready chan struct{} // close when ready
readyChanClosed atomic.Bool // protecting `ready` field from double-close (on unwind). Consumers don't need check this field.
// Views marked as `Canonical` if it received onNewBlock message
// we may drop `Non-Canonical` views even if they had fresh keys
// keys added to `Non-Canonical` views SHOULD NOT be added to evictList
// cache.latestView is always `Canonical`
// keys added to `Non-Canonical` views SHOULD NOT be added to stateEvict
// cache.latestStateView is always `Canonical`
isCanonical bool
}
@ -112,7 +119,8 @@ type CoherentView struct {
tx kv.Tx
}
func (c *CoherentView) Get(k []byte) ([]byte, error) { return c.cache.Get(k, c.tx, c.viewID) }
func (c *CoherentView) Get(k []byte) ([]byte, error) { return c.cache.Get(k, c.tx, c.viewID) }
func (c *CoherentView) GetCode(k []byte) ([]byte, error) { return c.cache.GetCode(k, c.tx, c.viewID) }
var _ Cache = (*Coherent)(nil) // compile-time interface check
var _ CacheView = (*CoherentView)(nil) // compile-time interface check
@ -120,31 +128,41 @@ var _ CacheView = (*CoherentView)(nil) // compile-time interface check
const DEGREE = 32
type CoherentConfig struct {
KeepViews uint64 // keep in memory up to this amount of views, evict older
NewBlockWait time.Duration // how long wait
MetricsLabel string
WithStorage bool
KeysLimit int
KeepViews uint64 // keep in memory up to this amount of views, evict older
NewBlockWait time.Duration // how long wait
MetricsLabel string
WithStorage bool
KeysLimit int
CodeKeysLimit int
}
var DefaultCoherentConfig = CoherentConfig{
KeepViews: 50,
NewBlockWait: 50 * time.Millisecond,
KeysLimit: 1_000_000,
MetricsLabel: "default",
WithStorage: true,
KeepViews: 50,
NewBlockWait: 50 * time.Millisecond,
KeysLimit: 1_000_000,
CodeKeysLimit: 10_000,
MetricsLabel: "default",
WithStorage: true,
}
func New(cfg CoherentConfig) *Coherent {
if cfg.KeepViews == 0 {
panic("empty config passed")
}
return &Coherent{
roots: map[ViewID]*CoherentRoot{},
evictList: NewList(),
cfg: cfg,
miss: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="miss",name="%s"}`, cfg.MetricsLabel)),
hits: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="hit",name="%s"}`, cfg.MetricsLabel)),
timeout: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_timeout_total{name="%s"}`, cfg.MetricsLabel)),
keys: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_keys_total{name="%s"}`, cfg.MetricsLabel)),
evict: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_list_total{name="%s"}`, cfg.MetricsLabel)),
roots: map[ViewID]*CoherentRoot{},
stateEvict: &ThreadSafeEvictionList{l: NewList()},
codeEvict: &ThreadSafeEvictionList{l: NewList()},
hasher: sha3.NewLegacyKeccak256(),
cfg: cfg,
miss: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="miss",name="%s"}`, cfg.MetricsLabel)),
hits: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="hit",name="%s"}`, cfg.MetricsLabel)),
timeout: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_timeout_total{name="%s"}`, cfg.MetricsLabel)),
keys: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_keys_total{name="%s"}`, cfg.MetricsLabel)),
evict: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_list_total{name="%s"}`, cfg.MetricsLabel)),
codeMiss: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_code_total{result="miss",name="%s"}`, cfg.MetricsLabel)),
codeHits: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_code_total{result="hit",name="%s"}`, cfg.MetricsLabel)),
codeKeys: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_code_keys_total{name="%s"}`, cfg.MetricsLabel)),
}
}
@ -156,14 +174,11 @@ func (c *Coherent) selectOrCreateRoot(viewID ViewID) *CoherentRoot {
if ok {
return r
}
r = &CoherentRoot{ready: make(chan struct{})}
if prevView, ok := c.roots[viewID-1]; ok {
//log.Info("advance: clone", "from", viewID-1, "to", viewID)
r.cache = prevView.cache.Clone()
r.isCanonical = prevView.isCanonical
} else {
//log.Info("advance: new", "to", viewID)
r.cache = btree.New(DEGREE)
r = &CoherentRoot{
ready: make(chan struct{}),
cache: btree.New(DEGREE),
codeCache: btree.New(DEGREE),
}
c.roots[viewID] = r
return r
@ -176,17 +191,25 @@ func (c *Coherent) advanceRoot(viewID ViewID) (r *CoherentRoot) {
r = &CoherentRoot{ready: make(chan struct{})}
c.roots[viewID] = r
}
if prevView, ok := c.roots[viewID-1]; ok && prevView.isCanonical {
//log.Info("advance: clone", "from", viewID-1, "to", viewID)
r.cache = prevView.cache.Clone()
r.codeCache = prevView.codeCache.Clone()
} else {
c.evictList.Init()
c.stateEvict.Init()
c.codeEvict.Init()
if r.cache == nil {
//log.Info("advance: new", "to", viewID)
r.cache = btree.New(DEGREE)
r.codeCache = btree.New(DEGREE)
} else {
r.cache.Ascend(func(i btree.Item) bool {
c.evictList.PushFront(i.(*Element))
c.stateEvict.PushFront(i.(*Element))
return true
})
r.codeCache.Ascend(func(i btree.Item) bool {
c.codeEvict.PushFront(i.(*Element))
return true
})
}
@ -195,10 +218,11 @@ func (c *Coherent) advanceRoot(viewID ViewID) (r *CoherentRoot) {
c.evictRoots()
c.latestViewID = viewID
c.latestView = r
c.latestStateView = r
c.keys.Set(uint64(c.latestView.cache.Len()))
c.evict.Set(uint64(c.evictList.Len()))
c.keys.Set(uint64(c.latestStateView.cache.Len()))
c.codeKeys.Set(uint64(c.latestStateView.codeCache.Len()))
c.evict.Set(uint64(c.stateEvict.Len()))
return r
}
@ -210,16 +234,31 @@ func (c *Coherent) OnNewBlock(stateChanges *remote.StateChangeBatch) {
for _, sc := range stateChanges.ChangeBatch {
for i := range sc.Changes {
switch sc.Changes[i].Action {
case remote.Action_UPSERT, remote.Action_UPSERT_CODE:
case remote.Action_UPSERT:
addr := gointerfaces.ConvertH160toAddress(sc.Changes[i].Address)
v := sc.Changes[i].Data
//fmt.Printf("set: %x,%x\n", addr, v)
c.add(addr[:], v, r, id)
case remote.Action_UPSERT_CODE:
addr := gointerfaces.ConvertH160toAddress(sc.Changes[i].Address)
v := sc.Changes[i].Data
c.add(addr[:], v, r, id)
c.hasher.Reset()
c.hasher.Write(sc.Changes[i].Code)
k := make([]byte, 32)
c.hasher.Sum(k)
c.addCode(k, sc.Changes[i].Code, r, id)
case remote.Action_DELETE:
addr := gointerfaces.ConvertH160toAddress(sc.Changes[i].Address)
c.add(addr[:], nil, r, id)
case remote.Action_CODE, remote.Action_STORAGE:
//skip
case remote.Action_STORAGE:
//skip, will check later
case remote.Action_CODE:
c.hasher.Reset()
c.hasher.Write(sc.Changes[i].Code)
k := make([]byte, 32)
c.hasher.Sum(k)
c.addCode(k, sc.Changes[i].Code, r, id)
default:
panic("not implemented yet")
}
@ -268,33 +307,38 @@ func (c *Coherent) View(ctx context.Context, tx kv.Tx) (CacheView, error) {
return &CoherentView{viewID: ViewID(tx.ViewID()), tx: tx, cache: c}, nil
}
func (c *Coherent) getFromCache(k []byte, id ViewID) (btree.Item, *CoherentRoot, bool, error) {
func (c *Coherent) getFromCache(k []byte, id ViewID, code bool) (btree.Item, *CoherentRoot, error) {
c.lock.RLock()
defer c.lock.RUnlock()
isLatest := c.latestViewID == id
//TODO: create thread-safe list wrapper
r, ok := c.roots[id]
if !ok {
return nil, r, isLatest, fmt.Errorf("too old ViewID: %d, latestViewID=%d", id, c.latestViewID)
return nil, r, fmt.Errorf("too old ViewID: %d, latestViewID=%d", id, c.latestViewID)
}
isLatest := c.latestViewID == id
var it btree.Item
if code {
it = r.codeCache.Get(&Element{K: k})
} else {
it = r.cache.Get(&Element{K: k})
}
if it != nil && isLatest {
c.stateEvict.MoveToFront(it.(*Element))
}
it := r.cache.Get(&Element{K: k})
return it, r, isLatest, nil
return it, r, nil
}
func (c *Coherent) Get(k []byte, tx kv.Tx, id ViewID) ([]byte, error) {
it, r, isLatest, err := c.getFromCache(k, id)
it, r, err := c.getFromCache(k, id, false)
if err != nil {
return nil, err
}
if it != nil {
c.hits.Inc()
if isLatest {
c.evictList.MoveToFront(it.(*Element))
}
//fmt.Printf("from cache: %#x,%x\n", k, it.(*Element).V)
return it.(*Element).V, nil
}
@ -312,13 +356,50 @@ func (c *Coherent) Get(k []byte, tx kv.Tx, id ViewID) ([]byte, error) {
return v, nil
}
func (c *Coherent) GetCode(k []byte, tx kv.Tx, id ViewID) ([]byte, error) {
it, r, err := c.getFromCache(k, id, true)
if err != nil {
return nil, err
}
if it != nil {
c.codeHits.Inc()
//fmt.Printf("from cache: %#x,%x\n", k, it.(*Element).V)
return it.(*Element).V, nil
}
//{
// r, ok := c.roots[id]
// if ok {
// fmt.Printf("miss: %x,%d,%d\n", k, id, r.codeCache.Len())
// }
//}
c.codeMiss.Inc()
v, err := tx.GetOne(kv.Code, k)
if err != nil {
return nil, err
}
//fmt.Printf("from db: %#x,%x\n", k, v)
c.lock.Lock()
defer c.lock.Unlock()
v = c.addCode(common.Copy(k), common.Copy(v), r, id).V
return v, nil
}
func (c *Coherent) removeOldest(r *CoherentRoot) {
e := c.evictList.Back()
e := c.stateEvict.Oldest()
if e != nil {
c.evictList.Remove(e)
c.stateEvict.Remove(e)
r.cache.Delete(e)
}
}
func (c *Coherent) removeOldestCode(r *CoherentRoot) {
e := c.codeEvict.Oldest()
if e != nil {
c.codeEvict.Remove(e)
r.codeCache.Delete(e)
}
}
func (c *Coherent) add(k, v []byte, r *CoherentRoot, id ViewID) *Element {
it := &Element{K: k, V: v}
replaced := r.cache.ReplaceOrInsert(it)
@ -327,16 +408,34 @@ func (c *Coherent) add(k, v []byte, r *CoherentRoot, id ViewID) *Element {
return it
}
if replaced != nil {
c.evictList.Remove(replaced.(*Element))
c.stateEvict.Remove(replaced.(*Element))
}
c.evictList.PushFront(it)
evict := c.evictList.Len() > c.cfg.KeysLimit
c.stateEvict.PushFront(it)
evict := c.stateEvict.Len() > c.cfg.KeysLimit
// Verify size not exceeded
if evict {
c.removeOldest(r)
}
return it
}
func (c *Coherent) addCode(k, v []byte, r *CoherentRoot, id ViewID) *Element {
it := &Element{K: k, V: v}
replaced := r.codeCache.ReplaceOrInsert(it)
if c.latestViewID != id {
//fmt.Printf("add to non-last viewID: %d<%d\n", c.latestViewID, id)
return it
}
if replaced != nil {
c.codeEvict.Remove(replaced.(*Element))
}
c.codeEvict.PushFront(it)
evict := c.codeEvict.Len() > c.cfg.CodeKeysLimit
// Verify size not exceeded
if evict {
c.removeOldestCode(r)
}
return it
}
type Stat struct {
BlockNum uint64
@ -407,6 +506,7 @@ func (c *Coherent) evictRoots() {
return
}
to := c.latestViewID - ViewID(c.cfg.KeepViews)
fmt.Printf("evict: %d,%d,%d\n", c.cfg.KeepViews, c.latestViewID, to)
//fmt.Printf("collecting: %d\n", to)
var toDel []ViewID
for txId := range c.roots {
@ -415,7 +515,7 @@ func (c *Coherent) evictRoots() {
}
toDel = append(toDel, txId)
}
//log.Info("forget old roots", "list", fmt.Sprintf("%d", toDel))
log.Info("forget old roots", "list", fmt.Sprintf("%d", toDel))
for _, txId := range toDel {
delete(c.roots, txId)
}
@ -423,10 +523,10 @@ func (c *Coherent) evictRoots() {
func (c *Coherent) Len() int {
c.lock.RLock()
defer c.lock.RUnlock()
if c.latestView == nil {
if c.latestStateView == nil {
return 0
}
return c.latestView.cache.Len() //todo: is it same with cache.len()?
return c.latestStateView.cache.Len() //todo: is it same with cache.len()?
}
// Element is an element of a linked list.
@ -449,6 +549,48 @@ func (e *Element) Less(than btree.Item) bool {
return bytes.Compare(e.K, than.(*Element).K) < 0
}
type ThreadSafeEvictionList struct {
l *List
lock sync.RWMutex
}
func (l *ThreadSafeEvictionList) Init() {
l.lock.Lock()
l.l.Init()
l.lock.Unlock()
}
func (l *ThreadSafeEvictionList) PushFront(e *Element) {
l.lock.Lock()
l.l.PushFront(e)
l.lock.Unlock()
}
func (l *ThreadSafeEvictionList) MoveToFront(e *Element) {
l.lock.Lock()
l.l.MoveToFront(e)
l.lock.Unlock()
}
func (l *ThreadSafeEvictionList) Remove(e *Element) {
l.lock.Lock()
l.l.Remove(e)
l.lock.Unlock()
}
func (l *ThreadSafeEvictionList) Oldest() *Element {
l.lock.Lock()
e := l.l.Back()
l.lock.Unlock()
return e
}
func (l *ThreadSafeEvictionList) Len() int {
l.lock.RLock()
length := l.l.Len()
l.lock.RUnlock()
return length
}
// ========= copypaste of List implementation from stdlib ========
// Next returns the next list element or nil.

View File

@ -42,7 +42,7 @@ func TestEvictionInUnexpectedOrder(t *testing.T) {
require.False(c.roots[2].isCanonical)
c.add([]byte{1}, nil, c.roots[2], 2)
require.Equal(0, c.evictList.Len())
require.Equal(0, c.stateEvict.Len())
c.advanceRoot(2)
require.Equal(1, len(c.roots))
@ -50,7 +50,7 @@ func TestEvictionInUnexpectedOrder(t *testing.T) {
require.True(c.roots[2].isCanonical)
c.add([]byte{1}, nil, c.roots[2], 2)
require.Equal(1, c.evictList.Len())
require.Equal(1, c.stateEvict.Len())
c.selectOrCreateRoot(5)
require.Equal(2, len(c.roots))
@ -58,9 +58,9 @@ func TestEvictionInUnexpectedOrder(t *testing.T) {
require.False(c.roots[5].isCanonical)
c.add([]byte{2}, nil, c.roots[5], 5) // not added to evict list
require.Equal(1, c.evictList.Len())
require.Equal(1, c.stateEvict.Len())
c.add([]byte{2}, nil, c.roots[2], 2) // added to evict list, because it's latest view
require.Equal(2, c.evictList.Len())
require.Equal(2, c.stateEvict.Len())
c.selectOrCreateRoot(6)
require.Equal(3, len(c.roots))
@ -93,8 +93,8 @@ func TestEvictionInUnexpectedOrder(t *testing.T) {
require.True(c.roots[100].isCanonical)
//c.add([]byte{1}, nil, c.roots[2], 2)
require.Equal(0, c.latestView.cache.Len())
require.Equal(0, c.evictList.Len())
require.Equal(0, c.latestStateView.cache.Len())
require.Equal(0, c.stateEvict.Len())
}
func TestEviction(t *testing.T) {
@ -116,11 +116,11 @@ func TestEviction(t *testing.T) {
_, _ = c.Get([]byte{1}, tx, view.viewID)
_, _ = c.Get([]byte{2}, tx, view.viewID)
_, _ = c.Get([]byte{3}, tx, view.viewID)
//require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len())
//require.Equal(c.roots[c.latestViewID].cache.Len(), c.stateEvict.Len())
return nil
})
require.Equal(0, c.evictList.Len())
//require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len())
require.Equal(0, c.stateEvict.Len())
//require.Equal(c.roots[c.latestViewID].cache.Len(), c.stateEvict.Len())
c.OnNewBlock(&remote.StateChangeBatch{
DatabaseViewID: id + 1,
ChangeBatch: []*remote.StateChange{
@ -134,8 +134,8 @@ func TestEviction(t *testing.T) {
},
},
})
require.Equal(1, c.evictList.Len())
require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len())
require.Equal(1, c.stateEvict.Len())
require.Equal(c.roots[c.latestViewID].cache.Len(), c.stateEvict.Len())
_ = db.Update(ctx, func(tx kv.RwTx) error {
_ = tx.Put(kv.PlainState, k1[:], []byte{1})
cacheView, _ := c.View(ctx, tx)
@ -147,8 +147,8 @@ func TestEviction(t *testing.T) {
_, _ = c.Get([]byte{6}, tx, view.viewID)
return nil
})
require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len())
require.Equal(cfg.KeysLimit, c.evictList.Len())
require.Equal(c.roots[c.latestViewID].cache.Len(), c.stateEvict.Len())
require.Equal(cfg.KeysLimit, c.stateEvict.Len())
}
func TestAPI(t *testing.T) {
@ -337,3 +337,27 @@ func TestAPI(t *testing.T) {
wg.Wait()
}
func TestCode(t *testing.T) {
require, ctx := require.New(t), context.Background()
c := New(DefaultCoherentConfig)
db := memdb.NewTestDB(t)
k1, k2 := [20]byte{1}, [20]byte{2}
_ = db.Update(ctx, func(tx kv.RwTx) error {
_ = tx.Put(kv.Code, k1[:], k2[:])
cacheView, _ := c.View(ctx, tx)
view := cacheView.(*CoherentView)
v, err := c.GetCode(k1[:], tx, view.viewID)
require.NoError(err)
require.Equal(k2[:], v)
v, err = c.GetCode(k1[:], tx, view.viewID)
require.NoError(err)
require.Equal(k2[:], v)
//require.Equal(c.roots[c.latestViewID].cache.Len(), c.stateEvict.Len())
return nil
})
}

View File

@ -38,10 +38,14 @@ func (c *DummyCache) Len() int { return 0 }
func (c *DummyCache) Get(k []byte, tx kv.Tx, id ViewID) ([]byte, error) {
return tx.GetOne(kv.PlainState, k)
}
func (c *DummyCache) GetCode(k []byte, tx kv.Tx, id ViewID) ([]byte, error) {
return tx.GetOne(kv.Code, k)
}
type DummyView struct {
cache *DummyCache
tx kv.Tx
}
func (c *DummyView) Get(k []byte) ([]byte, error) { return c.cache.Get(k, c.tx, 0) }
func (c *DummyView) Get(k []byte) ([]byte, error) { return c.cache.Get(k, c.tx, 0) }
func (c *DummyView) GetCode(k []byte) ([]byte, error) { return c.cache.GetCode(k, c.tx, 0) }