From 6a7eeccad7739315f0691626325cecb112226360 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Mon, 20 Sep 2021 12:44:29 +0700 Subject: [PATCH] Pool: discard reasons, LRU (#80) --- gointerfaces/remote/kv_grpc.pb.go | 13 +- kv/kvcache/cache.go | 575 +++++++---- kv/kvcache/cache_test.go | 124 ++- kv/kvcache/dummy.go | 21 +- kv/mdbx/util.go | 8 + txpool/fetch.go | 10 +- txpool/grpc_server.go | 22 +- txpool/mocks_test.go | 2 +- txpool/packets_test.go | 2 +- txpool/pool.go | 1522 +++++++++++++++-------------- txpool/pool_fuzz_test.go | 10 +- txpool/pool_test.go | 39 + txpool/types.go | 13 +- 13 files changed, 1363 insertions(+), 998 deletions(-) diff --git a/gointerfaces/remote/kv_grpc.pb.go b/gointerfaces/remote/kv_grpc.pb.go index 2c7630aac..6405674ba 100644 --- a/gointerfaces/remote/kv_grpc.pb.go +++ b/gointerfaces/remote/kv_grpc.pb.go @@ -3,12 +3,13 @@ package remote import ( - context "context" - types "github.com/ledgerwatch/erigon-lib/gointerfaces/types" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" - emptypb "google.golang.org/protobuf/types/known/emptypb" + "context" + + "github.com/ledgerwatch/erigon-lib/gointerfaces/types" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" ) // This is a compile-time assertion to ensure that this generated file diff --git a/kv/kvcache/cache.go b/kv/kvcache/cache.go index 674f9af5e..7ba94f836 100644 --- a/kv/kvcache/cache.go +++ b/kv/kvcache/cache.go @@ -22,7 +22,6 @@ import ( "fmt" "sort" "sync" - goatomic "sync/atomic" "time" "github.com/VictoriaMetrics/metrics" @@ -31,19 +30,16 @@ import ( "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/log/v3" "go.uber.org/atomic" ) type Cache interface { // View - returns CacheView consistent with givent kv.Tx - View(ctx context.Context, tx kv.Tx) (CacheView, error) + View(ctx context.Context, tx kv.Tx) (ViewID, error) OnNewBlock(sc *remote.StateChangeBatch) - Evict() int + //Evict() int Len() int -} -type CacheView interface { - Get(k []byte, tx kv.Tx) ([]byte, error) + Get(k []byte, tx kv.Tx, id ViewID) ([]byte, error) } // Coherent works on top of Database Transaction and pair Coherent+ReadTransaction must @@ -68,36 +64,46 @@ type CacheView interface { // it until either cache with the given identifier appears, or timeout (indicating that the cache update // mechanism is broken and cache is likely invalidated). // -// Tech details: -// - If found in cache - return value without copy (reader can rely on fact that data are immutable until end of db transaction) -// - Otherwise just read from db (no requests deduplication for now - preliminary optimization). 
-
+// Pair.Value == nil is a marker of an absent key in db
+// Coherent
+// High-level guarantees:
+// - Keys/Values returned by cache are valid/immutable until end of db transaction
+// - CacheView is always coherent with the given db transaction
+//
+// Rules for setting view.isCanonical:
+// - the View method must not parent.Clone(), because the parent view is not coherent with the current kv.Tx
+// - only the OnNewBlock method may parent.Clone() and apply StateChanges to create a view coherent with kv.Tx
+// - parent.Clone() can't be called if parent.isCanonical=false
+// - only the OnNewBlock method can set view.isCanonical=true
+// Rules for filling cache.evictList:
+// - changes in a Canonical View SHOULD be reflected in evictList
+// - changes in a Non-Canonical View SHOULD NOT be reflected in evictList
 type Coherent struct {
-	hits, miss, timeout, keys *metrics.Counter
-	evict                     *metrics.Summary
-	roots                     map[uint64]*CoherentView
-	rootsLock                 sync.RWMutex
-	cfg                       CoherentCacheConfig
+	hits, miss, timeout *metrics.Counter
+	keys, keys2         *metrics.Counter
+	latestViewID        ViewID
+	latestView          *CoherentView
+	evictList           *List
+	roots               map[ViewID]*CoherentView
+	lock                sync.RWMutex
+	cfg                 CoherentCacheConfig
 }
 type CoherentView struct {
-	hits, miss      *metrics.Counter
 	cache           *btree.BTree
-	lock            sync.RWMutex
-	id              atomic.Uint64
 	ready           chan struct{} // close when ready
 	readyChanClosed atomic.Bool   // protecting `ready` field from double-close (on unwind). Consumers don't need check this field.
+
+	// A view is marked `Canonical` once it has received an OnNewBlock message;
+	// we may drop `Non-Canonical` views even if they hold fresh keys.
+	// Keys added to `Non-Canonical` views SHOULD NOT be added to evictList.
+	// cache.latestView is always `Canonical`.
+	isCanonical bool
 }
 
-var _ Cache = (*Coherent)(nil)         // compile-time interface check
-var _ CacheView = (*CoherentView)(nil) // compile-time interface check
-type Pair struct {
-	K, V []byte
-	t    uint64 //TODO: can be uint32 if remember first txID and use it as base zero. because it's monotonic. 
-} - -func (p *Pair) Less(than btree.Item) bool { return bytes.Compare(p.K, than.(*Pair).K) < 0 } +var _ Cache = (*Coherent)(nil) // compile-time interface check +//var _ CacheView = (*CoherentView)(nil) // compile-time interface check type CoherentCacheConfig struct { KeepViews uint64 // keep in memory up to this amount of views, evict older @@ -110,63 +116,83 @@ type CoherentCacheConfig struct { var DefaultCoherentCacheConfig = CoherentCacheConfig{ KeepViews: 50, NewBlockWait: 50 * time.Millisecond, - KeysLimit: 1_000_000, + KeysLimit: 400_000, MetricsLabel: "default", - WithStorage: false, + WithStorage: true, } func New(cfg CoherentCacheConfig) *Coherent { - return &Coherent{roots: map[uint64]*CoherentView{}, cfg: cfg, - miss: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="miss",name="%s"}`, cfg.MetricsLabel)), - hits: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="hit",name="%s"}`, cfg.MetricsLabel)), - timeout: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_timeout_total{name="%s"}`, cfg.MetricsLabel)), - keys: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_keys_total{name="%s"}`, cfg.MetricsLabel)), - evict: metrics.GetOrCreateSummary(fmt.Sprintf(`cache_evict{name="%s"}`, cfg.MetricsLabel)), + return &Coherent{ + roots: map[ViewID]*CoherentView{}, + evictList: NewList(), + cfg: cfg, + miss: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="miss",name="%s"}`, cfg.MetricsLabel)), + hits: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="hit",name="%s"}`, cfg.MetricsLabel)), + timeout: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_timeout_total{name="%s"}`, cfg.MetricsLabel)), + keys: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_keys_total{name="%s"}`, cfg.MetricsLabel)), + keys2: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_list_total{name="%s"}`, cfg.MetricsLabel)), } } // selectOrCreateRoot - used for usual getting root -func (c *Coherent) selectOrCreateRoot(txID uint64) *CoherentView { - c.rootsLock.Lock() - defer c.rootsLock.Unlock() - r, ok := c.roots[txID] +func (c *Coherent) selectOrCreateRoot(viewID ViewID) *CoherentView { + c.lock.Lock() + defer c.lock.Unlock() + r, ok := c.roots[viewID] if ok { return r } - r = &CoherentView{ready: make(chan struct{}), hits: c.hits, miss: c.miss} - r.id.Store(txID) - c.roots[txID] = r + r = &CoherentView{ready: make(chan struct{})} + if prevView, ok := c.roots[viewID-1]; ok { + //log.Info("advance: clone", "from", viewID-1, "to", viewID) + r.cache = prevView.cache.Clone() + r.isCanonical = prevView.isCanonical + } else { + //log.Info("advance: new", "to", viewID) + r.cache = btree.New(32) + } + c.roots[viewID] = r return r } // advanceRoot - used for advancing root onNewBlock -func (c *Coherent) advanceRoot(viewID uint64) (r *CoherentView) { - c.rootsLock.Lock() - defer c.rootsLock.Unlock() +func (c *Coherent) advanceRoot(viewID ViewID) (r *CoherentView) { r, rootExists := c.roots[viewID] if !rootExists { - r = &CoherentView{ready: make(chan struct{}), hits: c.hits, miss: c.miss} - r.id.Store(viewID) + r = &CoherentView{ready: make(chan struct{})} c.roots[viewID] = r } - - //TODO: need check if c.latest hash is still canonical. 
If not - can't clone from it - if prevView, ok := c.roots[viewID-1]; ok { + if prevView, ok := c.roots[viewID-1]; ok && prevView.isCanonical { //log.Info("advance: clone", "from", viewID-1, "to", viewID) - r.cache = prevView.Clone() + r.cache = prevView.cache.Clone() } else { + c.evictList.Init() if r.cache == nil { //log.Info("advance: new", "to", viewID) r.cache = btree.New(32) + } else { + r.cache.Ascend(func(i btree.Item) bool { + c.evictList.PushFront(i.(*Element)) + return true + }) } } + r.isCanonical = true + + c.evictRoots() + c.latestViewID = viewID + c.latestView = r + + c.keys.Set(uint64(c.latestView.cache.Len())) + c.keys2.Set(uint64(c.evictList.Len())) return r } func (c *Coherent) OnNewBlock(stateChanges *remote.StateChangeBatch) { - r := c.advanceRoot(stateChanges.DatabaseViewID) - r.lock.Lock() - defer r.lock.Unlock() + c.lock.Lock() + defer c.lock.Unlock() + id := ViewID(stateChanges.DatabaseViewID) + r := c.advanceRoot(id) for _, sc := range stateChanges.ChangeBatch { for i := range sc.Changes { switch sc.Changes[i].Action { @@ -174,10 +200,10 @@ func (c *Coherent) OnNewBlock(stateChanges *remote.StateChangeBatch) { addr := gointerfaces.ConvertH160toAddress(sc.Changes[i].Address) v := sc.Changes[i].Data //fmt.Printf("set: %x,%x\n", addr, v) - r.cache.ReplaceOrInsert(&Pair{K: addr[:], V: v, t: sc.BlockHeight}) + c.add(addr[:], v, r, id) case remote.Action_DELETE: addr := gointerfaces.ConvertH160toAddress(sc.Changes[i].Address) - r.cache.ReplaceOrInsert(&Pair{K: addr[:], V: nil, t: sc.BlockHeight}) + c.add(addr[:], nil, r, id) case remote.Action_CODE, remote.Action_STORAGE: //skip default: @@ -191,11 +217,12 @@ func (c *Coherent) OnNewBlock(stateChanges *remote.StateChangeBatch) { copy(k, addr[:]) binary.BigEndian.PutUint64(k[20:], sc.Changes[i].Incarnation) copy(k[20+8:], loc[:]) - r.cache.ReplaceOrInsert(&Pair{K: k, V: change.Data, t: sc.BlockHeight}) + c.add(k, change.Data, r, id) } } } } + switched := r.readyChanClosed.CAS(false, true) if switched { close(r.ready) //broadcast @@ -203,48 +230,47 @@ func (c *Coherent) OnNewBlock(stateChanges *remote.StateChangeBatch) { //log.Info("on new block handled", "viewID", stateChanges.DatabaseViewID) } -func (c *Coherent) View(ctx context.Context, tx kv.Tx) (CacheView, error) { - r := c.selectOrCreateRoot(tx.ViewID()) +type ViewID uint64 + +func (c *Coherent) View(ctx context.Context, tx kv.Tx) (ViewID, error) { + id := ViewID(tx.ViewID()) + r := c.selectOrCreateRoot(id) select { // fast non-blocking path case <-r.ready: - //fmt.Printf("recv broadcast: %d,%d\n", r.id.Load(), tx.ViewID()) - return r, nil + //fmt.Printf("recv broadcast: %d\n", id) + return id, nil default: } select { // slow blocking path case <-r.ready: - //fmt.Printf("recv broadcast2: %d,%d\n", r.id.Load(), tx.ViewID()) + //fmt.Printf("recv broadcast2: %d\n", tx.ViewID()) case <-ctx.Done(): - return nil, fmt.Errorf("kvcache rootNum=%x, %w", tx.ViewID(), ctx.Err()) + return 0, fmt.Errorf("kvcache rootNum=%x, %w", tx.ViewID(), ctx.Err()) case <-time.After(c.cfg.NewBlockWait): //TODO: switch to timer to save resources c.timeout.Inc() - r.lock.Lock() - //log.Info("timeout", "mem_id", r.id.Load(), "db_id", tx.ViewID(), "has_btree", r.cache != nil) - if r.cache == nil { - //parent := c.selectOrCreateRoot(tx.ViewID() - 1) - //if parent.cache != nil { - // r.cache = parent.Clone() - //} else { - // r.cache = btree.New(32) - //} - r.cache = btree.New(32) - } - r.lock.Unlock() + //log.Info("timeout", "db_id", id, "has_btree", r.cache != nil) } - return r, nil + return 
ViewID(tx.ViewID()), nil } -func (c *CoherentView) Get(k []byte, tx kv.Tx) ([]byte, error) { +func (c *Coherent) Get(k []byte, tx kv.Tx, id ViewID) ([]byte, error) { c.lock.RLock() - it := c.cache.Get(&Pair{K: k}) + isLatest := c.latestViewID == id + r, ok := c.roots[id] + if !ok { + return nil, fmt.Errorf("too old ViewID: %d, latestViewID=%d", id, c.latestViewID) + } + it := r.cache.Get(&Element{K: k}) c.lock.RUnlock() if it != nil { c.hits.Inc() - goatomic.StoreUint64(&it.(*Pair).t, c.id.Load()) - //fmt.Printf("from cache %x: %#x,%x\n", c.id.Load(), k, it.(*Pair).V) - return it.(*Pair).V, nil + if isLatest { + c.evictList.MoveToFront(it.(*Element)) + } + //fmt.Printf("from cache: %#x,%x\n", k, it.(*Element).V) + return it.(*Element).V, nil } c.miss.Inc() @@ -252,27 +278,38 @@ func (c *CoherentView) Get(k []byte, tx kv.Tx) ([]byte, error) { if err != nil { return nil, err } - //fmt.Printf("from db %x: %#x,%x\n", c.id.Load(), k, v) + //fmt.Printf("from db: %#x,%x\n", k, v) - it = &Pair{K: k, V: common.Copy(v), t: c.id.Load()} c.lock.Lock() - c.cache.ReplaceOrInsert(it) + v = c.add(common.Copy(k), common.Copy(v), r, id).V c.lock.Unlock() - return it.(*Pair).V, nil + return v, nil } -func (c *CoherentView) Len() int { - c.lock.RLock() - defer c.lock.RUnlock() - return c.cache.Len() -} -func (c *CoherentView) Clone() *btree.BTree { - c.lock.Lock() - defer c.lock.Unlock() - if c.cache == nil { - c.cache = btree.New(32) - return btree.New(32) // return independent tree because nothing to share + +func (c *Coherent) removeOldest(r *CoherentView) { + e := c.evictList.Back() + if e != nil { + c.evictList.Remove(e) + r.cache.Delete(e) } - return c.cache.Clone() +} +func (c *Coherent) add(k, v []byte, r *CoherentView, id ViewID) *Element { + it := &Element{K: k, V: v} + replaced := r.cache.ReplaceOrInsert(it) + if c.latestViewID != id { + //fmt.Printf("add to non-last viewID: %d<%d\n", c.latestViewID, id) + return it + } + if replaced != nil { + c.evictList.Remove(replaced.(*Element)) + } + c.evictList.PushFront(it) + evict := c.evictList.Len() > c.cfg.KeysLimit + // Verify size not exceeded + if evict { + c.removeOldest(r) + } + return it } type Stat struct { @@ -287,51 +324,24 @@ func DebugStats(cache Cache) []Stat { if !ok { return res } - casted.rootsLock.RLock() - defer casted.rootsLock.RUnlock() + casted.lock.RLock() + defer casted.lock.RUnlock() for root, r := range casted.roots { res = append(res, Stat{ - BlockNum: root, - Lenght: r.Len(), + BlockNum: uint64(root), + Lenght: r.cache.Len(), }) } sort.Slice(res, func(i, j int) bool { return res[i].BlockNum < res[j].BlockNum }) return res } -func DebugAges(cache Cache) []Stat { - res := []Stat{} - casted, ok := cache.(*Coherent) - if !ok { - return res - } - casted.rootsLock.RLock() - defer casted.rootsLock.RUnlock() - _, latestView := cache.(*Coherent).lastRoot() - latestView.lock.RLock() - defer latestView.lock.RUnlock() - counters := map[uint64]int{} - latestView.cache.Ascend(func(it btree.Item) bool { - age := goatomic.LoadUint64(&it.(*Pair).t) - _, ok := counters[age] - if !ok { - counters[age] = 0 - } - counters[age]++ - return true - }) - for i, j := range counters { - res = append(res, Stat{BlockNum: i, Lenght: j}) - } - sort.Slice(res, func(i, j int) bool { return res[i].BlockNum < res[j].BlockNum }) - return res -} func AssertCheckValues(ctx context.Context, tx kv.Tx, cache Cache) (int, error) { defer func(t time.Time) { fmt.Printf("AssertCheckValues:327: %s\n", time.Since(t)) }(time.Now()) - c, err := cache.View(ctx, tx) + viewID, 
err := cache.View(ctx, tx) if err != nil { return 0, err } - casted, ok := c.(*CoherentView) + casted, ok := cache.(*Coherent) if !ok { return 0, nil } @@ -339,15 +349,19 @@ func AssertCheckValues(ctx context.Context, tx kv.Tx, cache Cache) (int, error) casted.lock.RLock() defer casted.lock.RUnlock() //log.Info("AssertCheckValues start", "db_id", tx.ViewID(), "mem_id", casted.id.Load(), "len", casted.cache.Len()) - casted.cache.Ascend(func(i btree.Item) bool { - k, v := i.(*Pair).K, i.(*Pair).V + view, ok := casted.roots[viewID] + if !ok { + return 0, nil + } + view.cache.Ascend(func(i btree.Item) bool { + k, v := i.(*Element).K, i.(*Element).V var dbV []byte dbV, err = tx.GetOne(kv.PlainState, k) if err != nil { return false } if !bytes.Equal(dbV, v) { - err = fmt.Errorf("key: %x, has different values: %x != %x, viewID: %d", k, v, dbV, casted.id.Load()) + err = fmt.Errorf("key: %x, has different values: %x != %x", k, v, dbV) return false } checked++ @@ -355,21 +369,16 @@ func AssertCheckValues(ctx context.Context, tx kv.Tx, cache Cache) (int, error) }) return checked, err } - -func (c *Coherent) lastRoot() (latestTxId uint64, view *CoherentView) { - c.rootsLock.RLock() - defer c.rootsLock.RUnlock() - for txID := range c.roots { // max - if txID > latestTxId { - latestTxId = txID - } +func (c *Coherent) evictRoots() { + if c.latestViewID <= ViewID(c.cfg.KeepViews) { + return } - return latestTxId, c.roots[latestTxId] -} -func (c *Coherent) evictRoots(to uint64) { - c.rootsLock.Lock() - defer c.rootsLock.Unlock() - var toDel []uint64 + if len(c.roots) < int(c.cfg.KeepViews) { + return + } + to := c.latestViewID - ViewID(c.cfg.KeepViews) + //fmt.Printf("collecting: %d\n", to) + var toDel []ViewID for txId := range c.roots { if txId > to { continue @@ -382,78 +391,222 @@ func (c *Coherent) evictRoots(to uint64) { } } func (c *Coherent) Len() int { - _, lastView := c.lastRoot() - return lastView.Len() -} - -func (c *Coherent) Evict() int { - defer c.evict.UpdateDuration(time.Now()) - latestBlockNum, lastView := c.lastRoot() - c.evictRoots(latestBlockNum - 10) - if lastView == nil { + c.lock.RLock() + defer c.lock.RUnlock() + if c.latestView == nil { return 0 } - keysAmount := lastView.Len() - c.keys.Set(uint64(keysAmount)) - lastView.evictOld(c.cfg.KeepViews, c.cfg.KeysLimit) - //lastView.evictNew2Random(c.cfg.KeysLimit) - return lastView.Len() + return c.latestView.cache.Len() //todo: is it same with cache.len()? } -//nolint -func (c *CoherentView) evictOld(dropOlder uint64, keysLimit int) { - if c.Len() < keysLimit { +// Element is an element of a linked list. +type Element struct { + // Next and previous pointers in the doubly-linked list of elements. + // To simplify the implementation, internally a list l is implemented + // as a ring, such that &l.root is both the next element of the last + // list element (l.Back()) and the previous element of the first list + // element (l.Front()). + next, prev *Element + + // The list to which this element belongs. + list *List + + // The value stored with this element. + K, V []byte +} + +func (p *Element) Less(than btree.Item) bool { return bytes.Compare(p.K, than.(*Element).K) < 0 } + +// ========= copypaste of List implementation from stdlib ======== + +// Next returns the next list element or nil. +func (e *Element) Next() *Element { + if p := e.next; e.list != nil && p != &e.list.root { + return p + } + return nil +} + +// Prev returns the previous list element or nil. 
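+// (Next and Prev are O(1): they follow the ring pointers, with the list's root sentinel marking the end.)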
+func (e *Element) Prev() *Element { + if p := e.prev; e.list != nil && p != &e.list.root { + return p + } + return nil +} + +// List represents a doubly linked list. +// The zero value for List is an empty list ready to use. +type List struct { + root Element // sentinel list element, only &root, root.prev, and root.next are used + len int // current list length excluding (this) sentinel element +} + +// Init initializes or clears list l. +func (l *List) Init() *List { + l.root.next = &l.root + l.root.prev = &l.root + l.len = 0 + return l +} + +// New returns an initialized list. +func NewList() *List { return new(List).Init() } + +// Len returns the number of elements of list l. +// The complexity is O(1). +func (l *List) Len() int { return l.len } + +// Front returns the first element of list l or nil if the list is empty. +func (l *List) Front() *Element { + if l.len == 0 { + return nil + } + return l.root.next +} + +// Back returns the last element of list l or nil if the list is empty. +func (l *List) Back() *Element { + if l.len == 0 { + return nil + } + return l.root.prev +} + +// lazyInit lazily initializes a zero List value. +func (l *List) lazyInit() { + if l.root.next == nil { + l.Init() + } +} + +// insert inserts e after at, increments l.len, and returns e. +func (l *List) insert(e, at *Element) *Element { + e.prev = at + e.next = at.next + e.prev.next = e + e.next.prev = e + e.list = l + l.len++ + return e +} + +// insertValue is a convenience wrapper for insert(&Element{Value: v}, at). +func (l *List) insertValue(e, at *Element) *Element { + return l.insert(e, at) +} + +// remove removes e from its list, decrements l.len, and returns e. +func (l *List) remove(e *Element) *Element { + e.prev.next = e.next + e.next.prev = e.prev + e.next = nil // avoid memory leaks + e.prev = nil // avoid memory leaks + e.list = nil + l.len-- + return e +} + +// move moves e to next to at and returns e. +func (l *List) move(e, at *Element) *Element { + if e == at { + return e + } + e.prev.next = e.next + e.next.prev = e.prev + + e.prev = at + e.next = at.next + e.prev.next = e + e.next.prev = e + + return e +} + +// Remove removes e from l if e is an element of list l. +// It returns the element value e.Value. +// The element must not be nil. +func (l *List) Remove(e *Element) ([]byte, []byte) { + if e.list == l { + // if e.list == l, l must have been initialized when e was inserted + // in l or l == nil (e is a zero Element) and l.remove will crash + l.remove(e) + } + return e.K, e.V +} + +// PushFront inserts a new element e with value v at the front of list l and returns e. +func (l *List) PushFront(e *Element) *Element { + l.lazyInit() + return l.insertValue(e, &l.root) +} + +// PushBack inserts a new element e with value v at the back of list l and returns e. +func (l *List) PushBack(e *Element) *Element { + l.lazyInit() + return l.insertValue(e, l.root.prev) +} + +// InsertBefore inserts a new element e with value v immediately before mark and returns e. +// If mark is not an element of l, the list is not modified. +// The mark must not be nil. +func (l *List) InsertBefore(e *Element, mark *Element) *Element { + if mark.list != l { + return nil + } + // see comment in List.Remove about initialization of l + return l.insertValue(e, mark.prev) +} + +// InsertAfter inserts a new element e with value v immediately after mark and returns e. +// If mark is not an element of l, the list is not modified. +// The mark must not be nil. 
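+// It returns nil if mark belongs to a different list.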
+func (l *List) InsertAfter(e *Element, mark *Element) *Element { + if mark.list != l { + return nil + } + // see comment in List.Remove about initialization of l + return l.insertValue(e, mark) +} + +// MoveToFront moves element e to the front of list l. +// If e is not an element of l, the list is not modified. +// The element must not be nil. +func (l *List) MoveToFront(e *Element) { + if e.list != l || l.root.next == e { return } - c.lock.Lock() - defer c.lock.Unlock() - - var toDel []btree.Item - c.cache.Ascend(func(it btree.Item) bool { - age := goatomic.LoadUint64(&it.(*Pair).t) - if age < dropOlder { - toDel = append(toDel, it) - } - return true - }) - for _, it := range toDel { - c.cache.Delete(it) - } - log.Info("evicted", "too_old_amount", len(toDel)) + // see comment in List.Remove about initialization of l + l.move(e, &l.root) } -//nolint -func (c *CoherentView) evictNew2Random(keysLimit int) { - if c.Len() < keysLimit { +// MoveToBack moves element e to the back of list l. +// If e is not an element of l, the list is not modified. +// The element must not be nil. +func (l *List) MoveToBack(e *Element) { + if e.list != l || l.root.prev == e { return } - c.lock.Lock() - defer c.lock.Unlock() - i := 0 - var toDel []btree.Item - var fst, snd btree.Item - firstPrime, secondPrime := 11, 13 // to choose 2-pseudo-random elements and evict worse one - c.cache.Ascend(func(it btree.Item) bool { - if i%firstPrime == 0 { - fst = it - } - if i%secondPrime == 0 { - snd = it - } - if fst != nil && snd != nil { - if goatomic.LoadUint64(&fst.(*Pair).t) < goatomic.LoadUint64(&snd.(*Pair).t) { - toDel = append(toDel, fst) - } else { - toDel = append(toDel, snd) - } - fst = nil - snd = nil - } - return true - }) - - for _, it := range toDel { - c.cache.Delete(it) - } - log.Info("evicted", "2_random__amount", len(toDel)) + // see comment in List.Remove about initialization of l + l.move(e, l.root.prev) +} + +// MoveBefore moves element e to its new position before mark. +// If e or mark is not an element of l, or e == mark, the list is not modified. +// The element and mark must not be nil. +func (l *List) MoveBefore(e, mark *Element) { + if e.list != l || e == mark || mark.list != l { + return + } + l.move(e, mark.prev) +} + +// MoveAfter moves element e to its new position after mark. +// If e or mark is not an element of l, or e == mark, the list is not modified. +// The element and mark must not be nil. 
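+// (Of the Move* helpers only MoveToFront is exercised by the cache - for the LRU touch on Get;
+// the others are kept from the stdlib copy.)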
+func (l *List) MoveAfter(e, mark *Element) { + if e.list != l || e == mark || mark.list != l { + return + } + l.move(e, mark) } diff --git a/kv/kvcache/cache_test.go b/kv/kvcache/cache_test.go index 6391d7e31..9388ce853 100644 --- a/kv/kvcache/cache_test.go +++ b/kv/kvcache/cache_test.go @@ -29,6 +29,126 @@ import ( "github.com/stretchr/testify/require" ) +func TestEvictionInUnexpectedOrder(t *testing.T) { + // Order: View - 2, OnNewBlock - 2, View - 5, View - 6, OnNewBlock - 3, OnNewBlock - 4, View - 5, OnNewBlock - 5, OnNewBlock - 100 + require := require.New(t) + cfg := DefaultCoherentCacheConfig + cfg.KeysLimit = 3 + cfg.NewBlockWait = 0 + c := New(cfg) + c.selectOrCreateRoot(2) + require.Equal(1, len(c.roots)) + require.Equal(0, int(c.latestViewID)) + require.False(c.roots[2].isCanonical) + + c.add([]byte{1}, nil, c.roots[2], 2) + require.Equal(0, c.evictList.Len()) + + c.advanceRoot(2) + require.Equal(1, len(c.roots)) + require.Equal(2, int(c.latestViewID)) + require.True(c.roots[2].isCanonical) + + c.add([]byte{1}, nil, c.roots[2], 2) + require.Equal(1, c.evictList.Len()) + + c.selectOrCreateRoot(5) + require.Equal(2, len(c.roots)) + require.Equal(2, int(c.latestViewID)) + require.False(c.roots[5].isCanonical) + + c.add([]byte{2}, nil, c.roots[5], 5) // not added to evict list + require.Equal(1, c.evictList.Len()) + c.add([]byte{2}, nil, c.roots[2], 2) // added to evict list, because it's latest view + require.Equal(2, c.evictList.Len()) + + c.selectOrCreateRoot(6) + require.Equal(3, len(c.roots)) + require.Equal(2, int(c.latestViewID)) + require.False(c.roots[6].isCanonical) // parrent exists, but parent has isCanonical=false + + c.advanceRoot(3) + require.Equal(4, len(c.roots)) + require.Equal(3, int(c.latestViewID)) + require.True(c.roots[3].isCanonical) + + c.advanceRoot(4) + require.Equal(5, len(c.roots)) + require.Equal(4, int(c.latestViewID)) + require.True(c.roots[4].isCanonical) + + c.selectOrCreateRoot(5) + require.Equal(5, len(c.roots)) + require.Equal(4, int(c.latestViewID)) + require.False(c.roots[5].isCanonical) + + c.advanceRoot(5) + require.Equal(5, len(c.roots)) + require.Equal(5, int(c.latestViewID)) + require.True(c.roots[5].isCanonical) + + c.advanceRoot(100) + require.Equal(6, len(c.roots)) + require.Equal(100, int(c.latestViewID)) + require.True(c.roots[100].isCanonical) + + //c.add([]byte{1}, nil, c.roots[2], 2) + require.Equal(0, c.latestView.cache.Len()) + require.Equal(0, c.evictList.Len()) +} + +func TestEviction(t *testing.T) { + require, ctx := require.New(t), context.Background() + cfg := DefaultCoherentCacheConfig + cfg.KeysLimit = 3 + cfg.NewBlockWait = 0 + c := New(cfg) + db := memdb.NewTestDB(t) + k1, k2 := [20]byte{1}, [20]byte{2} + + var id uint64 + _ = db.Update(ctx, func(tx kv.RwTx) error { + _ = tx.Put(kv.PlainState, k1[:], []byte{1}) + viewID, _ := c.View(ctx, tx) + id = tx.ViewID() + _, _ = c.Get(k1[:], tx, viewID) + _, _ = c.Get([]byte{1}, tx, viewID) + _, _ = c.Get([]byte{2}, tx, viewID) + _, _ = c.Get([]byte{3}, tx, viewID) + //require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len()) + return nil + }) + require.Equal(0, c.evictList.Len()) + //require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len()) + c.OnNewBlock(&remote.StateChangeBatch{ + DatabaseViewID: id + 1, + ChangeBatch: []*remote.StateChange{ + { + Direction: remote.Direction_FORWARD, + Changes: []*remote.AccountChange{{ + Action: remote.Action_UPSERT, + Address: gointerfaces.ConvertAddressToH160(k1), + Data: []byte{2}, + }}, + }, + }, + }) + 
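+	// OnNewBlock ran advanceRoot, so the view is now canonical and the upserted key is tracked in evictList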
require.Equal(1, c.evictList.Len()) + require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len()) + _ = db.Update(ctx, func(tx kv.RwTx) error { + _ = tx.Put(kv.PlainState, k1[:], []byte{1}) + viewID, _ := c.View(ctx, tx) + id = tx.ViewID() + _, _ = c.Get(k1[:], tx, viewID) + _, _ = c.Get(k2[:], tx, viewID) + _, _ = c.Get([]byte{5}, tx, viewID) + _, _ = c.Get([]byte{6}, tx, viewID) + return nil + }) + require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len()) + require.Equal(cfg.KeysLimit, c.evictList.Len()) +} + func TestAPI(t *testing.T) { require := require.New(t) c := New(DefaultCoherentCacheConfig) @@ -45,11 +165,11 @@ func TestAPI(t *testing.T) { panic(fmt.Sprintf("epxected: %d, got: %d", expectTxnID, tx.ViewID())) } wg.Done() - cache, err := c.View(context.Background(), tx) + viewID, err := c.View(context.Background(), tx) if err != nil { panic(err) } - v, err := cache.Get(key[:], tx) + v, err := c.Get(key[:], tx, viewID) if err != nil { panic(err) } diff --git a/kv/kvcache/dummy.go b/kv/kvcache/dummy.go index c9f9c5d39..9b8203cf9 100644 --- a/kv/kvcache/dummy.go +++ b/kv/kvcache/dummy.go @@ -24,16 +24,15 @@ import ( // DummyCache - doesn't remember anything - can be used when service is not remote type DummyCache struct{} -type DummyView struct{} -var _ Cache = (*DummyCache)(nil) // compile-time interface check -var _ CacheView = (*DummyView)(nil) // compile-time interface check +var _ Cache = (*DummyCache)(nil) // compile-time interface check +//var _ CacheView = (*DummyView)(nil) // compile-time interface check -var dummyView = &DummyView{} - -func NewDummy() *DummyCache { return &DummyCache{} } -func (c *DummyCache) View(ctx context.Context, tx kv.Tx) (CacheView, error) { return dummyView, nil } -func (c *DummyCache) OnNewBlock(sc *remote.StateChangeBatch) {} -func (c *DummyCache) Evict() int { return 0 } -func (c *DummyCache) Len() int { return 0 } -func (c *DummyView) Get(k []byte, tx kv.Tx) ([]byte, error) { return tx.GetOne(kv.PlainState, k) } +func NewDummy() *DummyCache { return &DummyCache{} } +func (c *DummyCache) View(ctx context.Context, tx kv.Tx) (ViewID, error) { return 0, nil } +func (c *DummyCache) OnNewBlock(sc *remote.StateChangeBatch) {} +func (c *DummyCache) Evict() int { return 0 } +func (c *DummyCache) Len() int { return 0 } +func (c *DummyCache) Get(k []byte, tx kv.Tx, id ViewID) ([]byte, error) { + return tx.GetOne(kv.PlainState, k) +} diff --git a/kv/mdbx/util.go b/kv/mdbx/util.go index cd4f49fbb..c6daf23a1 100644 --- a/kv/mdbx/util.go +++ b/kv/mdbx/util.go @@ -30,6 +30,14 @@ func MustOpen(path string) kv.RwDB { return db } +func MustOpenRo(path string) kv.RoDB { + db, err := Open(path, log.New(), true) + if err != nil { + panic(err) + } + return db +} + // Open - main method to open database. 
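+// (MustOpenRo above passes readOnly=true; the kv.RwDB returned here also satisfies the kv.RoDB interface.)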
func Open(path string, logger log.Logger, readOnly bool) (kv.RwDB, error) {
 	var db kv.RwDB
diff --git a/txpool/fetch.go b/txpool/fetch.go
index c2f45a2f9..305888a29 100644
--- a/txpool/fetch.go
+++ b/txpool/fetch.go
@@ -302,9 +302,11 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
 		}
 	case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_POOLED_TRANSACTIONS_66:
 		txs := TxSlots{}
-		f.pooledTxsParseCtx.Reject(func(hash []byte) bool {
-			known, _ := f.pool.IdHashKnown(tx, hash)
-			return known
+		f.pooledTxsParseCtx.Reject(func(hash []byte) error {
+			if known, _ := f.pool.IdHashKnown(tx, hash); known {
+				return ErrRejected
+			}
+			return nil
 		})
 		switch req.Id {
 		case sentry.MessageId_POOLED_TRANSACTIONS_65:
@@ -449,7 +451,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
 			}
 		}
 		if err := f.db.View(ctx, func(tx kv.Tx) error {
-			return f.pool.OnNewBlock(ctx, req, unwindTxs, minedTxs)
+			return f.pool.OnNewBlock(ctx, req, unwindTxs, minedTxs, tx)
 		}); err != nil {
 			log.Warn("onNewBlock", "err", err)
 		}
diff --git a/txpool/grpc_server.go b/txpool/grpc_server.go
index 5dd8285f3..f4b9b477f 100644
--- a/txpool/grpc_server.go
+++ b/txpool/grpc_server.go
@@ -18,6 +18,7 @@ package txpool
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"net"
 	"sync"
@@ -130,28 +131,33 @@ func (s *GrpcServer) Add(ctx context.Context, in *txpool_proto.AddRequest) (*txp
 	var slots TxSlots
 	slots.Resize(uint(len(in.RlpTxs)))
 	parseCtx := NewTxParseContext(s.rules, s.chainID)
-	parseCtx.Reject(func(hash []byte) bool {
-		known, _ := s.txPool.IdHashKnown(tx, hash)
-		return known
+	parseCtx.Reject(func(hash []byte) error {
+		if known, _ := s.txPool.IdHashKnown(tx, hash); known {
+			return ErrAlreadyKnown
+		}
+		return nil
 	})
+	reply := &txpool_proto.AddReply{Imported: make([]txpool_proto.ImportResult, len(in.RlpTxs)), Errors: make([]string, len(in.RlpTxs))}
+
 	for i := range in.RlpTxs {
 		slots.txs[i] = &TxSlot{}
 		slots.isLocal[i] = true
 		if _, err := parseCtx.ParseTransaction(in.RlpTxs[i], 0, slots.txs[i], slots.senders.At(i)); err != nil {
-			log.Warn("pool add", "err", err)
+			if !errors.Is(err, ErrAlreadyKnown) {
+				log.Warn("pool add", "err", err)
+			}
 			continue
 		}
 	}
 
-	reply := &txpool_proto.AddReply{Imported: make([]txpool_proto.ImportResult, len(in.RlpTxs)), Errors: make([]string, len(in.RlpTxs))}
 	discardReasons, err := s.txPool.AddLocalTxs(ctx, slots)
 	if err != nil {
 		return nil, err
 	}
-	//TODO: concept of discardReasons not really implemented yet
+	//TODO: concept of discardReasonsLRU not really implemented yet
 	_ = discardReasons
 	/*
-	for i, err := range discardReasons {
+	for i, err := range discardReasonsLRU {
 		if err == nil {
 			continue
 		}
diff --git a/txpool/mocks_test.go b/txpool/mocks_test.go
index c75d93195..6dc75361b 100644
--- a/txpool/mocks_test.go
+++ b/txpool/mocks_test.go
@@ -312,7 +312,7 @@ func (mock *PoolMock) IdHashKnownCalls() []struct {
 }
 
 // OnNewBlock calls OnNewBlockFunc. 
-func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs TxSlots, minedTxs TxSlots) error { +func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs TxSlots, tx kv.Tx) error { callInfo := struct { Ctx context.Context StateChanges *remote.StateChangeBatch diff --git a/txpool/packets_test.go b/txpool/packets_test.go index 9cb433bf3..71fa028ca 100644 --- a/txpool/packets_test.go +++ b/txpool/packets_test.go @@ -159,7 +159,7 @@ func TestPooledTransactionsPacket66(t *testing.T) { require.Equal(tt.encoded, fmt.Sprintf("%x", encodeBuf)) ctx := NewTxParseContext(chain.MainnetRules, *u256.N1) - ctx.reject = func(bytes []byte) bool { return true } + ctx.checkHash = func(bytes []byte) error { return ErrRejected } slots := &TxSlots{} requestId, _, err := ParsePooledTransactions66(encodeBuf, 0, ctx, slots) require.NoError(err) diff --git a/txpool/pool.go b/txpool/pool.go index eaec1d6ec..bace55511 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -54,7 +54,8 @@ var ( addRemoteTxsTimer = metrics.NewSummary(`pool_add_remote_txs`) newBlockTimer = metrics.NewSummary(`pool_new_block`) writeToDbTimer = metrics.NewSummary(`pool_write_to_db`) - propagateToNewPeerTimer = metrics.NewSummary(`pool_propagate_new_peer`) + propagateToNewPeerTimer = metrics.NewSummary(`pool_propagate_to_new_peer`) + propagateNewTxsTimer = metrics.NewSummary(`pool_propagate_new_txs`) writeToDbBytesCounter = metrics.GetOrCreateCounter(`pool_write_to_db_bytes`) ) @@ -81,8 +82,8 @@ var DefaultConfig = Config{ CacheEvictEvery: 1 * time.Minute, PendingSubPoolLimit: 50_000, - BaseFeeSubPoolLimit: 50_000, - QueuedSubPoolLimit: 50_000, + BaseFeeSubPoolLimit: 200_000, + QueuedSubPoolLimit: 90_000, } // Pool is interface for the transaction pool @@ -92,7 +93,7 @@ type Pool interface { // Handle 3 main events - new remote txs from p2p, new local txs from RPC, new blocks from execution layer AddRemoteTxs(ctx context.Context, newTxs TxSlots) AddLocalTxs(ctx context.Context, newTxs TxSlots) ([]DiscardReason, error) - OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs TxSlots) error + OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs TxSlots, tx kv.Tx) error // IdHashKnown check whether transaction with given Id hash is known to the pool IdHashKnown(tx kv.Tx, hash []byte) (bool, error) @@ -124,13 +125,18 @@ type DiscardReason uint8 const ( //TODO: all below codes are not fixed yet. Need add them to discardLocked func. Need save discard reasons to LRU or DB. 
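+	// Reasons passed to discardLocked are kept in discardReasonsLRU and returned to AddLocalTxs callers.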
- Success DiscardReason = 1 - AlreadyKnown DiscardReason = 2 - UnderPriced DiscardReason = 3 - FeeTooLow DiscardReason = 4 - OversizedData DiscardReason = 5 - InvalidSender DiscardReason = 6 - NegativeValue DiscardReason = 7 + Success DiscardReason = 1 + AlreadyKnown DiscardReason = 2 + Mined DiscardReason = 3 + ReplacedByHigherTip DiscardReason = 4 + UnderPriced DiscardReason = 5 + FeeTooLow DiscardReason = 6 + OversizedData DiscardReason = 7 + InvalidSender DiscardReason = 8 + NegativeValue DiscardReason = 9 + PendingPoolOverflow DiscardReason = 10 + BaseFeePoolOverflow DiscardReason = 11 + QueuedPoolOverflow DiscardReason = 12 ) // metaTx holds transaction and some metadata @@ -179,159 +185,18 @@ func (i *sortByNonce) Less(than btree.Item) bool { return i.metaTx.Tx.nonce < than.(*sortByNonce).metaTx.Tx.nonce } -// sendersBatch stores in-memory senders-related objects - which are different from DB (updated/dirty) -// flushing to db periodicaly. it doesn't play as read-cache (because db is small and memory-mapped - doesn't need cache) -// non thread-safe -type sendersBatch struct { - senderID uint64 - senderIDs map[string]uint64 - senderID2Addr map[uint64]string - cache kvcache.Cache -} - -func newSendersCache(cache kvcache.Cache) *sendersBatch { - return &sendersBatch{senderIDs: map[string]uint64{}, senderID2Addr: map[uint64]string{}, cache: cache} -} - -func (sc *sendersBatch) id(addr string) (uint64, bool) { - id, ok := sc.senderIDs[addr] - return id, ok -} -func (sc *sendersBatch) info(cache kvcache.CacheView, coreTx kv.Tx, id uint64) (nonce uint64, balance uint256.Int, err error) { - //cacheTotalCounter.Inc() - addr, ok := sc.senderID2Addr[id] - if !ok { - panic("must not happen") - } - encoded, err := cache.Get([]byte(addr), coreTx) - if err != nil { - return 0, emptySender.balance, err - } - if len(encoded) == 0 { - return emptySender.nonce, emptySender.balance, nil - } - nonce, balance, err = DecodeSender(encoded) - if err != nil { - return 0, emptySender.balance, err - } - return nonce, balance, nil -} - -//nolint -func (sc *sendersBatch) printDebug(prefix string) { - fmt.Printf("%s.sendersBatch.sender\n", prefix) - //for i, j := range sc.senderInfo { - // fmt.Printf("\tid=%d,nonce=%d,balance=%d\n", i, j.nonce, j.balance.Uint64()) - //} -} - -func (sc *sendersBatch) onNewTxs(newTxs TxSlots) (err error) { - for i := 0; i < len(newTxs.txs); i++ { - id, ok := sc.id(string(newTxs.senders.At(i))) - if ok { - newTxs.txs[i].senderID = id - continue - } - sc.senderID++ - sc.senderIDs[string(newTxs.senders.At(i))] = sc.senderID - sc.senderID2Addr[sc.senderID] = string(newTxs.senders.At(i)) - - newTxs.txs[i].senderID = sc.senderID - } - return nil -} -func (sc *sendersBatch) onNewBlock(stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs TxSlots) error { - for _, diff := range stateChanges.ChangeBatch { - for _, change := range diff.Changes { // merge state changes - addrB := gointerfaces.ConvertH160toAddress(change.Address) - addr := string(addrB[:]) - _, ok := sc.id(addr) - if !ok { - sc.senderID++ - sc.senderIDs[addr] = sc.senderID - sc.senderID2Addr[sc.senderID] = addr - } - } - - for i := 0; i < unwindTxs.senders.Len(); i++ { - addr := string(unwindTxs.senders.At(i)) - id, ok := sc.id(addr) - if !ok { - sc.senderID++ - id = sc.senderID - sc.senderIDs[addr] = sc.senderID - sc.senderID2Addr[sc.senderID] = addr - } - unwindTxs.txs[i].senderID = id - } - - for i := 0; i < len(minedTxs.txs); i++ { - addr := string(minedTxs.senders.At(i)) - id, ok := sc.id(addr) - if !ok { - 
sc.senderID++ - id = sc.senderID - sc.senderIDs[addr] = sc.senderID - sc.senderID2Addr[sc.senderID] = addr - } - minedTxs.txs[i].senderID = id - } - } - return nil -} - func calcProtocolBaseFee(baseFee uint64) uint64 { return 7 } -type ByNonce struct { - tree *btree.BTree -} - -func (b *ByNonce) ascend(senderID uint64, f func(*metaTx) bool) { - b.tree.AscendGreaterOrEqual(&sortByNonce{&metaTx{Tx: &TxSlot{senderID: senderID}}}, func(i btree.Item) bool { - mt := i.(*sortByNonce).metaTx - if mt.Tx.senderID != senderID { - return false - } - return f(mt) - }) -} -func (b *ByNonce) hasTxs(senderID uint64) bool { - has := false - b.ascend(senderID, func(*metaTx) bool { - has = true - return false - }) - return has -} -func (b *ByNonce) get(senderID, txNonce uint64) *metaTx { - if found := b.tree.Get(&sortByNonce{&metaTx{Tx: &TxSlot{senderID: senderID, nonce: txNonce}}}); found != nil { - return found.(*sortByNonce).metaTx - } - return nil -} - -//nolint -func (b *ByNonce) has(mt *metaTx) bool { - found := b.tree.Get(&sortByNonce{mt}) - return found != nil -} -func (b *ByNonce) delete(mt *metaTx) { b.tree.Delete(&sortByNonce{mt}) } -func (b *ByNonce) replaceOrInsert(mt *metaTx) *metaTx { - it := b.tree.ReplaceOrInsert(&sortByNonce{mt}) - if it != nil { - return it.(*sortByNonce).metaTx - } - return nil -} - // TxPool - holds all pool-related data structures and lock-based tiny methods // most of logic implemented by pure tests-friendly functions // // txpool doesn't start any goroutines - "leave concurrency to user" design // txpool has no DB or TX fields - "leave db transactions management to user" design // txpool has _coreDB field - but it must maximize local state cache hit-rate - and perform minimum _coreDB transactions +// +// It preserve TxSlot objects immutable type TxPool struct { lock *sync.RWMutex @@ -340,21 +205,22 @@ type TxPool struct { protocolBaseFee atomic.Uint64 currentBaseFee atomic.Uint64 - senderID uint64 - byHash map[string]*metaTx // tx_hash => tx - pending *PendingPool - baseFee, queued *SubPool + senderID uint64 + byHash map[string]*metaTx // tx_hash => tx + discardReasonsLRU *simplelru.LRU // tx_hash => discard_reason + pending *PendingPool + baseFee, queued *SubPool // track isLocal flag of already mined transactions. used at unwind. 
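+	// (so a tx that was mined and then unwound keeps its local status, and with it its propagation priority)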
- isLocalHashLRU *simplelru.LRU - _coreDB kv.RoDB + isLocalLRU *simplelru.LRU // tx_hash => is_local + _coreDB kv.RoDB // fields for transaction propagation recentlyConnectedPeers *recentlyConnectedPeers newTxs chan Hashes deletedTxs []*metaTx - discardReasons []DiscardReason senders *sendersBatch + _cache kvcache.Cache byNonce *ByNonce // senderID => (sorted map of tx nonce => *metaTx) // batch processing of remote transactions @@ -371,21 +237,27 @@ type TxPool struct { } func New(newTxs chan Hashes, coreDB kv.RoDB, cfg Config, cache kvcache.Cache, rules chain.Rules, chainID uint256.Int) (*TxPool, error) { - localsHistory, err := simplelru.NewLRU(1024, nil) + localsHistory, err := simplelru.NewLRU(10_000, nil) + if err != nil { + return nil, err + } + discardHistory, err := simplelru.NewLRU(10_000, nil) if err != nil { return nil, err } return &TxPool{ lock: &sync.RWMutex{}, byHash: map[string]*metaTx{}, + isLocalLRU: localsHistory, + discardReasonsLRU: discardHistory, byNonce: &ByNonce{btree.New(32)}, - isLocalHashLRU: localsHistory, recentlyConnectedPeers: &recentlyConnectedPeers{}, pending: NewPendingSubPool(PendingSubPool), baseFee: NewSubPool(BaseFeeSubPool), queued: NewSubPool(QueuedSubPool), newTxs: newTxs, - senders: newSendersCache(cache), + _cache: cache, + senders: newSendersCache(), _coreDB: coreDB, cfg: cfg, rules: rules, @@ -396,22 +268,33 @@ func New(newTxs chan Hashes, coreDB kv.RoDB, cfg Config, cache kvcache.Cache, ru }, nil } -func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs TxSlots) error { +func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs TxSlots, tx kv.Tx) error { defer newBlockTimer.UpdateDuration(time.Now()) + //t := time.Now() - p.cache().OnNewBlock(stateChanges) - + cache := p.cache() + cache.OnNewBlock(stateChanges) coreTx, err := p.coreDB().BeginRo(ctx) if err != nil { return err } defer coreTx.Rollback() - cache, err := p.cache().View(ctx, coreTx) + + p.lock.Lock() + defer p.lock.Unlock() + + if !p.started.Load() { + if err := p.fromDB(ctx, tx, coreTx); err != nil { + return err + } + } + + viewID, err := cache.View(ctx, coreTx) if err != nil { return err } if ASSERT { - if _, err := kvcache.AssertCheckValues(context.Background(), coreTx, p.cache()); err != nil { + if _, err := kvcache.AssertCheckValues(ctx, coreTx, cache); err != nil { log.Error("AssertCheckValues", "err", err, "stack", stack.Trace().String()) } } @@ -425,16 +308,31 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang baseFee := stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].ProtocolBaseFee blockHeight := stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight - p.lock.Lock() - defer p.lock.Unlock() - protocolBaseFee, baseFee := p.setBaseFee(baseFee) p.lastSeenBlock.Store(blockHeight) if err := p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs); err != nil { return err } + + if ASSERT { + for i := range unwindTxs.txs { + if unwindTxs.txs[i].senderID == 0 { + panic(fmt.Errorf("onNewBlock.unwindTxs: senderID can't be zero")) + } + } + for i := range minedTxs.txs { + if minedTxs.txs[i].senderID == 0 { + panic(fmt.Errorf("onNewBlock.minedTxs: senderID can't be zero")) + } + } + } + + if err := removeMined(p.byNonce, minedTxs.txs, p.pending, p.baseFee, p.queued, p.discardLocked); err != nil { + return err + } + //log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "baseFee", 
baseFee, "blockHeight", blockHeight) - if err := onNewBlock(p.lastSeenBlock.Load(), cache, coreTx, p.cfg, p.senders, unwindTxs, minedTxs.txs, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil { + if err := addTxs(p.lastSeenBlock.Load(), cache, viewID, coreTx, p.cfg, p.senders, unwindTxs, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil { return err } @@ -457,6 +355,7 @@ func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChang } } + //log.Info("[txpool] new block", "number", p.lastSeenBlock.Load(), "in", time.Since(t)) return nil } @@ -465,13 +364,14 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error { return fmt.Errorf("txpool not started yet") } + cache := p.cache() defer processBatchTxsTimer.UpdateDuration(time.Now()) coreTx, err := p.coreDB().BeginRo(ctx) if err != nil { return err } defer coreTx.Rollback() - cache, err := p.cache().View(ctx, coreTx) + viewID, err := cache.View(ctx, coreTx) if err != nil { return err } @@ -494,7 +394,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error { return err } - if err := onNewTxs(p.lastSeenBlock.Load(), cache, coreTx, p.cfg, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil { + if err := addTxs(p.lastSeenBlock.Load(), cache, viewID, coreTx, p.cfg, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil { return err } @@ -587,13 +487,16 @@ func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) { func (p *TxPool) IsLocal(idHash []byte) bool { p.lock.RLock() defer p.lock.RUnlock() - - txn, ok := p.byHash[string(idHash)] - if ok && txn.subPool&IsLocal != 0 { - return true + return p.isLocalLRU.Contains(string(idHash)) +} +func (p *TxPool) DiscardReason(idHash []byte) DiscardReason { + p.lock.RLock() + defer p.lock.RUnlock() + reason, ok := p.discardReasonsLRU.Get(string(idHash)) + if ok { + return reason.(DiscardReason) } - _, ok = p.isLocalHashLRU.Get(string(idHash)) - return ok + return 0 } func (p *TxPool) AddNewGoodPeer(peerID PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) } func (p *TxPool) Started() bool { return p.started.Load() } @@ -650,7 +553,7 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTxs TxSlots) ([]DiscardReas } defer coreTx.Rollback() - cache, err := p.cache().View(ctx, coreTx) + viewID, err := p.cache().View(ctx, coreTx) if err != nil { return nil, err } @@ -661,12 +564,11 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTxs TxSlots) ([]DiscardReas p.lock.Lock() defer p.lock.Unlock() - discardReasonsIndex := len(p.discardReasons) if err = p.senders.onNewTxs(newTxs); err != nil { return nil, err } - if err := onNewTxs(p.lastSeenBlock.Load(), cache, coreTx, p.cfg, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil { + if err := addTxs(p.lastSeenBlock.Load(), p._cache, viewID, coreTx, p.cfg, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil { return nil, err } @@ -685,12 +587,15 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTxs TxSlots) ([]DiscardReas default: } } - return 
p.copyDiscardReasons(discardReasonsIndex), nil -} -func (p *TxPool) copyDiscardReasons(from int) []DiscardReason { - cpy := make([]DiscardReason, len(p.discardReasons)-from) - copy(cpy, p.discardReasons[from:]) - return cpy + + reasons := make([]DiscardReason, len(newTxs.txs)) + for i := range newTxs.txs { + reason, ok := p.discardReasonsLRU.Get(string(newTxs.txs[i].idHash[:])) + if ok { + reasons[i] = reason.(DiscardReason) + } + } + return reasons, nil } func (p *TxPool) coreDB() kv.RoDB { p.lock.RLock() @@ -701,29 +606,41 @@ func (p *TxPool) coreDB() kv.RoDB { func (p *TxPool) cache() kvcache.Cache { p.lock.RLock() defer p.lock.RUnlock() - return p.senders.cache + return p._cache } -func onNewTxs(blockNum uint64, cache kvcache.CacheView, coreTx kv.Tx, cfg Config, senders *sendersBatch, newTxs TxSlots, protocolBaseFee, currentBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error { - for i := range newTxs.txs { - if newTxs.txs[i].senderID == 0 { - return fmt.Errorf("senderID can't be zero") +func addTxs(blockNum uint64, cache kvcache.Cache, viewID kvcache.ViewID, coreTx kv.Tx, cfg Config, senders *sendersBatch, newTxs TxSlots, protocolBaseFee, currentBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, add func(*metaTx) bool, discard func(*metaTx, DiscardReason)) error { + if ASSERT { + for i := range newTxs.txs { + if newTxs.txs[i].senderID == 0 { + panic(fmt.Errorf("senderID can't be zero")) + } } } - - changedSenders := unsafeAddToPendingPool(blockNum, byNonce, newTxs, pending, baseFee, queued, byHash, discard) + //defer func(t time.Time) { fmt.Printf("pool.go:611: %s\n", time.Since(t)) }(time.Now()) + // This can be thought of a reverse operation from the one described before. + // When a block that was deemed "the best" of its height, is no longer deemed "the best", the + // transactions contained in it, are now viable for inclusion in other blocks, and therefore should + // be returned into the transaction pool. + // An interesting note here is that if the block contained any transactions local to the node, + // by being first removed from the pool (from the "local" part of it), and then re-injected, + // they effective lose their priority over the "remote" transactions. In order to prevent that, + // somehow the fact that certain transactions were local, needs to be remembered for some + // time (up to some "immutability threshold"). 
+ changedSenders := unsafeAddToPendingPool(blockNum, newTxs, byHash, add) for id := range changedSenders { - nonce, balance, err := senders.info(cache, coreTx, id) + nonce, balance, err := senders.info(cache, viewID, coreTx, id) if err != nil { return err } - onSenderChange(id, nonce, balance, byNonce, protocolBaseFee, currentBaseFee) + onSenderChange(id, nonce, balance, byNonce, protocolBaseFee, currentBaseFee, pending, baseFee, queued) } - pending.EnforceWorstInvariants() - baseFee.EnforceInvariants() - queued.EnforceInvariants() - + //defer func(t time.Time) { fmt.Printf("pool.go:630: %s\n", time.Since(t)) }(time.Now()) + //pending.EnforceWorstInvariants() + //baseFee.EnforceInvariants() + //queued.EnforceInvariants() promote(pending, baseFee, queued, cfg, discard) + //pending.EnforceWorstInvariants() pending.EnforceBestInvariants() return nil @@ -736,56 +653,48 @@ func (p *TxPool) setBaseFee(baseFee uint64) (uint64, uint64) { } return p.protocolBaseFee.Load(), p.currentBaseFee.Load() } -func (p *TxPool) discardLocked(mt *metaTx) { + +func (p *TxPool) addLocked(mt *metaTx) bool { + // Insert to pending pool, if pool doesn't have txn with same Nonce and bigger Tip + found := p.byNonce.get(mt.Tx.senderID, mt.Tx.nonce) + if found != nil { + if mt.Tx.tip <= found.Tx.tip { + return false + } + + switch found.currentSubPool { + case PendingSubPool: + p.pending.UnsafeRemove(found) + case BaseFeeSubPool: + p.baseFee.UnsafeRemove(found) + case QueuedSubPool: + p.queued.UnsafeRemove(found) + default: + //already removed + } + + p.discardLocked(found, ReplacedByHigherTip) + } + + p.byHash[string(mt.Tx.idHash[:])] = mt + + if replaced := p.byNonce.replaceOrInsert(mt); replaced != nil { + if ASSERT { + panic("must neve happen") + } + } + + if mt.subPool&IsLocal != 0 { + p.isLocalLRU.Add(string(mt.Tx.idHash[:]), struct{}{}) + } + p.pending.Add(mt) + return true +} +func (p *TxPool) discardLocked(mt *metaTx, reason DiscardReason) { delete(p.byHash, string(mt.Tx.idHash[:])) p.deletedTxs = append(p.deletedTxs, mt) p.byNonce.delete(mt) - if mt.subPool&IsLocal != 0 { - p.isLocalHashLRU.Add(string(mt.Tx.idHash[:]), struct{}{}) - } -} -func onNewBlock(blockNum uint64, cache kvcache.CacheView, coreTx kv.Tx, cfg Config, senders *sendersBatch, unwindTxs TxSlots, minedTxs []*TxSlot, protocolBaseFee, pendingBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error { - for i := range unwindTxs.txs { - if unwindTxs.txs[i].senderID == 0 { - return fmt.Errorf("onNewBlock.unwindTxs: senderID can't be zero") - } - } - for i := range minedTxs { - if minedTxs[i].senderID == 0 { - return fmt.Errorf("onNewBlock.minedTxs: senderID can't be zero") - } - } - - if err := removeMined(byNonce, minedTxs, pending, baseFee, queued, discard); err != nil { - return err - } - - // This can be thought of a reverse operation from the one described before. - // When a block that was deemed "the best" of its height, is no longer deemed "the best", the - // transactions contained in it, are now viable for inclusion in other blocks, and therefore should - // be returned into the transaction pool. - // An interesting note here is that if the block contained any transactions local to the node, - // by being first removed from the pool (from the "local" part of it), and then re-injected, - // they effective lose their priority over the "remote" transactions. 
In order to prevent that, - // somehow the fact that certain transactions were local, needs to be remembered for some - // time (up to some "immutability threshold"). - changedSenders := unsafeAddToPendingPool(blockNum, byNonce, unwindTxs, pending, baseFee, queued, byHash, discard) - for id := range changedSenders { - nonce, balance, err := senders.info(cache, coreTx, id) - if err != nil { - return err - } - onSenderChange(id, nonce, balance, byNonce, protocolBaseFee, pendingBaseFee) - } - - pending.EnforceWorstInvariants() - baseFee.EnforceInvariants() - queued.EnforceInvariants() - - promote(pending, baseFee, queued, cfg, discard) - pending.EnforceBestInvariants() - - return nil + p.discardReasonsLRU.Add(string(mt.Tx.idHash[:]), reason) } // removeMined - apply new highest block (or batch of blocks) @@ -795,7 +704,7 @@ func onNewBlock(blockNum uint64, cache kvcache.CacheView, coreTx kv.Tx, cfg Conf // modify state_balance and state_nonce, potentially remove some elements (if transaction with some nonce is // included into a block), and finally, walk over the transaction records and update SubPool fields depending on // the actual presence of nonce gaps and what the balance is. -func removeMined(byNonce *ByNonce, minedTxs []*TxSlot, pending *PendingPool, baseFee, queued *SubPool, discard func(tx *metaTx)) error { +func removeMined(byNonce *ByNonce, minedTxs []*TxSlot, pending *PendingPool, baseFee, queued *SubPool, discard func(*metaTx, DiscardReason)) error { noncesToRemove := map[uint64]uint64{} for _, txn := range minedTxs { nonce, ok := noncesToRemove[txn.senderID] @@ -831,7 +740,7 @@ func removeMined(byNonce *ByNonce, minedTxs []*TxSlot, pending *PendingPool, bas }) for i := range toDel { - discard(toDel[i]) + discard(toDel[i], Mined) } toDel = toDel[:0] } @@ -839,7 +748,7 @@ func removeMined(byNonce *ByNonce, minedTxs []*TxSlot, pending *PendingPool, bas } // unwind -func unsafeAddToPendingPool(blockNum uint64, byNonce *ByNonce, newTxs TxSlots, pending *PendingPool, baseFee, queued *SubPool, byHash map[string]*metaTx, discard func(tx *metaTx)) (changedSenders map[uint64]struct{}) { +func unsafeAddToPendingPool(blockNum uint64, newTxs TxSlots, byHash map[string]*metaTx, add func(*metaTx) bool) (changedSenders map[uint64]struct{}) { changedSenders = map[uint64]struct{}{} for i, txn := range newTxs.txs { if _, ok := byHash[string(txn.idHash[:])]; ok { @@ -847,41 +756,14 @@ func unsafeAddToPendingPool(blockNum uint64, byNonce *ByNonce, newTxs TxSlots, p } mt := newMetaTx(txn, newTxs.isLocal[i], blockNum) - // Insert to pending pool, if pool doesn't have txn with same Nonce and bigger Tip - found := byNonce.get(txn.senderID, txn.nonce) - if found != nil { - if txn.tip <= found.Tx.tip { - continue - } - - switch found.currentSubPool { - case PendingSubPool: - pending.UnsafeRemove(found) - case BaseFeeSubPool: - baseFee.UnsafeRemove(found) - case QueuedSubPool: - queued.UnsafeRemove(found) - default: - //already removed - } - - discard(found) + if add(mt) { + changedSenders[txn.senderID] = struct{}{} } - - byHash[string(txn.idHash[:])] = mt - if replaced := byNonce.replaceOrInsert(mt); replaced != nil { - if ASSERT { - panic("must neve happen") - } - } - - changedSenders[txn.senderID] = struct{}{} - pending.UnsafeAdd(mt) } return changedSenders } -func onSenderChange(senderID uint64, senderNonce uint64, senderBalance uint256.Int, byNonce *ByNonce, protocolBaseFee, currentBaseFee uint64) { +func onSenderChange(senderID uint64, senderNonce uint64, senderBalance uint256.Int, byNonce *ByNonce, 
protocolBaseFee, currentBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool) { noGapsNonce := senderNonce cumulativeRequiredBalance := uint256.NewInt(0) minFeeCap := uint64(math.MaxUint64) @@ -942,11 +824,19 @@ func onSenderChange(senderID uint64, senderNonce uint64, senderBalance uint256.I // 5. Local transaction. Set to 1 if transaction is local. // can't change + switch mt.currentSubPool { + case PendingSubPool: + pending.Updated(mt) + case BaseFeeSubPool: + baseFee.Updated(mt) + case QueuedSubPool: + queued.Updated(mt) + } return true }) } -func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard func(*metaTx)) { +func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard func(*metaTx, DiscardReason)) { //1. If top element in the worst green queue has subPool != 0b1111 (binary), it needs to be removed from the green pool. // If subPool < 0b1000 (not satisfying minimum fee), discard. // If subPool == 0b1110, demote to the yellow pool, otherwise demote to the red pool. @@ -962,7 +852,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard queued.Add(pending.PopWorst()) continue } - discard(pending.PopWorst()) + discard(pending.PopWorst(), FeeTooLow) } //2. If top element in the worst green queue has subPool == 0b1111, but there is not enough room in the pool, discard. @@ -970,7 +860,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard if worst.subPool >= 0b11111 { // TODO: here must 'subPool == 0b1111' or 'subPool <= 0b1111' ? break } - discard(pending.PopWorst()) + discard(pending.PopWorst(), PendingPoolOverflow) } //3. If the top element in the best yellow queue has subPool == 0b1111, promote to the green pool. @@ -978,7 +868,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard if best.subPool < 0b11110 { break } - pending.UnsafeAdd(baseFee.PopBest()) + pending.Add(baseFee.PopBest()) } //4. If the top element in the worst yellow queue has subPool != 0x1110, it needs to be removed from the yellow pool. @@ -991,7 +881,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard queued.Add(baseFee.PopWorst()) continue } - discard(baseFee.PopWorst()) + discard(baseFee.PopWorst(), FeeTooLow) } //5. If the top element in the worst yellow queue has subPool == 0x1110, but there is not enough room in the pool, discard. @@ -999,7 +889,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard if worst.subPool >= 0b11110 { break } - discard(baseFee.PopWorst()) + discard(baseFee.PopWorst(), BaseFeePoolOverflow) } //6. If the top element in the best red queue has subPool == 0x1110, promote to the yellow pool. If subPool == 0x1111, promote to the green pool. @@ -1012,7 +902,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard continue } - pending.UnsafeAdd(queued.PopBest()) + pending.Add(queued.PopBest()) } //7. If the top element in the worst red queue has subPool < 0b1000 (not satisfying minimum fee), discard. @@ -1021,15 +911,592 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, cfg Config, discard break } - discard(queued.PopWorst()) + discard(queued.PopWorst(), FeeTooLow) } //8. If the top element in the worst red queue has subPool >= 0b100, but there is not enough room in the pool, discard. 
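 	// (Illustrative note, inferred from the rules above: the rule comments use
 	// 4-bit masks like 0b1111, while the code compares 5-bit masks like 0b11110 -
 	// the extra lowest bit appears to be criterion 5, the "local" flag, which does
 	// not participate in promotion decisions. The loop below enforces rule 8 by
 	// evicting worst queued txs until the sub-pool fits cfg.QueuedSubPoolLimit.)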
 	for _ = queued.Worst(); queued.Len() > cfg.QueuedSubPoolLimit; _ = queued.Worst() {
-		discard(queued.PopWorst())
+		discard(queued.PopWorst(), QueuedPoolOverflow)
 	}
 }
 
+// MainLoop - does:
+// send pending byHash to p2p:
+//      - new byHash
+//      - all pooled byHash to recently connected peers
+//      - all local pooled byHash to random peers periodically
+// promote/demote transactions
+// reorgs
+func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs chan Hashes, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func()) {
+	syncToNewPeersEvery := time.NewTicker(p.cfg.SyncToNewPeersEvery)
+	defer syncToNewPeersEvery.Stop()
+	processRemoteTxsEvery := time.NewTicker(p.cfg.ProcessRemoteTxsEvery)
+	defer processRemoteTxsEvery.Stop()
+	commitEvery := time.NewTicker(p.cfg.CommitEvery)
+	defer commitEvery.Stop()
+	logEvery := time.NewTicker(p.cfg.LogEvery)
+	defer logEvery.Stop()
+
+	localTxHashes := make([]byte, 0, 128)
+	remoteTxHashes := make([]byte, 0, 128)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-logEvery.C:
+			p.logStats()
+		case <-processRemoteTxsEvery.C:
+			if !p.Started() {
+				continue
+			}
+			if err := p.processRemoteTxs(ctx); err != nil {
+				if s, ok := status.FromError(err); ok && retryLater(s.Code()) {
+					continue
+				}
+				if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
+					continue
+				}
+				log.Error("[txpool] process batch remote txs", "err", err)
+			}
+		case <-commitEvery.C:
+			if db != nil {
+				t := time.Now()
+				written, err := p.flush(db)
+				if err != nil {
+					log.Error("[txpool] flush", "err", err)
+					continue
+				}
+				writeToDbBytesCounter.Set(written)
+				log.Debug("[txpool] Commit", "written_kb", written/1024, "in", time.Since(t))
+			}
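+		// New txs: notify mining, stream full tx RLPs to gRPC subscribers,
+		// then broadcast hashes over p2p - local txs go to all peers, remote
+		// ones only to a random sqrt(peersAmount) subset (see comment below).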
+		case h := <-newTxs:
+			t := time.Now()
+			notifyMiningAboutNewSlots()
+			if err := db.View(ctx, func(tx kv.Tx) error {
+				slotsRlp := make([][]byte, 0, h.Len())
+				for i := 0; i < h.Len(); i++ {
+					slotRlp, err := p.GetRlp(tx, h.At(i))
+					if err != nil {
+						return err
+					}
+					slotsRlp = append(slotsRlp, slotRlp)
+				}
+				newSlotsStreams.Broadcast(&proto_txpool.OnAddReply{RplTxs: slotsRlp})
+				return nil
+			}); err != nil {
+				log.Error("[txpool] send new slots by grpc", "err", err)
+			}
+
+			// first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers
+			localTxHashes = localTxHashes[:0]
+			remoteTxHashes = remoteTxHashes[:0]
+
+			for i := 0; i < h.Len(); i++ {
+				if p.IsLocal(h.At(i)) {
+					localTxHashes = append(localTxHashes, h.At(i)...)
+				} else {
+					remoteTxHashes = append(remoteTxHashes, h.At(i)...)
+				}
+			}
+
+			send.BroadcastLocalPooledTxs(localTxHashes)
+			send.BroadcastRemotePooledTxs(remoteTxHashes)
+			propagateNewTxsTimer.UpdateDuration(t)
+		case <-syncToNewPeersEvery.C: // new peer
+			newPeers := p.recentlyConnectedPeers.GetAndClean()
+			if len(newPeers) == 0 {
+				continue
+			}
+			t := time.Now()
+			remoteTxHashes = p.AppendAllHashes(remoteTxHashes[:0])
+			send.PropagatePooledTxsToPeersList(newPeers, remoteTxHashes)
+			propagateToNewPeerTimer.UpdateDuration(t)
+		}
+	}
+}
+
+func (p *TxPool) flush(db kv.RwDB) (written uint64, err error) {
+	defer writeToDbTimer.UpdateDuration(time.Now())
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	//it's important that write db tx is done inside lock, to make last writes visible for all read operations
+	if err := db.Update(context.Background(), func(tx kv.RwTx) error {
+		err = p.flushLocked(tx)
+		if err != nil {
+			return err
+		}
+		written, _, err = tx.(*mdbx.MdbxTx).SpaceDirty()
+		if err != nil {
+			return err
+		}
+		return nil
+	}); err != nil {
+		return 0, err
+	}
+	return written, nil
+}
+func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
+	for i := 0; i < len(p.deletedTxs); i++ {
+		if !p.byNonce.hasTxs(p.deletedTxs[i].Tx.senderID) {
+			addr, ok := p.senders.senderID2Addr[p.deletedTxs[i].Tx.senderID]
+			if ok {
+				delete(p.senders.senderID2Addr, p.deletedTxs[i].Tx.senderID)
+				delete(p.senders.senderIDs, addr)
+			}
+		}
+		if err := tx.Delete(kv.PoolTransaction, p.deletedTxs[i].Tx.idHash[:], nil); err != nil {
+			return err
+		}
+		p.deletedTxs[i] = nil // for gc
+	}
+
+	txHashes := p.isLocalLRU.Keys()
+	encID := make([]byte, 8)
+	if err := tx.ClearBucket(kv.RecentLocalTransaction); err != nil {
+		return err
+	}
+	for i := range txHashes {
+		binary.BigEndian.PutUint64(encID, uint64(i))
+		if err := tx.Append(kv.RecentLocalTransaction, encID, []byte(txHashes[i].(string))); err != nil {
+			return err
+		}
+	}
+
+	v := make([]byte, 0, 1024)
+	for txHash, metaTx := range p.byHash {
+		if metaTx.Tx.rlp == nil {
+			continue
+		}
+		v = common.EnsureEnoughSize(v, 20+len(metaTx.Tx.rlp))
+		for addr, id := range p.senders.senderIDs { // no inverted index - tradeoff flush speed for memory usage
+			if id == metaTx.Tx.senderID {
+				copy(v[:20], addr)
+				break
+			}
+		}
+		copy(v[20:], metaTx.Tx.rlp)
+
+		has, _ := tx.Has(kv.PoolTransaction, []byte(txHash))
+		if has {
+			panic("must not happen")
+		}
+		if err := tx.Put(kv.PoolTransaction, []byte(txHash), v); err != nil {
+			return err
+		}
+		metaTx.Tx.rlp = nil
+	}
+
+	binary.BigEndian.PutUint64(encID, p.protocolBaseFee.Load())
+	if err := tx.Put(kv.PoolInfo, PoolProtocolBaseFeeKey, encID); err != nil {
+		return err
+	}
+	binary.BigEndian.PutUint64(encID, p.currentBaseFee.Load())
+	if err := tx.Put(kv.PoolInfo, PoolPendingBaseFeeKey, encID); err != nil {
+		return err
+	}
+
+	// clean in-memory data structures as late as possible: if an error happens during this Tx,
+	// the DB stays consistent while some in-memory structures may already be cleaned, so a retry
+	// would not work - a failed write transaction must not create side-effects
+	p.deletedTxs = p.deletedTxs[:0]
+	return nil
+}
+
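+// The kv.PoolTransaction value layout written by flushLocked above (and split
+// back by fromDB below) is the 20-byte sender address followed by the raw
+// transaction RLP:
+//
+//	v := make([]byte, 20+len(rlp))
+//	copy(v[:20], senderAddr) // sender address
+//	copy(v[20:], rlp)        // transaction payload
+//	// and on read: addr, txRlp := v[:20], v[20:]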
+func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
+	lastSeenBlock, err := LastSeenBlock(tx)
+	if err != nil {
+		return err
+	}
+	p.lastSeenBlock.Store(lastSeenBlock)
+
+	viewID, err := p._cache.View(ctx, coreTx)
+	if err != nil {
+		return err
+	}
+
+	if err := tx.ForEach(kv.RecentLocalTransaction, nil, func(k, v []byte) error {
+		p.isLocalLRU.Add(string(v), struct{}{})
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	txs := TxSlots{}
+	parseCtx := NewTxParseContext(p.rules, p.chainID)
+	parseCtx.WithSender(false)
+
+	i := 0
+	if err := tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error {
+		addr, txRlp := v[:20], v[20:]
+		txs.Resize(uint(i + 1))
+		txs.txs[i] = &TxSlot{}
+
+		_, err := parseCtx.ParseTransaction(txRlp, 0, txs.txs[i], nil)
+		if err != nil {
+			return fmt.Errorf("err: %w, rlp: %x", err, txRlp)
+		}
+		txs.txs[i].rlp = nil // means we don't need to store it in db anymore
+		copy(txs.senders.At(i), addr)
+
+		id, ok := p.senders.senderIDs[string(addr)]
+		if !ok {
+			p.senders.senderID++
+			id = p.senders.senderID
+			p.senders.senderIDs[string(addr)] = id
+			p.senders.senderID2Addr[id] = string(addr)
+		}
+		txs.txs[i].senderID = id
+
+		isLocalTx := p.isLocalLRU.Contains(string(k))
+		txs.isLocal[i] = isLocalTx
+		i++
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	var protocolBaseFee, currentBaseFee uint64
+	{
+		v, err := tx.GetOne(kv.PoolInfo, PoolProtocolBaseFeeKey)
+		if err != nil {
+			return err
+		}
+		if len(v) > 0 {
+			protocolBaseFee = binary.BigEndian.Uint64(v)
+		}
+	}
+	{
+		v, err := tx.GetOne(kv.PoolInfo, PoolPendingBaseFeeKey)
+		if err != nil {
+			return err
+		}
+		if len(v) > 0 {
+			currentBaseFee = binary.BigEndian.Uint64(v)
+		}
+	}
+	err = p.senders.onNewTxs(txs)
+	if err != nil {
+		return err
+	}
+	if err := addTxs(0, p._cache, viewID, coreTx, p.cfg, p.senders, txs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.addLocked, p.discardLocked); err != nil {
+		return err
+	}
+	p.currentBaseFee.Store(currentBaseFee)
+	p.protocolBaseFee.Store(protocolBaseFee)
+
+	return nil
+}
+func LastSeenBlock(tx kv.Getter) (uint64, error) {
+	v, err := tx.GetOne(kv.PoolInfo, PoolLastSeenBlockKey)
+	if err != nil {
+		return 0, err
+	}
+	if len(v) == 0 {
+		return 0, nil
+	}
+	return binary.BigEndian.Uint64(v), nil
+}
+func PutLastSeenBlock(tx kv.Putter, n uint64, buf []byte) error {
+	buf = common.EnsureEnoughSize(buf, 8)
+	binary.BigEndian.PutUint64(buf, n)
+	err := tx.Put(kv.PoolInfo, PoolLastSeenBlockKey, buf)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+func ChainConfig(tx kv.Getter) (*chain.Config, error) {
+	v, err := tx.GetOne(kv.PoolInfo, PoolChainConfigKey)
+	if err != nil {
+		return nil, err
+	}
+	if len(v) == 0 {
+		return nil, nil
+	}
+	var config chain.Config
+	if err := json.Unmarshal(v, &config); err != nil {
+		return nil, fmt.Errorf("invalid chain config JSON in pool db: %w", err)
+	}
+	return &config, nil
+}
+func PutChainConfig(tx kv.Putter, cc *chain.Config, buf []byte) error {
+	wr := bytes.NewBuffer(buf)
+	if err := json.NewEncoder(wr).Encode(cc); err != nil {
+		return fmt.Errorf("failed to encode chain config JSON for pool db: %w", err)
+	}
+	if err := tx.Put(kv.PoolInfo, PoolChainConfigKey, wr.Bytes()); err != nil {
+		return err
+	}
+	return nil
+}
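+
+// A minimal usage sketch (illustrative only; `defaultConfig` is a hypothetical
+// caller-provided *chain.Config): read the stored config on startup and seed
+// it when the pool db is fresh:
+//
+//	cc, err := ChainConfig(tx) // returns nil, nil if nothing was stored yet
+//	if err != nil {
+//		return err
+//	}
+//	if cc == nil {
+//		if err := PutChainConfig(tx, defaultConfig, nil); err != nil {
+//			return err
+//		}
+//	}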
+
+//nolint
+func (p *TxPool) printDebug(prefix string) {
+	fmt.Printf("%s.pool.byHash\n", prefix)
+	for _, j := range p.byHash {
+		fmt.Printf("\tsenderID=%d, nonce=%d, tip=%d\n", j.Tx.senderID, j.Tx.nonce, j.Tx.tip)
+	}
+	fmt.Printf("%s.pool.queues.len: %d,%d,%d\n", prefix, p.pending.Len(), p.baseFee.Len(), p.queued.Len())
+	for i := range p.pending.best {
+		p.pending.best[i].Tx.printDebug(fmt.Sprintf("%s.pending: %b", prefix, p.pending.best[i].subPool))
+	}
+	for i := range *p.queued.best {
+		(*p.queued.best)[i].Tx.printDebug(fmt.Sprintf("%s.queued : %b", prefix, (*p.queued.best)[i].subPool))
+	}
+}
+func (p *TxPool) logStats() {
+	//protocolBaseFee, currentBaseFee := p.protocolBaseFee.Load(), p.currentBaseFee.Load()
+
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	//idsInMem := p.senders.idsCount()
+	var m runtime.MemStats
+	runtime.ReadMemStats(&m)
+
+	ctx := []interface{}{
+		//"baseFee", fmt.Sprintf("%d, %dm", protocolBaseFee, currentBaseFee/1_000_000),
+		"block", p.lastSeenBlock.Load(),
+		"pending", p.pending.Len(),
+		"baseFee", p.baseFee.Len(),
+		"queued", p.queued.Len(),
+	}
+	cacheKeys := p._cache.Len()
+	if cacheKeys > 0 {
+		ctx = append(ctx, "cache_keys", cacheKeys)
+	}
+	ctx = append(ctx, "alloc_mb", m.Alloc/1024/1024, "sys_mb", m.Sys/1024/1024)
+	log.Info("[txpool] stat", ctx...)
+	//if ASSERT {
+	//stats := kvcache.DebugStats(p.senders.cache)
+	//log.Info(fmt.Sprintf("[txpool] cache %T, roots amount %d", p.senders.cache, len(stats)))
+	//for i := range stats {
+	//	log.Info("[txpool] cache", "root", stats[i].BlockNum, "len", stats[i].Lenght)
+	//}
+	//ages := kvcache.DebugAges(p.senders.cache)
+	//for i := range ages {
+	//	log.Info("[txpool] age", "age", ages[i].BlockNum, "amount", ages[i].Lenght)
+	//}
+	//}
+}
+
+//Deprecated: need to switch to streaming-like interface
+func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp, sender []byte, t SubPoolType), tx kv.Tx) error {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	p.byNonce.tree.Ascend(func(i btree.Item) bool {
+		mt := i.(*sortByNonce).metaTx
+		slot := mt.Tx
+		slotRlp := slot.rlp
+		if slot.rlp == nil {
+			v, err := tx.GetOne(kv.PoolTransaction, slot.idHash[:])
+			if err != nil {
+				log.Error("[txpool] get tx from db", "err", err)
+				return false
+			}
+			if v == nil {
+				log.Error("[txpool] tx not found in db")
+				return false
+			}
+			slotRlp = v[20:]
+		}
+
+		var sender []byte
+		found := false
+		for addr, senderID := range p.senders.senderIDs { // TODO: do we need inverted index here?
+			if slot.senderID == senderID {
+				sender = []byte(addr)
+				found = true
+				break
+			}
+		}
+		if !found {
+			return true
+		}
+		f(slotRlp, sender, mt.currentSubPool)
+		return true
+	})
+	return nil
+}
+
+var PoolChainConfigKey = []byte("pending_chain_config")
+var PoolLastSeenBlockKey = []byte("pending_last_seen_block")
+var PoolPendingBaseFeeKey = []byte("pending_base_fee")
+var PoolProtocolBaseFeeKey = []byte("protocol_base_fee")
+
+// recentlyConnectedPeers does buffer IDs of recently connected good peers
+// then sync of pooled Transactions can happen to all of them at once
+// DoS protection and performance saving
+// it doesn't track if peer disconnected, it's fine
+type recentlyConnectedPeers struct {
+	lock  sync.RWMutex
+	peers []PeerID
+}
+
+func (l *recentlyConnectedPeers) AddPeer(p PeerID) {
+	l.lock.Lock()
+	defer l.lock.Unlock()
+	l.peers = append(l.peers, p)
+}
+
+func (l *recentlyConnectedPeers) GetAndClean() []PeerID {
+	l.lock.Lock()
+	defer l.lock.Unlock()
+	peers := l.peers
+	l.peers = nil
+	return peers
+}
+
+//nolint
+func (sc *sendersBatch) printDebug(prefix string) {
+	fmt.Printf("%s.sendersBatch.sender\n", prefix)
+	//for i, j := range sc.senderInfo {
+	//	fmt.Printf("\tid=%d,nonce=%d,balance=%d\n", i, j.nonce, j.balance.Uint64())
+	//}
+}
+
+// sendersBatch stores in-memory senders-related objects - which are different from DB (updated/dirty)
+// flushing to db periodically. 
it doesn't play as read-cache (because db is small and memory-mapped - doesn't need cache) +// non thread-safe +type sendersBatch struct { + senderID uint64 + senderIDs map[string]uint64 + senderID2Addr map[uint64]string +} + +func newSendersCache() *sendersBatch { + return &sendersBatch{senderIDs: map[string]uint64{}, senderID2Addr: map[uint64]string{}} +} + +func (sc *sendersBatch) id(addr string) (uint64, bool) { + id, ok := sc.senderIDs[addr] + return id, ok +} +func (sc *sendersBatch) info(cache kvcache.Cache, viewID kvcache.ViewID, coreTx kv.Tx, id uint64) (nonce uint64, balance uint256.Int, err error) { + //cacheTotalCounter.Inc() + addr, ok := sc.senderID2Addr[id] + if !ok { + panic("must not happen") + } + encoded, err := cache.Get([]byte(addr), coreTx, viewID) + if err != nil { + return 0, emptySender.balance, err + } + if len(encoded) == 0 { + return emptySender.nonce, emptySender.balance, nil + } + nonce, balance, err = DecodeSender(encoded) + if err != nil { + return 0, emptySender.balance, err + } + return nonce, balance, nil +} + +func (sc *sendersBatch) onNewTxs(newTxs TxSlots) (err error) { + for i := 0; i < len(newTxs.txs); i++ { + id, ok := sc.id(string(newTxs.senders.At(i))) + if ok { + newTxs.txs[i].senderID = id + continue + } + sc.senderID++ + sc.senderIDs[string(newTxs.senders.At(i))] = sc.senderID + sc.senderID2Addr[sc.senderID] = string(newTxs.senders.At(i)) + + newTxs.txs[i].senderID = sc.senderID + } + return nil +} +func (sc *sendersBatch) onNewBlock(stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs TxSlots) error { + for _, diff := range stateChanges.ChangeBatch { + for _, change := range diff.Changes { // merge state changes + addrB := gointerfaces.ConvertH160toAddress(change.Address) + addr := string(addrB[:]) + _, ok := sc.id(addr) + if !ok { + sc.senderID++ + sc.senderIDs[addr] = sc.senderID + sc.senderID2Addr[sc.senderID] = addr + } + } + + for i := 0; i < unwindTxs.senders.Len(); i++ { + addr := string(unwindTxs.senders.At(i)) + id, ok := sc.id(addr) + if !ok { + sc.senderID++ + id = sc.senderID + sc.senderIDs[addr] = sc.senderID + sc.senderID2Addr[sc.senderID] = addr + } + unwindTxs.txs[i].senderID = id + } + + for i := 0; i < len(minedTxs.txs); i++ { + addr := string(minedTxs.senders.At(i)) + id, ok := sc.id(addr) + if !ok { + sc.senderID++ + id = sc.senderID + sc.senderIDs[addr] = sc.senderID + sc.senderID2Addr[sc.senderID] = addr + } + minedTxs.txs[i].senderID = id + } + } + return nil +} + +type ByNonce struct { + tree *btree.BTree +} + +func (b *ByNonce) ascend(senderID uint64, f func(*metaTx) bool) { + b.tree.AscendGreaterOrEqual(&sortByNonce{&metaTx{Tx: &TxSlot{senderID: senderID}}}, func(i btree.Item) bool { + mt := i.(*sortByNonce).metaTx + if mt.Tx.senderID != senderID { + return false + } + return f(mt) + }) +} +func (b *ByNonce) hasTxs(senderID uint64) bool { + has := false + b.ascend(senderID, func(*metaTx) bool { + has = true + return false + }) + return has +} +func (b *ByNonce) get(senderID, txNonce uint64) *metaTx { + if found := b.tree.Get(&sortByNonce{&metaTx{Tx: &TxSlot{senderID: senderID, nonce: txNonce}}}); found != nil { + return found.(*sortByNonce).metaTx + } + return nil +} + +//nolint +func (b *ByNonce) has(mt *metaTx) bool { + found := b.tree.Get(&sortByNonce{mt}) + return found != nil +} +func (b *ByNonce) delete(mt *metaTx) { b.tree.Delete(&sortByNonce{mt}) } +func (b *ByNonce) replaceOrInsert(mt *metaTx) *metaTx { + it := b.tree.ReplaceOrInsert(&sortByNonce{mt}) + if it != nil { + return it.(*sortByNonce).metaTx + 
} + return nil +} + type PendingPool struct { t SubPoolType best bestSlice @@ -1086,6 +1553,9 @@ func (p *PendingPool) PopWorst() *metaTx { p.best = p.best.UnsafeRemove(i) return i } +func (p *PendingPool) Updated(mt *metaTx) { + heap.Fix(p.worst, mt.worstIndex) +} func (p *PendingPool) Len() int { return len(p.best) } // UnsafeRemove - does break Heap invariants, but it has O(1) instead of O(log(n)) complexity. @@ -1111,6 +1581,11 @@ func (p *PendingPool) UnsafeAdd(i *metaTx) { p.worst.Push(i) p.best = p.best.UnsafeAdd(i) } +func (p *PendingPool) Add(i *metaTx) { + i.currentSubPool = p.t + heap.Push(p.worst, i) + p.best = p.best.UnsafeAdd(i) +} func (p *PendingPool) DebugPrint(prefix string) { for i, it := range p.best { fmt.Printf("%s.best: %d, %d, %d,%d\n", prefix, i, it.subPool, it.bestIndex, it.Tx.nonce) @@ -1163,6 +1638,16 @@ func (p *SubPool) Add(i *metaTx) { heap.Push(p.worst, i) } +func (p *SubPool) Remove(i *metaTx) { + heap.Remove(p.best, i.bestIndex) + heap.Remove(p.worst, i.worstIndex) + i.currentSubPool = 0 +} +func (p *SubPool) Updated(i *metaTx) { + heap.Fix(p.best, i.bestIndex) + heap.Fix(p.worst, i.worstIndex) +} + // UnsafeRemove - does break Heap invariants, but it has O(1) instead of O(log(n)) complexity. // Must manually call heap.Init after such changes. // Make sense to batch unsafe changes @@ -1266,459 +1751,6 @@ func (p *WorstQueue) Pop() interface{} { return item } -// MainLoop - does: -// send pending byHash to p2p: -// - new byHash -// - all pooled byHash to recently connected peers -// - all local pooled byHash to random peers periodically -// promote/demote transactions -// reorgs -func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs chan Hashes, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func()) { - for { - if err := db.Update(ctx, func(tx kv.RwTx) error { - return coreDB.View(ctx, func(coreTx kv.Tx) error { - return p.fromDB(ctx, tx, coreTx) - }) - }); err != nil { - log.Error("[txpool] restore from db", "err", err) - } else { - break - } - time.Sleep(time.Second) - } - //p.logStats() - - syncToNewPeersEvery := time.NewTicker(p.cfg.SyncToNewPeersEvery) - defer syncToNewPeersEvery.Stop() - processRemoteTxsEvery := time.NewTicker(p.cfg.ProcessRemoteTxsEvery) - defer processRemoteTxsEvery.Stop() - commitEvery := time.NewTicker(p.cfg.CommitEvery) - defer commitEvery.Stop() - logEvery := time.NewTicker(p.cfg.LogEvery) - defer logEvery.Stop() - cacheEvictEvery := time.NewTicker(p.cfg.CacheEvictEvery) - defer cacheEvictEvery.Stop() - - localTxHashes := make([]byte, 0, 128) - remoteTxHashes := make([]byte, 0, 128) - - for { - select { - case <-ctx.Done(): - return - case <-logEvery.C: - p.logStats() - case <-processRemoteTxsEvery.C: - if !p.Started() { - continue - } - if err := p.processRemoteTxs(ctx); err != nil { - if s, ok := status.FromError(err); ok && retryLater(s.Code()) { - continue - } - if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { - continue - } - log.Error("[txpool] process batch remote txs", "err", err) - } - case <-commitEvery.C: - if db != nil { - t := time.Now() - written, err := p.flush(db) - if err != nil { - log.Error("[txpool] flush is local history", "err", err) - continue - } - writeToDbBytesCounter.Set(written) - log.Debug("[txpool] Commit", "written_kb", written/1024, "in", time.Since(t)) - } - case h := <-newTxs: //TODO: maybe send TxSlots object instead of Hashes? 
- notifyMiningAboutNewSlots() - if err := db.View(ctx, func(tx kv.Tx) error { - slotsRlp := make([][]byte, 0, h.Len()) - for i := 0; i < h.Len(); i++ { - slotRlp, err := p.GetRlp(tx, h.At(i)) - if err != nil { - return err - } - slotsRlp = append(slotsRlp, slotRlp) - } - newSlotsStreams.Broadcast(&proto_txpool.OnAddReply{RplTxs: slotsRlp}) - return nil - }); err != nil { - log.Error("[txpool] send new slots by grpc", "err", err) - } - - // first broadcast all local txs to all peers, then non-local to random sqrt(peersAmount) peers - localTxHashes = localTxHashes[:0] - remoteTxHashes = remoteTxHashes[:0] - - for i := 0; i < h.Len(); i++ { - if p.IsLocal(h.At(i)) { - localTxHashes = append(localTxHashes, h.At(i)...) - } else { - remoteTxHashes = append(localTxHashes, h.At(i)...) - } - } - - send.BroadcastLocalPooledTxs(localTxHashes) - send.BroadcastRemotePooledTxs(remoteTxHashes) - case <-syncToNewPeersEvery.C: // new peer - newPeers := p.recentlyConnectedPeers.GetAndClean() - if len(newPeers) == 0 { - continue - } - t := time.Now() - remoteTxHashes = p.AppendAllHashes(remoteTxHashes[:0]) - send.PropagatePooledTxsToPeersList(newPeers, remoteTxHashes) - propagateToNewPeerTimer.UpdateDuration(t) - case <-cacheEvictEvery.C: - p.senders.cache.Evict() - } - } -} - -func (p *TxPool) flush(db kv.RwDB) (written uint64, err error) { - defer writeToDbTimer.UpdateDuration(time.Now()) - p.lock.Lock() - defer p.lock.Unlock() - //it's important that write db tx is done inside lock, to make last writes visible for all read operations - if err := db.Update(context.Background(), func(tx kv.RwTx) error { - err = p.flushLocked(tx) - if err != nil { - return err - } - written, _, err = tx.(*mdbx.MdbxTx).SpaceDirty() - if err != nil { - return err - } - return nil - }); err != nil { - return 0, err - } - return written, nil -} -func (p *TxPool) flushLocked(tx kv.RwTx) (err error) { - for i := 0; i < len(p.deletedTxs); i++ { - if !p.byNonce.hasTxs(p.deletedTxs[i].Tx.senderID) { - addr, ok := p.senders.senderID2Addr[p.deletedTxs[i].Tx.senderID] - if ok { - delete(p.senders.senderID2Addr, p.deletedTxs[i].Tx.senderID) - delete(p.senders.senderIDs, addr) - } - } - if err := tx.Delete(kv.PoolTransaction, p.deletedTxs[i].Tx.idHash[:], nil); err != nil { - return err - } - p.deletedTxs[i] = nil // for gc - } - - txHashes := p.isLocalHashLRU.Keys() - encID := make([]byte, 8) - if err := tx.ClearBucket(kv.RecentLocalTransaction); err != nil { - return err - } - for i := range txHashes { - binary.BigEndian.PutUint64(encID, uint64(i)) - if err := tx.Append(kv.RecentLocalTransaction, encID, []byte(txHashes[i].(string))); err != nil { - return err - } - } - - v := make([]byte, 0, 1024) - for txHash, metaTx := range p.byHash { - if metaTx.Tx.rlp == nil { - continue - } - v = common.EnsureEnoughSize(v, 20+len(metaTx.Tx.rlp)) - for addr, id := range p.senders.senderIDs { // no inverted index - tradeoff flush speed for memory usage - if id == metaTx.Tx.senderID { - copy(v[:20], addr) - break - } - } - copy(v[20:], metaTx.Tx.rlp) - - has, _ := tx.Has(kv.PoolTransaction, []byte(txHash)) - if has { - panic("must not happen") - } - if err := tx.Put(kv.PoolTransaction, []byte(txHash), v); err != nil { - return err - } - metaTx.Tx.rlp = nil - } - - binary.BigEndian.PutUint64(encID, p.protocolBaseFee.Load()) - if err := tx.Put(kv.PoolInfo, PoolProtocolBaseFeeKey, encID); err != nil { - return err - } - binary.BigEndian.PutUint64(encID, p.currentBaseFee.Load()) - if err := tx.Put(kv.PoolInfo, PoolPendingBaseFeeKey, encID); err != nil { 
- return err - } - - // clean - in-memory data structure as later as possible - because if during this Tx will happen error, - // DB will stay consitant but some in-memory structures may be alread cleaned, and retry will not work - // failed write transaction must not create side-effects - p.deletedTxs = p.deletedTxs[:0] - p.discardReasons = p.discardReasons[:0] - return nil -} - -func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error { - p.lock.Lock() - defer p.lock.Unlock() - cache, err := p.senders.cache.View(ctx, coreTx) - if err != nil { - return err - } - - if err := tx.ForEach(kv.RecentLocalTransaction, nil, func(k, v []byte) error { - p.isLocalHashLRU.Add(string(v), struct{}{}) - return nil - }); err != nil { - return err - } - lastSeenBlock, err := LastSeenBlock(tx) - if err != nil { - return err - } - p.lastSeenBlock.Store(lastSeenBlock) - - txs := TxSlots{} - parseCtx := NewTxParseContext(p.rules, p.chainID) - parseCtx.WithSender(false) - - i := 0 - if err := tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { - addr, txRlp := v[:20], v[20:] - txs.Resize(uint(i + 1)) - txs.txs[i] = &TxSlot{} - - _, err := parseCtx.ParseTransaction(txRlp, 0, txs.txs[i], nil) - if err != nil { - return fmt.Errorf("err: %w, rlp: %x\n", err, txRlp) - } - txs.txs[i].rlp = nil // means that we don't need store it in db anymore - copy(txs.senders.At(i), addr) - - id, ok := p.senders.senderIDs[string(addr)] - if !ok { - p.senders.senderID++ - id = p.senders.senderID - p.senders.senderIDs[string(addr)] = id - p.senders.senderID2Addr[id] = string(addr) - } - txs.txs[i].senderID = id - binary.BigEndian.Uint64(v) - - _, isLocalTx := p.isLocalHashLRU.Get(string(k)) - txs.isLocal[i] = isLocalTx - i++ - return nil - }); err != nil { - return err - } - - var protocolBaseFee, currentBaseFee uint64 - { - v, err := tx.GetOne(kv.PoolInfo, PoolProtocolBaseFeeKey) - if err != nil { - return err - } - if len(v) > 0 { - protocolBaseFee = binary.BigEndian.Uint64(v) - } - } - { - v, err := tx.GetOne(kv.PoolInfo, PoolPendingBaseFeeKey) - if err != nil { - return err - } - if len(v) > 0 { - currentBaseFee = binary.BigEndian.Uint64(v) - } - } - err = p.senders.onNewTxs(txs) - if err != nil { - return err - } - if err := onNewTxs(0, cache, coreTx, p.cfg, p.senders, txs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil { - return err - } - p.currentBaseFee.Store(currentBaseFee) - p.protocolBaseFee.Store(protocolBaseFee) - - return nil -} -func LastSeenBlock(tx kv.Getter) (uint64, error) { - v, err := tx.GetOne(kv.PoolInfo, PoolLastSeenBlockKey) - if err != nil { - return 0, err - } - if len(v) == 0 { - return 0, nil - } - return binary.BigEndian.Uint64(v), nil -} -func PutLastSeenBlock(tx kv.Putter, n uint64, buf []byte) error { - buf = common.EnsureEnoughSize(buf, 8) - binary.BigEndian.PutUint64(buf, n) - err := tx.Put(kv.PoolInfo, PoolLastSeenBlockKey, buf) - if err != nil { - return err - } - return nil -} -func ChainConfig(tx kv.Getter) (*chain.Config, error) { - v, err := tx.GetOne(kv.PoolInfo, PoolChainConfigKey) - if err != nil { - return nil, err - } - if len(v) == 0 { - return nil, nil - } - var config chain.Config - if err := json.Unmarshal(v, &config); err != nil { - return nil, fmt.Errorf("invalid chain config JSON in pool db: %w", err) - } - return &config, nil -} -func PutChainConfig(tx kv.Putter, cc *chain.Config, buf []byte) error { - wr := bytes.NewBuffer(buf) - if err := json.NewEncoder(wr).Encode(cc); err != 
nil { - return fmt.Errorf("invalid chain config JSON in pool db: %w", err) - } - if err := tx.Put(kv.PoolInfo, PoolChainConfigKey, wr.Bytes()); err != nil { - return err - } - return nil -} - -//nolint -func (p *TxPool) printDebug(prefix string) { - fmt.Printf("%s.pool.byHash\n", prefix) - for _, j := range p.byHash { - fmt.Printf("\tsenderID=%d, nonce=%d, tip=%d\n", j.Tx.senderID, j.Tx.nonce, j.Tx.tip) - } - fmt.Printf("%s.pool.queues.len: %d,%d,%d\n", prefix, p.pending.Len(), p.baseFee.Len(), p.queued.Len()) - for i := range p.pending.best { - p.pending.best[i].Tx.printDebug(fmt.Sprintf("%s.pending: %b", prefix, p.pending.best[i].subPool)) - } - for i := range *p.queued.best { - (*p.queued.best)[i].Tx.printDebug(fmt.Sprintf("%s.queued : %b", prefix, (*p.queued.best)[i].subPool)) - } -} -func (p *TxPool) logStats() { - //protocolBaseFee, currentBaseFee := p.protocolBaseFee.Load(), p.currentBaseFee.Load() - - p.lock.RLock() - defer p.lock.RUnlock() - - //idsInMem := p.senders.idsCount() - var m runtime.MemStats - runtime.ReadMemStats(&m) - - ctx := []interface{}{ - //"baseFee", fmt.Sprintf("%d, %dm", protocolBaseFee, currentBaseFee/1_000_000), - "block", p.lastSeenBlock.Load(), - "pending", p.pending.Len(), - "baseFee", p.baseFee.Len(), - "queued", p.queued.Len(), - } - cacheKeys := p.senders.cache.Len() - if cacheKeys > 0 { - ctx = append(ctx, "state_cache_keys", cacheKeys) - } - ctx = append(ctx, "alloc_mb", m.Alloc/1024/1024, "sys_mb", m.Sys/1024/1024) - log.Info("[txpool] stat", ctx...) - //if ASSERT { - //stats := kvcache.DebugStats(p.senders.cache) - //log.Info(fmt.Sprintf("[txpool] cache %T, roots amount %d", p.senders.cache, len(stats))) - //for i := range stats { - // log.Info("[txpool] cache", "root", stats[i].BlockNum, "len", stats[i].Lenght) - //} - //stats := kvcache.DebugStats(p.senders.cache) - //log.Info(fmt.Sprintf("[txpool] cache %T, roots amount %d", p.senders.cache, len(stats))) - //for i := range stats { - // log.Info("[txpool] cache", "root", stats[i].BlockNum, "len", stats[i].Lenght) - //} - //ages := kvcache.DebugAges(p.senders.cache) - //for i := range ages { - // log.Info("[txpool] age", "age", ages[i].BlockNum, "amount", ages[i].Lenght) - //} - //} -} - -//Deprecated need switch to streaming-like -func (p *TxPool) deprecatedForEach(_ context.Context, f func(rlp, sender []byte, t SubPoolType), tx kv.Tx) error { - p.lock.RLock() - defer p.lock.RUnlock() - - p.byNonce.tree.Ascend(func(i btree.Item) bool { - mt := i.(*sortByNonce).metaTx - slot := mt.Tx - slotRlp := slot.rlp - if slot.rlp == nil { - v, err := tx.GetOne(kv.PoolTransaction, slot.idHash[:]) - if err != nil { - log.Error("[txpool] get tx from db", "err", err) - return false - } - if v == nil { - log.Error("[txpool] tx not found in db") - return false - } - slotRlp = v[20:] - } - - var sender []byte - found := false - for addr, senderID := range p.senders.senderIDs { // TODO: do we need inverted index here? 
- if slot.senderID == senderID { - sender = []byte(addr) - found = true - break - } - } - if !found { - return true - } - f(slotRlp, sender, mt.currentSubPool) - return true - }) - return nil -} - -var PoolChainConfigKey = []byte("pending_chain_config") -var PoolLastSeenBlockKey = []byte("pending_last_seen_block") -var PoolPendingBaseFeeKey = []byte("pending_base_fee") -var PoolProtocolBaseFeeKey = []byte("protocol_base_fee") - -// recentlyConnectedPeers does buffer IDs of recently connected good peers -// then sync of pooled Transaction can happen to all of then at once -// DoS protection and performance saving -// it doesn't track if peer disconnected, it's fine -type recentlyConnectedPeers struct { - lock sync.RWMutex - peers []PeerID -} - -func (l *recentlyConnectedPeers) AddPeer(p PeerID) { - l.lock.Lock() - defer l.lock.Unlock() - l.peers = append(l.peers, p) -} - -func (l *recentlyConnectedPeers) GetAndClean() []PeerID { - l.lock.Lock() - defer l.lock.Unlock() - peers := l.peers - l.peers = nil - return peers -} - func min(a, b uint64) uint64 { if a <= b { return a diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index fdecb38fa..1220cd2e6 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -234,7 +234,7 @@ func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawFeeCap, rawSender []b panic(err) } txs.senders = append(txs.senders, senders.At(i%senders.Len())...) - txs.isLocal = append(txs.isLocal, false) + txs.isLocal = append(txs.isLocal, true) } return sendersInfo, senderIDs, txs, true @@ -542,7 +542,7 @@ func FuzzOnNewBlocks(f *testing.F) { // go to first fork //fmt.Printf("ll1: %d,%d,%d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len()) txs1, txs2, p2pReceived, txs3 := splitDataset(txs) - err = pool.OnNewBlock(ctx, change, txs1, TxSlots{}) + err = pool.OnNewBlock(ctx, change, txs1, TxSlots{}, nil) assert.NoError(err) check(txs1, TxSlots{}, "fork1") checkNotify(txs1, TxSlots{}, "fork1") @@ -554,7 +554,7 @@ func FuzzOnNewBlocks(f *testing.F) { {BlockHeight: 1, BlockHash: h1, PrevBlockHeight: 0, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee}, }, } - err = pool.OnNewBlock(ctx, change, TxSlots{}, txs2) + err = pool.OnNewBlock(ctx, change, TxSlots{}, txs2, nil) check(TxSlots{}, txs2, "fork1 mined") checkNotify(TxSlots{}, txs2, "fork1 mined") @@ -565,7 +565,7 @@ func FuzzOnNewBlocks(f *testing.F) { {BlockHeight: 0, BlockHash: h1, Direction: remote.Direction_UNWIND, PrevBlockHeight: 1, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee}, }, } - err = pool.OnNewBlock(ctx, change, txs2, TxSlots{}) + err = pool.OnNewBlock(ctx, change, txs2, TxSlots{}, nil) assert.NoError(err) check(txs2, TxSlots{}, "fork2") checkNotify(txs2, TxSlots{}, "fork2") @@ -576,7 +576,7 @@ func FuzzOnNewBlocks(f *testing.F) { {BlockHeight: 1, BlockHash: h22, PrevBlockHeight: 0, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee}, }, } - err = pool.OnNewBlock(ctx, change, TxSlots{}, txs3) + err = pool.OnNewBlock(ctx, change, TxSlots{}, txs3, nil) assert.NoError(err) check(TxSlots{}, txs3, "fork2 mined") checkNotify(TxSlots{}, txs3, "fork2 mined") diff --git a/txpool/pool_test.go b/txpool/pool_test.go index 889e19f68..23d97c0dd 100644 --- a/txpool/pool_test.go +++ b/txpool/pool_test.go @@ -15,3 +15,42 @@ */ package txpool + +import ( + "container/heap" + "testing" +) + +func BenchmarkName(b *testing.B) { + txs := make([]*metaTx, 10_000) + p := NewSubPool(BaseFeeSubPool) + for i := 0; i < len(txs); i++ { + txs[i] = &metaTx{Tx: &TxSlot{}} + } + for i := 0; i < 
len(txs); i++ { + p.UnsafeAdd(txs[i]) + } + p.EnforceInvariants() + b.ResetTimer() + for i := 0; i < b.N; i++ { + txs[0].timestamp = 1 + heap.Fix(p.best, txs[0].bestIndex) + heap.Fix(p.worst, txs[0].worstIndex) + } +} + +func BenchmarkName2(b *testing.B) { + txs := make([]*metaTx, 10_000) + p := NewSubPool(BaseFeeSubPool) + for i := 0; i < len(txs); i++ { + txs[i] = &metaTx{Tx: &TxSlot{}} + } + for i := 0; i < len(txs); i++ { + p.UnsafeAdd(txs[i]) + } + p.EnforceInvariants() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + } +} diff --git a/txpool/types.go b/txpool/types.go index 879b510d1..6cffe872d 100644 --- a/txpool/types.go +++ b/txpool/types.go @@ -55,7 +55,7 @@ type TxParseContext struct { sig [65]byte withSender bool isProtected bool - reject func([]byte) bool + checkHash func([]byte) error cfg TxParsseConfig } @@ -125,9 +125,10 @@ const ( const ParseTransactionErrorPrefix = "parse transaction payload" var ErrRejected = errors.New("rejected") +var ErrAlreadyKnown = errors.New("already known") -func (ctx *TxParseContext) Reject(f func(hash []byte) bool) { ctx.reject = f } -func (ctx *TxParseContext) WithSender(v bool) { ctx.withSender = v } +func (ctx *TxParseContext) Reject(f func(hash []byte) error) { ctx.checkHash = f } +func (ctx *TxParseContext) WithSender(v bool) { ctx.withSender = v } // ParseTransaction extracts all the information from the transactions's payload (RLP) necessary to build TxSlot // it also performs syntactic validation of the transactions @@ -371,8 +372,10 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int, slot *TxSlo if !ctx.withSender { return p, nil } - if ctx.reject != nil && ctx.reject(slot.idHash[:32]) { - return p, ErrRejected + if ctx.checkHash != nil { + if err := ctx.checkHash(slot.idHash[:32]); err != nil { + return p, err + } } // Computing sigHash (hash used to recover sender from the signature)
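 	// (For legacy transactions this is the Keccak-256 hash of
 	// rlp([nonce, gasPrice, gas, to, value, data]), extended per EIP-155 with
 	// chainID, 0, 0 when the signature is protected; typed transactions prepend
 	// their type byte to the hashed payload instead.)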