Pool: discard reasons, LRU (#80)

This commit is contained in:
Alex Sharov 2021-09-20 12:44:29 +07:00 committed by GitHub
parent 1c9602c785
commit 6a7eeccad7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1363 additions and 998 deletions

View File

@ -3,12 +3,13 @@
package remote package remote
import ( import (
context "context" "context"
types "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
grpc "google.golang.org/grpc" "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
codes "google.golang.org/grpc/codes" "google.golang.org/grpc"
status "google.golang.org/grpc/status" "google.golang.org/grpc/codes"
emptypb "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
) )
// This is a compile-time assertion to ensure that this generated file // This is a compile-time assertion to ensure that this generated file

View File

@ -22,7 +22,6 @@ import (
"fmt" "fmt"
"sort" "sort"
"sync" "sync"
goatomic "sync/atomic"
"time" "time"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -31,19 +30,16 @@ import (
"github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"go.uber.org/atomic" "go.uber.org/atomic"
) )
type Cache interface { type Cache interface {
// View - returns CacheView consistent with givent kv.Tx // View - returns CacheView consistent with givent kv.Tx
View(ctx context.Context, tx kv.Tx) (CacheView, error) View(ctx context.Context, tx kv.Tx) (ViewID, error)
OnNewBlock(sc *remote.StateChangeBatch) OnNewBlock(sc *remote.StateChangeBatch)
Evict() int //Evict() int
Len() int Len() int
} Get(k []byte, tx kv.Tx, id ViewID) ([]byte, error)
type CacheView interface {
Get(k []byte, tx kv.Tx) ([]byte, error)
} }
// Coherent works on top of Database Transaction and pair Coherent+ReadTransaction must // Coherent works on top of Database Transaction and pair Coherent+ReadTransaction must
@ -68,36 +64,46 @@ type CacheView interface {
// it until either cache with the given identifier appears, or timeout (indicating that the cache update // it until either cache with the given identifier appears, or timeout (indicating that the cache update
// mechanism is broken and cache is likely invalidated). // mechanism is broken and cache is likely invalidated).
// //
// Tech details:
// - If found in cache - return value without copy (reader can rely on fact that data are immutable until end of db transaction)
// - Otherwise just read from db (no requests deduplication for now - preliminary optimization).
//
// Pair.Value == nil - is a marker of absense key in db // Pair.Value == nil - is a marker of absense key in db
// Coherent
// High-level guaranties:
// - Keys/Values returned by cache are valid/immutable until end of db transaction
// - CacheView is always coherent with given db transaction -
//
// Rules of set view.isCanonical value:
// - method View can't parent.Clone() - because parent view is not coherent with current kv.Tx
// - only OnNewBlock method may do parent.Clone() and apply StateChanges to create coherent view of kv.Tx
// - parent.Clone() can't be caled if parent.isCanonical=false
// - only OnNewBlock method can set view.isCanonical=true
// Rules of filling cache.evictList:
// - changes in Canonical View SHOULD reflect in evictList
// - changes in Non-Canonical View SHOULD NOT reflect in evictList
type Coherent struct { type Coherent struct {
hits, miss, timeout, keys *metrics.Counter hits, miss, timeout *metrics.Counter
evict *metrics.Summary keys, keys2 *metrics.Counter
roots map[uint64]*CoherentView latestViewID ViewID
rootsLock sync.RWMutex latestView *CoherentView
cfg CoherentCacheConfig evictList *List
roots map[ViewID]*CoherentView
lock sync.RWMutex
cfg CoherentCacheConfig
} }
type CoherentView struct { type CoherentView struct {
hits, miss *metrics.Counter
cache *btree.BTree cache *btree.BTree
lock sync.RWMutex
id atomic.Uint64
ready chan struct{} // close when ready ready chan struct{} // close when ready
readyChanClosed atomic.Bool // protecting `ready` field from double-close (on unwind). Consumers don't need check this field. readyChanClosed atomic.Bool // protecting `ready` field from double-close (on unwind). Consumers don't need check this field.
// Views marked as `Canonical` if it received onNewBlock message
// we may drop `Non-Canonical` views even if they had fresh keys
// keys added to `Non-Canonical` views SHOULD NOT be added to evictList
// cache.latestView is always `Canonical`
isCanonical bool
} }
var _ Cache = (*Coherent)(nil) // compile-time interface check var _ Cache = (*Coherent)(nil) // compile-time interface check
var _ CacheView = (*CoherentView)(nil) // compile-time interface check //var _ CacheView = (*CoherentView)(nil) // compile-time interface check
type Pair struct {
K, V []byte
t uint64 //TODO: can be uint32 if remember first txID and use it as base zero. because it's monotonic.
}
func (p *Pair) Less(than btree.Item) bool { return bytes.Compare(p.K, than.(*Pair).K) < 0 }
type CoherentCacheConfig struct { type CoherentCacheConfig struct {
KeepViews uint64 // keep in memory up to this amount of views, evict older KeepViews uint64 // keep in memory up to this amount of views, evict older
@ -110,63 +116,83 @@ type CoherentCacheConfig struct {
var DefaultCoherentCacheConfig = CoherentCacheConfig{ var DefaultCoherentCacheConfig = CoherentCacheConfig{
KeepViews: 50, KeepViews: 50,
NewBlockWait: 50 * time.Millisecond, NewBlockWait: 50 * time.Millisecond,
KeysLimit: 1_000_000, KeysLimit: 400_000,
MetricsLabel: "default", MetricsLabel: "default",
WithStorage: false, WithStorage: true,
} }
func New(cfg CoherentCacheConfig) *Coherent { func New(cfg CoherentCacheConfig) *Coherent {
return &Coherent{roots: map[uint64]*CoherentView{}, cfg: cfg, return &Coherent{
miss: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="miss",name="%s"}`, cfg.MetricsLabel)), roots: map[ViewID]*CoherentView{},
hits: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="hit",name="%s"}`, cfg.MetricsLabel)), evictList: NewList(),
timeout: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_timeout_total{name="%s"}`, cfg.MetricsLabel)), cfg: cfg,
keys: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_keys_total{name="%s"}`, cfg.MetricsLabel)), miss: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="miss",name="%s"}`, cfg.MetricsLabel)),
evict: metrics.GetOrCreateSummary(fmt.Sprintf(`cache_evict{name="%s"}`, cfg.MetricsLabel)), hits: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_total{result="hit",name="%s"}`, cfg.MetricsLabel)),
timeout: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_timeout_total{name="%s"}`, cfg.MetricsLabel)),
keys: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_keys_total{name="%s"}`, cfg.MetricsLabel)),
keys2: metrics.GetOrCreateCounter(fmt.Sprintf(`cache_list_total{name="%s"}`, cfg.MetricsLabel)),
} }
} }
// selectOrCreateRoot - used for usual getting root // selectOrCreateRoot - used for usual getting root
func (c *Coherent) selectOrCreateRoot(txID uint64) *CoherentView { func (c *Coherent) selectOrCreateRoot(viewID ViewID) *CoherentView {
c.rootsLock.Lock() c.lock.Lock()
defer c.rootsLock.Unlock() defer c.lock.Unlock()
r, ok := c.roots[txID] r, ok := c.roots[viewID]
if ok { if ok {
return r return r
} }
r = &CoherentView{ready: make(chan struct{}), hits: c.hits, miss: c.miss} r = &CoherentView{ready: make(chan struct{})}
r.id.Store(txID) if prevView, ok := c.roots[viewID-1]; ok {
c.roots[txID] = r //log.Info("advance: clone", "from", viewID-1, "to", viewID)
r.cache = prevView.cache.Clone()
r.isCanonical = prevView.isCanonical
} else {
//log.Info("advance: new", "to", viewID)
r.cache = btree.New(32)
}
c.roots[viewID] = r
return r return r
} }
// advanceRoot - used for advancing root onNewBlock // advanceRoot - used for advancing root onNewBlock
func (c *Coherent) advanceRoot(viewID uint64) (r *CoherentView) { func (c *Coherent) advanceRoot(viewID ViewID) (r *CoherentView) {
c.rootsLock.Lock()
defer c.rootsLock.Unlock()
r, rootExists := c.roots[viewID] r, rootExists := c.roots[viewID]
if !rootExists { if !rootExists {
r = &CoherentView{ready: make(chan struct{}), hits: c.hits, miss: c.miss} r = &CoherentView{ready: make(chan struct{})}
r.id.Store(viewID)
c.roots[viewID] = r c.roots[viewID] = r
} }
if prevView, ok := c.roots[viewID-1]; ok && prevView.isCanonical {
//TODO: need check if c.latest hash is still canonical. If not - can't clone from it
if prevView, ok := c.roots[viewID-1]; ok {
//log.Info("advance: clone", "from", viewID-1, "to", viewID) //log.Info("advance: clone", "from", viewID-1, "to", viewID)
r.cache = prevView.Clone() r.cache = prevView.cache.Clone()
} else { } else {
c.evictList.Init()
if r.cache == nil { if r.cache == nil {
//log.Info("advance: new", "to", viewID) //log.Info("advance: new", "to", viewID)
r.cache = btree.New(32) r.cache = btree.New(32)
} else {
r.cache.Ascend(func(i btree.Item) bool {
c.evictList.PushFront(i.(*Element))
return true
})
} }
} }
r.isCanonical = true
c.evictRoots()
c.latestViewID = viewID
c.latestView = r
c.keys.Set(uint64(c.latestView.cache.Len()))
c.keys2.Set(uint64(c.evictList.Len()))
return r return r
} }
func (c *Coherent) OnNewBlock(stateChanges *remote.StateChangeBatch) { func (c *Coherent) OnNewBlock(stateChanges *remote.StateChangeBatch) {
r := c.advanceRoot(stateChanges.DatabaseViewID) c.lock.Lock()
r.lock.Lock() defer c.lock.Unlock()
defer r.lock.Unlock() id := ViewID(stateChanges.DatabaseViewID)
r := c.advanceRoot(id)
for _, sc := range stateChanges.ChangeBatch { for _, sc := range stateChanges.ChangeBatch {
for i := range sc.Changes { for i := range sc.Changes {
switch sc.Changes[i].Action { switch sc.Changes[i].Action {
@ -174,10 +200,10 @@ func (c *Coherent) OnNewBlock(stateChanges *remote.StateChangeBatch) {
addr := gointerfaces.ConvertH160toAddress(sc.Changes[i].Address) addr := gointerfaces.ConvertH160toAddress(sc.Changes[i].Address)
v := sc.Changes[i].Data v := sc.Changes[i].Data
//fmt.Printf("set: %x,%x\n", addr, v) //fmt.Printf("set: %x,%x\n", addr, v)
r.cache.ReplaceOrInsert(&Pair{K: addr[:], V: v, t: sc.BlockHeight}) c.add(addr[:], v, r, id)
case remote.Action_DELETE: case remote.Action_DELETE:
addr := gointerfaces.ConvertH160toAddress(sc.Changes[i].Address) addr := gointerfaces.ConvertH160toAddress(sc.Changes[i].Address)
r.cache.ReplaceOrInsert(&Pair{K: addr[:], V: nil, t: sc.BlockHeight}) c.add(addr[:], nil, r, id)
case remote.Action_CODE, remote.Action_STORAGE: case remote.Action_CODE, remote.Action_STORAGE:
//skip //skip
default: default:
@ -191,11 +217,12 @@ func (c *Coherent) OnNewBlock(stateChanges *remote.StateChangeBatch) {
copy(k, addr[:]) copy(k, addr[:])
binary.BigEndian.PutUint64(k[20:], sc.Changes[i].Incarnation) binary.BigEndian.PutUint64(k[20:], sc.Changes[i].Incarnation)
copy(k[20+8:], loc[:]) copy(k[20+8:], loc[:])
r.cache.ReplaceOrInsert(&Pair{K: k, V: change.Data, t: sc.BlockHeight}) c.add(k, change.Data, r, id)
} }
} }
} }
} }
switched := r.readyChanClosed.CAS(false, true) switched := r.readyChanClosed.CAS(false, true)
if switched { if switched {
close(r.ready) //broadcast close(r.ready) //broadcast
@ -203,48 +230,47 @@ func (c *Coherent) OnNewBlock(stateChanges *remote.StateChangeBatch) {
//log.Info("on new block handled", "viewID", stateChanges.DatabaseViewID) //log.Info("on new block handled", "viewID", stateChanges.DatabaseViewID)
} }
func (c *Coherent) View(ctx context.Context, tx kv.Tx) (CacheView, error) { type ViewID uint64
r := c.selectOrCreateRoot(tx.ViewID())
func (c *Coherent) View(ctx context.Context, tx kv.Tx) (ViewID, error) {
id := ViewID(tx.ViewID())
r := c.selectOrCreateRoot(id)
select { // fast non-blocking path select { // fast non-blocking path
case <-r.ready: case <-r.ready:
//fmt.Printf("recv broadcast: %d,%d\n", r.id.Load(), tx.ViewID()) //fmt.Printf("recv broadcast: %d\n", id)
return r, nil return id, nil
default: default:
} }
select { // slow blocking path select { // slow blocking path
case <-r.ready: case <-r.ready:
//fmt.Printf("recv broadcast2: %d,%d\n", r.id.Load(), tx.ViewID()) //fmt.Printf("recv broadcast2: %d\n", tx.ViewID())
case <-ctx.Done(): case <-ctx.Done():
return nil, fmt.Errorf("kvcache rootNum=%x, %w", tx.ViewID(), ctx.Err()) return 0, fmt.Errorf("kvcache rootNum=%x, %w", tx.ViewID(), ctx.Err())
case <-time.After(c.cfg.NewBlockWait): //TODO: switch to timer to save resources case <-time.After(c.cfg.NewBlockWait): //TODO: switch to timer to save resources
c.timeout.Inc() c.timeout.Inc()
r.lock.Lock() //log.Info("timeout", "db_id", id, "has_btree", r.cache != nil)
//log.Info("timeout", "mem_id", r.id.Load(), "db_id", tx.ViewID(), "has_btree", r.cache != nil)
if r.cache == nil {
//parent := c.selectOrCreateRoot(tx.ViewID() - 1)
//if parent.cache != nil {
// r.cache = parent.Clone()
//} else {
// r.cache = btree.New(32)
//}
r.cache = btree.New(32)
}
r.lock.Unlock()
} }
return r, nil return ViewID(tx.ViewID()), nil
} }
func (c *CoherentView) Get(k []byte, tx kv.Tx) ([]byte, error) { func (c *Coherent) Get(k []byte, tx kv.Tx, id ViewID) ([]byte, error) {
c.lock.RLock() c.lock.RLock()
it := c.cache.Get(&Pair{K: k}) isLatest := c.latestViewID == id
r, ok := c.roots[id]
if !ok {
return nil, fmt.Errorf("too old ViewID: %d, latestViewID=%d", id, c.latestViewID)
}
it := r.cache.Get(&Element{K: k})
c.lock.RUnlock() c.lock.RUnlock()
if it != nil { if it != nil {
c.hits.Inc() c.hits.Inc()
goatomic.StoreUint64(&it.(*Pair).t, c.id.Load()) if isLatest {
//fmt.Printf("from cache %x: %#x,%x\n", c.id.Load(), k, it.(*Pair).V) c.evictList.MoveToFront(it.(*Element))
return it.(*Pair).V, nil }
//fmt.Printf("from cache: %#x,%x\n", k, it.(*Element).V)
return it.(*Element).V, nil
} }
c.miss.Inc() c.miss.Inc()
@ -252,27 +278,38 @@ func (c *CoherentView) Get(k []byte, tx kv.Tx) ([]byte, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
//fmt.Printf("from db %x: %#x,%x\n", c.id.Load(), k, v) //fmt.Printf("from db: %#x,%x\n", k, v)
it = &Pair{K: k, V: common.Copy(v), t: c.id.Load()}
c.lock.Lock() c.lock.Lock()
c.cache.ReplaceOrInsert(it) v = c.add(common.Copy(k), common.Copy(v), r, id).V
c.lock.Unlock() c.lock.Unlock()
return it.(*Pair).V, nil return v, nil
} }
func (c *CoherentView) Len() int {
c.lock.RLock() func (c *Coherent) removeOldest(r *CoherentView) {
defer c.lock.RUnlock() e := c.evictList.Back()
return c.cache.Len() if e != nil {
} c.evictList.Remove(e)
func (c *CoherentView) Clone() *btree.BTree { r.cache.Delete(e)
c.lock.Lock()
defer c.lock.Unlock()
if c.cache == nil {
c.cache = btree.New(32)
return btree.New(32) // return independent tree because nothing to share
} }
return c.cache.Clone() }
func (c *Coherent) add(k, v []byte, r *CoherentView, id ViewID) *Element {
it := &Element{K: k, V: v}
replaced := r.cache.ReplaceOrInsert(it)
if c.latestViewID != id {
//fmt.Printf("add to non-last viewID: %d<%d\n", c.latestViewID, id)
return it
}
if replaced != nil {
c.evictList.Remove(replaced.(*Element))
}
c.evictList.PushFront(it)
evict := c.evictList.Len() > c.cfg.KeysLimit
// Verify size not exceeded
if evict {
c.removeOldest(r)
}
return it
} }
type Stat struct { type Stat struct {
@ -287,51 +324,24 @@ func DebugStats(cache Cache) []Stat {
if !ok { if !ok {
return res return res
} }
casted.rootsLock.RLock() casted.lock.RLock()
defer casted.rootsLock.RUnlock() defer casted.lock.RUnlock()
for root, r := range casted.roots { for root, r := range casted.roots {
res = append(res, Stat{ res = append(res, Stat{
BlockNum: root, BlockNum: uint64(root),
Lenght: r.Len(), Lenght: r.cache.Len(),
}) })
} }
sort.Slice(res, func(i, j int) bool { return res[i].BlockNum < res[j].BlockNum }) sort.Slice(res, func(i, j int) bool { return res[i].BlockNum < res[j].BlockNum })
return res return res
} }
func DebugAges(cache Cache) []Stat {
res := []Stat{}
casted, ok := cache.(*Coherent)
if !ok {
return res
}
casted.rootsLock.RLock()
defer casted.rootsLock.RUnlock()
_, latestView := cache.(*Coherent).lastRoot()
latestView.lock.RLock()
defer latestView.lock.RUnlock()
counters := map[uint64]int{}
latestView.cache.Ascend(func(it btree.Item) bool {
age := goatomic.LoadUint64(&it.(*Pair).t)
_, ok := counters[age]
if !ok {
counters[age] = 0
}
counters[age]++
return true
})
for i, j := range counters {
res = append(res, Stat{BlockNum: i, Lenght: j})
}
sort.Slice(res, func(i, j int) bool { return res[i].BlockNum < res[j].BlockNum })
return res
}
func AssertCheckValues(ctx context.Context, tx kv.Tx, cache Cache) (int, error) { func AssertCheckValues(ctx context.Context, tx kv.Tx, cache Cache) (int, error) {
defer func(t time.Time) { fmt.Printf("AssertCheckValues:327: %s\n", time.Since(t)) }(time.Now()) defer func(t time.Time) { fmt.Printf("AssertCheckValues:327: %s\n", time.Since(t)) }(time.Now())
c, err := cache.View(ctx, tx) viewID, err := cache.View(ctx, tx)
if err != nil { if err != nil {
return 0, err return 0, err
} }
casted, ok := c.(*CoherentView) casted, ok := cache.(*Coherent)
if !ok { if !ok {
return 0, nil return 0, nil
} }
@ -339,15 +349,19 @@ func AssertCheckValues(ctx context.Context, tx kv.Tx, cache Cache) (int, error)
casted.lock.RLock() casted.lock.RLock()
defer casted.lock.RUnlock() defer casted.lock.RUnlock()
//log.Info("AssertCheckValues start", "db_id", tx.ViewID(), "mem_id", casted.id.Load(), "len", casted.cache.Len()) //log.Info("AssertCheckValues start", "db_id", tx.ViewID(), "mem_id", casted.id.Load(), "len", casted.cache.Len())
casted.cache.Ascend(func(i btree.Item) bool { view, ok := casted.roots[viewID]
k, v := i.(*Pair).K, i.(*Pair).V if !ok {
return 0, nil
}
view.cache.Ascend(func(i btree.Item) bool {
k, v := i.(*Element).K, i.(*Element).V
var dbV []byte var dbV []byte
dbV, err = tx.GetOne(kv.PlainState, k) dbV, err = tx.GetOne(kv.PlainState, k)
if err != nil { if err != nil {
return false return false
} }
if !bytes.Equal(dbV, v) { if !bytes.Equal(dbV, v) {
err = fmt.Errorf("key: %x, has different values: %x != %x, viewID: %d", k, v, dbV, casted.id.Load()) err = fmt.Errorf("key: %x, has different values: %x != %x", k, v, dbV)
return false return false
} }
checked++ checked++
@ -355,21 +369,16 @@ func AssertCheckValues(ctx context.Context, tx kv.Tx, cache Cache) (int, error)
}) })
return checked, err return checked, err
} }
func (c *Coherent) evictRoots() {
func (c *Coherent) lastRoot() (latestTxId uint64, view *CoherentView) { if c.latestViewID <= ViewID(c.cfg.KeepViews) {
c.rootsLock.RLock() return
defer c.rootsLock.RUnlock()
for txID := range c.roots { // max
if txID > latestTxId {
latestTxId = txID
}
} }
return latestTxId, c.roots[latestTxId] if len(c.roots) < int(c.cfg.KeepViews) {
} return
func (c *Coherent) evictRoots(to uint64) { }
c.rootsLock.Lock() to := c.latestViewID - ViewID(c.cfg.KeepViews)
defer c.rootsLock.Unlock() //fmt.Printf("collecting: %d\n", to)
var toDel []uint64 var toDel []ViewID
for txId := range c.roots { for txId := range c.roots {
if txId > to { if txId > to {
continue continue
@ -382,78 +391,222 @@ func (c *Coherent) evictRoots(to uint64) {
} }
} }
func (c *Coherent) Len() int { func (c *Coherent) Len() int {
_, lastView := c.lastRoot() c.lock.RLock()
return lastView.Len() defer c.lock.RUnlock()
} if c.latestView == nil {
func (c *Coherent) Evict() int {
defer c.evict.UpdateDuration(time.Now())
latestBlockNum, lastView := c.lastRoot()
c.evictRoots(latestBlockNum - 10)
if lastView == nil {
return 0 return 0
} }
keysAmount := lastView.Len() return c.latestView.cache.Len() //todo: is it same with cache.len()?
c.keys.Set(uint64(keysAmount))
lastView.evictOld(c.cfg.KeepViews, c.cfg.KeysLimit)
//lastView.evictNew2Random(c.cfg.KeysLimit)
return lastView.Len()
} }
//nolint // Element is an element of a linked list.
func (c *CoherentView) evictOld(dropOlder uint64, keysLimit int) { type Element struct {
if c.Len() < keysLimit { // Next and previous pointers in the doubly-linked list of elements.
// To simplify the implementation, internally a list l is implemented
// as a ring, such that &l.root is both the next element of the last
// list element (l.Back()) and the previous element of the first list
// element (l.Front()).
next, prev *Element
// The list to which this element belongs.
list *List
// The value stored with this element.
K, V []byte
}
func (p *Element) Less(than btree.Item) bool { return bytes.Compare(p.K, than.(*Element).K) < 0 }
// ========= copypaste of List implementation from stdlib ========
// Next returns the next list element or nil.
func (e *Element) Next() *Element {
if p := e.next; e.list != nil && p != &e.list.root {
return p
}
return nil
}
// Prev returns the previous list element or nil.
func (e *Element) Prev() *Element {
if p := e.prev; e.list != nil && p != &e.list.root {
return p
}
return nil
}
// List represents a doubly linked list.
// The zero value for List is an empty list ready to use.
type List struct {
root Element // sentinel list element, only &root, root.prev, and root.next are used
len int // current list length excluding (this) sentinel element
}
// Init initializes or clears list l.
func (l *List) Init() *List {
l.root.next = &l.root
l.root.prev = &l.root
l.len = 0
return l
}
// New returns an initialized list.
func NewList() *List { return new(List).Init() }
// Len returns the number of elements of list l.
// The complexity is O(1).
func (l *List) Len() int { return l.len }
// Front returns the first element of list l or nil if the list is empty.
func (l *List) Front() *Element {
if l.len == 0 {
return nil
}
return l.root.next
}
// Back returns the last element of list l or nil if the list is empty.
func (l *List) Back() *Element {
if l.len == 0 {
return nil
}
return l.root.prev
}
// lazyInit lazily initializes a zero List value.
func (l *List) lazyInit() {
if l.root.next == nil {
l.Init()
}
}
// insert inserts e after at, increments l.len, and returns e.
func (l *List) insert(e, at *Element) *Element {
e.prev = at
e.next = at.next
e.prev.next = e
e.next.prev = e
e.list = l
l.len++
return e
}
// insertValue is a convenience wrapper for insert(&Element{Value: v}, at).
func (l *List) insertValue(e, at *Element) *Element {
return l.insert(e, at)
}
// remove removes e from its list, decrements l.len, and returns e.
func (l *List) remove(e *Element) *Element {
e.prev.next = e.next
e.next.prev = e.prev
e.next = nil // avoid memory leaks
e.prev = nil // avoid memory leaks
e.list = nil
l.len--
return e
}
// move moves e to next to at and returns e.
func (l *List) move(e, at *Element) *Element {
if e == at {
return e
}
e.prev.next = e.next
e.next.prev = e.prev
e.prev = at
e.next = at.next
e.prev.next = e
e.next.prev = e
return e
}
// Remove removes e from l if e is an element of list l.
// It returns the element value e.Value.
// The element must not be nil.
func (l *List) Remove(e *Element) ([]byte, []byte) {
if e.list == l {
// if e.list == l, l must have been initialized when e was inserted
// in l or l == nil (e is a zero Element) and l.remove will crash
l.remove(e)
}
return e.K, e.V
}
// PushFront inserts a new element e with value v at the front of list l and returns e.
func (l *List) PushFront(e *Element) *Element {
l.lazyInit()
return l.insertValue(e, &l.root)
}
// PushBack inserts a new element e with value v at the back of list l and returns e.
func (l *List) PushBack(e *Element) *Element {
l.lazyInit()
return l.insertValue(e, l.root.prev)
}
// InsertBefore inserts a new element e with value v immediately before mark and returns e.
// If mark is not an element of l, the list is not modified.
// The mark must not be nil.
func (l *List) InsertBefore(e *Element, mark *Element) *Element {
if mark.list != l {
return nil
}
// see comment in List.Remove about initialization of l
return l.insertValue(e, mark.prev)
}
// InsertAfter inserts a new element e with value v immediately after mark and returns e.
// If mark is not an element of l, the list is not modified.
// The mark must not be nil.
func (l *List) InsertAfter(e *Element, mark *Element) *Element {
if mark.list != l {
return nil
}
// see comment in List.Remove about initialization of l
return l.insertValue(e, mark)
}
// MoveToFront moves element e to the front of list l.
// If e is not an element of l, the list is not modified.
// The element must not be nil.
func (l *List) MoveToFront(e *Element) {
if e.list != l || l.root.next == e {
return return
} }
c.lock.Lock() // see comment in List.Remove about initialization of l
defer c.lock.Unlock() l.move(e, &l.root)
var toDel []btree.Item
c.cache.Ascend(func(it btree.Item) bool {
age := goatomic.LoadUint64(&it.(*Pair).t)
if age < dropOlder {
toDel = append(toDel, it)
}
return true
})
for _, it := range toDel {
c.cache.Delete(it)
}
log.Info("evicted", "too_old_amount", len(toDel))
} }
//nolint // MoveToBack moves element e to the back of list l.
func (c *CoherentView) evictNew2Random(keysLimit int) { // If e is not an element of l, the list is not modified.
if c.Len() < keysLimit { // The element must not be nil.
func (l *List) MoveToBack(e *Element) {
if e.list != l || l.root.prev == e {
return return
} }
c.lock.Lock() // see comment in List.Remove about initialization of l
defer c.lock.Unlock() l.move(e, l.root.prev)
i := 0 }
var toDel []btree.Item
var fst, snd btree.Item // MoveBefore moves element e to its new position before mark.
firstPrime, secondPrime := 11, 13 // to choose 2-pseudo-random elements and evict worse one // If e or mark is not an element of l, or e == mark, the list is not modified.
c.cache.Ascend(func(it btree.Item) bool { // The element and mark must not be nil.
if i%firstPrime == 0 { func (l *List) MoveBefore(e, mark *Element) {
fst = it if e.list != l || e == mark || mark.list != l {
} return
if i%secondPrime == 0 { }
snd = it l.move(e, mark.prev)
} }
if fst != nil && snd != nil {
if goatomic.LoadUint64(&fst.(*Pair).t) < goatomic.LoadUint64(&snd.(*Pair).t) { // MoveAfter moves element e to its new position after mark.
toDel = append(toDel, fst) // If e or mark is not an element of l, or e == mark, the list is not modified.
} else { // The element and mark must not be nil.
toDel = append(toDel, snd) func (l *List) MoveAfter(e, mark *Element) {
} if e.list != l || e == mark || mark.list != l {
fst = nil return
snd = nil }
} l.move(e, mark)
return true
})
for _, it := range toDel {
c.cache.Delete(it)
}
log.Info("evicted", "2_random__amount", len(toDel))
} }

View File

@ -29,6 +29,126 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestEvictionInUnexpectedOrder(t *testing.T) {
// Order: View - 2, OnNewBlock - 2, View - 5, View - 6, OnNewBlock - 3, OnNewBlock - 4, View - 5, OnNewBlock - 5, OnNewBlock - 100
require := require.New(t)
cfg := DefaultCoherentCacheConfig
cfg.KeysLimit = 3
cfg.NewBlockWait = 0
c := New(cfg)
c.selectOrCreateRoot(2)
require.Equal(1, len(c.roots))
require.Equal(0, int(c.latestViewID))
require.False(c.roots[2].isCanonical)
c.add([]byte{1}, nil, c.roots[2], 2)
require.Equal(0, c.evictList.Len())
c.advanceRoot(2)
require.Equal(1, len(c.roots))
require.Equal(2, int(c.latestViewID))
require.True(c.roots[2].isCanonical)
c.add([]byte{1}, nil, c.roots[2], 2)
require.Equal(1, c.evictList.Len())
c.selectOrCreateRoot(5)
require.Equal(2, len(c.roots))
require.Equal(2, int(c.latestViewID))
require.False(c.roots[5].isCanonical)
c.add([]byte{2}, nil, c.roots[5], 5) // not added to evict list
require.Equal(1, c.evictList.Len())
c.add([]byte{2}, nil, c.roots[2], 2) // added to evict list, because it's latest view
require.Equal(2, c.evictList.Len())
c.selectOrCreateRoot(6)
require.Equal(3, len(c.roots))
require.Equal(2, int(c.latestViewID))
require.False(c.roots[6].isCanonical) // parrent exists, but parent has isCanonical=false
c.advanceRoot(3)
require.Equal(4, len(c.roots))
require.Equal(3, int(c.latestViewID))
require.True(c.roots[3].isCanonical)
c.advanceRoot(4)
require.Equal(5, len(c.roots))
require.Equal(4, int(c.latestViewID))
require.True(c.roots[4].isCanonical)
c.selectOrCreateRoot(5)
require.Equal(5, len(c.roots))
require.Equal(4, int(c.latestViewID))
require.False(c.roots[5].isCanonical)
c.advanceRoot(5)
require.Equal(5, len(c.roots))
require.Equal(5, int(c.latestViewID))
require.True(c.roots[5].isCanonical)
c.advanceRoot(100)
require.Equal(6, len(c.roots))
require.Equal(100, int(c.latestViewID))
require.True(c.roots[100].isCanonical)
//c.add([]byte{1}, nil, c.roots[2], 2)
require.Equal(0, c.latestView.cache.Len())
require.Equal(0, c.evictList.Len())
}
func TestEviction(t *testing.T) {
require, ctx := require.New(t), context.Background()
cfg := DefaultCoherentCacheConfig
cfg.KeysLimit = 3
cfg.NewBlockWait = 0
c := New(cfg)
db := memdb.NewTestDB(t)
k1, k2 := [20]byte{1}, [20]byte{2}
var id uint64
_ = db.Update(ctx, func(tx kv.RwTx) error {
_ = tx.Put(kv.PlainState, k1[:], []byte{1})
viewID, _ := c.View(ctx, tx)
id = tx.ViewID()
_, _ = c.Get(k1[:], tx, viewID)
_, _ = c.Get([]byte{1}, tx, viewID)
_, _ = c.Get([]byte{2}, tx, viewID)
_, _ = c.Get([]byte{3}, tx, viewID)
//require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len())
return nil
})
require.Equal(0, c.evictList.Len())
//require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len())
c.OnNewBlock(&remote.StateChangeBatch{
DatabaseViewID: id + 1,
ChangeBatch: []*remote.StateChange{
{
Direction: remote.Direction_FORWARD,
Changes: []*remote.AccountChange{{
Action: remote.Action_UPSERT,
Address: gointerfaces.ConvertAddressToH160(k1),
Data: []byte{2},
}},
},
},
})
require.Equal(1, c.evictList.Len())
require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len())
_ = db.Update(ctx, func(tx kv.RwTx) error {
_ = tx.Put(kv.PlainState, k1[:], []byte{1})
viewID, _ := c.View(ctx, tx)
id = tx.ViewID()
_, _ = c.Get(k1[:], tx, viewID)
_, _ = c.Get(k2[:], tx, viewID)
_, _ = c.Get([]byte{5}, tx, viewID)
_, _ = c.Get([]byte{6}, tx, viewID)
return nil
})
require.Equal(c.roots[c.latestViewID].cache.Len(), c.evictList.Len())
require.Equal(cfg.KeysLimit, c.evictList.Len())
}
func TestAPI(t *testing.T) { func TestAPI(t *testing.T) {
require := require.New(t) require := require.New(t)
c := New(DefaultCoherentCacheConfig) c := New(DefaultCoherentCacheConfig)
@ -45,11 +165,11 @@ func TestAPI(t *testing.T) {
panic(fmt.Sprintf("epxected: %d, got: %d", expectTxnID, tx.ViewID())) panic(fmt.Sprintf("epxected: %d, got: %d", expectTxnID, tx.ViewID()))
} }
wg.Done() wg.Done()
cache, err := c.View(context.Background(), tx) viewID, err := c.View(context.Background(), tx)
if err != nil { if err != nil {
panic(err) panic(err)
} }
v, err := cache.Get(key[:], tx) v, err := c.Get(key[:], tx, viewID)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -24,16 +24,15 @@ import (
// DummyCache - doesn't remember anything - can be used when service is not remote // DummyCache - doesn't remember anything - can be used when service is not remote
type DummyCache struct{} type DummyCache struct{}
type DummyView struct{}
var _ Cache = (*DummyCache)(nil) // compile-time interface check var _ Cache = (*DummyCache)(nil) // compile-time interface check
var _ CacheView = (*DummyView)(nil) // compile-time interface check //var _ CacheView = (*DummyView)(nil) // compile-time interface check
var dummyView = &DummyView{} func NewDummy() *DummyCache { return &DummyCache{} }
func (c *DummyCache) View(ctx context.Context, tx kv.Tx) (ViewID, error) { return 0, nil }
func NewDummy() *DummyCache { return &DummyCache{} } func (c *DummyCache) OnNewBlock(sc *remote.StateChangeBatch) {}
func (c *DummyCache) View(ctx context.Context, tx kv.Tx) (CacheView, error) { return dummyView, nil } func (c *DummyCache) Evict() int { return 0 }
func (c *DummyCache) OnNewBlock(sc *remote.StateChangeBatch) {} func (c *DummyCache) Len() int { return 0 }
func (c *DummyCache) Evict() int { return 0 } func (c *DummyCache) Get(k []byte, tx kv.Tx, id ViewID) ([]byte, error) {
func (c *DummyCache) Len() int { return 0 } return tx.GetOne(kv.PlainState, k)
func (c *DummyView) Get(k []byte, tx kv.Tx) ([]byte, error) { return tx.GetOne(kv.PlainState, k) } }

View File

@ -30,6 +30,14 @@ func MustOpen(path string) kv.RwDB {
return db return db
} }
func MustOpenRo(path string) kv.RoDB {
db, err := Open(path, log.New(), true)
if err != nil {
panic(err)
}
return db
}
// Open - main method to open database. // Open - main method to open database.
func Open(path string, logger log.Logger, readOnly bool) (kv.RwDB, error) { func Open(path string, logger log.Logger, readOnly bool) (kv.RwDB, error) {
var db kv.RwDB var db kv.RwDB

View File

@ -302,9 +302,11 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
} }
case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_POOLED_TRANSACTIONS_66: case sentry.MessageId_POOLED_TRANSACTIONS_65, sentry.MessageId_POOLED_TRANSACTIONS_66:
txs := TxSlots{} txs := TxSlots{}
f.pooledTxsParseCtx.Reject(func(hash []byte) bool { f.pooledTxsParseCtx.Reject(func(hash []byte) error {
known, _ := f.pool.IdHashKnown(tx, hash) if known, _ := f.pool.IdHashKnown(tx, hash); known {
return known return ErrRejected
}
return nil
}) })
switch req.Id { switch req.Id {
case sentry.MessageId_POOLED_TRANSACTIONS_65: case sentry.MessageId_POOLED_TRANSACTIONS_65:
@ -449,7 +451,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
} }
} }
if err := f.db.View(ctx, func(tx kv.Tx) error { if err := f.db.View(ctx, func(tx kv.Tx) error {
return f.pool.OnNewBlock(ctx, req, unwindTxs, minedTxs) return f.pool.OnNewBlock(ctx, req, unwindTxs, minedTxs, tx)
}); err != nil { }); err != nil {
log.Warn("onNewBlock", "err", err) log.Warn("onNewBlock", "err", err)
} }

View File

@ -18,6 +18,7 @@ package txpool
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"net" "net"
"sync" "sync"
@ -130,28 +131,35 @@ func (s *GrpcServer) Add(ctx context.Context, in *txpool_proto.AddRequest) (*txp
var slots TxSlots var slots TxSlots
slots.Resize(uint(len(in.RlpTxs))) slots.Resize(uint(len(in.RlpTxs)))
parseCtx := NewTxParseContext(s.rules, s.chainID) parseCtx := NewTxParseContext(s.rules, s.chainID)
parseCtx.Reject(func(hash []byte) bool { parseCtx.Reject(func(hash []byte) error {
known, _ := s.txPool.IdHashKnown(tx, hash) if known, _ := s.txPool.IdHashKnown(tx, hash); known {
return known return ErrAlreadyKnown
}
return nil
}) })
reply := &txpool_proto.AddReply{Imported: make([]txpool_proto.ImportResult, len(in.RlpTxs)), Errors: make([]string, len(in.RlpTxs))}
for i := range in.RlpTxs { for i := range in.RlpTxs {
slots.txs[i] = &TxSlot{} slots.txs[i] = &TxSlot{}
slots.isLocal[i] = true slots.isLocal[i] = true
if _, err := parseCtx.ParseTransaction(in.RlpTxs[i], 0, slots.txs[i], slots.senders.At(i)); err != nil { if _, err := parseCtx.ParseTransaction(in.RlpTxs[i], 0, slots.txs[i], slots.senders.At(i)); err != nil {
log.Warn("pool add", "err", err) if errors.Is(err, ErrAlreadyKnown) {
} else {
log.Warn("pool add", "err", err)
}
continue continue
} }
} }
reply := &txpool_proto.AddReply{Imported: make([]txpool_proto.ImportResult, len(in.RlpTxs)), Errors: make([]string, len(in.RlpTxs))}
discardReasons, err := s.txPool.AddLocalTxs(ctx, slots) discardReasons, err := s.txPool.AddLocalTxs(ctx, slots)
if err != nil { if err != nil {
return nil, err return nil, err
} }
//TODO: concept of discardReasons not really implemented yet //TODO: concept of discardReasonsLRU not really implemented yet
_ = discardReasons _ = discardReasons
/* /*
for i, err := range discardReasons { for i, err := range discardReasonsLRU {
if err == nil { if err == nil {
continue continue
} }

View File

@ -312,7 +312,7 @@ func (mock *PoolMock) IdHashKnownCalls() []struct {
} }
// OnNewBlock calls OnNewBlockFunc. // OnNewBlock calls OnNewBlockFunc.
func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs TxSlots, minedTxs TxSlots) error { func (mock *PoolMock) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs TxSlots, tx kv.Tx) error {
callInfo := struct { callInfo := struct {
Ctx context.Context Ctx context.Context
StateChanges *remote.StateChangeBatch StateChanges *remote.StateChangeBatch

View File

@ -159,7 +159,7 @@ func TestPooledTransactionsPacket66(t *testing.T) {
require.Equal(tt.encoded, fmt.Sprintf("%x", encodeBuf)) require.Equal(tt.encoded, fmt.Sprintf("%x", encodeBuf))
ctx := NewTxParseContext(chain.MainnetRules, *u256.N1) ctx := NewTxParseContext(chain.MainnetRules, *u256.N1)
ctx.reject = func(bytes []byte) bool { return true } ctx.checkHash = func(bytes []byte) error { return ErrRejected }
slots := &TxSlots{} slots := &TxSlots{}
requestId, _, err := ParsePooledTransactions66(encodeBuf, 0, ctx, slots) requestId, _, err := ParsePooledTransactions66(encodeBuf, 0, ctx, slots)
require.NoError(err) require.NoError(err)

File diff suppressed because it is too large Load Diff

View File

@ -234,7 +234,7 @@ func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawFeeCap, rawSender []b
panic(err) panic(err)
} }
txs.senders = append(txs.senders, senders.At(i%senders.Len())...) txs.senders = append(txs.senders, senders.At(i%senders.Len())...)
txs.isLocal = append(txs.isLocal, false) txs.isLocal = append(txs.isLocal, true)
} }
return sendersInfo, senderIDs, txs, true return sendersInfo, senderIDs, txs, true
@ -542,7 +542,7 @@ func FuzzOnNewBlocks(f *testing.F) {
// go to first fork // go to first fork
//fmt.Printf("ll1: %d,%d,%d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len()) //fmt.Printf("ll1: %d,%d,%d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len())
txs1, txs2, p2pReceived, txs3 := splitDataset(txs) txs1, txs2, p2pReceived, txs3 := splitDataset(txs)
err = pool.OnNewBlock(ctx, change, txs1, TxSlots{}) err = pool.OnNewBlock(ctx, change, txs1, TxSlots{}, nil)
assert.NoError(err) assert.NoError(err)
check(txs1, TxSlots{}, "fork1") check(txs1, TxSlots{}, "fork1")
checkNotify(txs1, TxSlots{}, "fork1") checkNotify(txs1, TxSlots{}, "fork1")
@ -554,7 +554,7 @@ func FuzzOnNewBlocks(f *testing.F) {
{BlockHeight: 1, BlockHash: h1, PrevBlockHeight: 0, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee}, {BlockHeight: 1, BlockHash: h1, PrevBlockHeight: 0, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee},
}, },
} }
err = pool.OnNewBlock(ctx, change, TxSlots{}, txs2) err = pool.OnNewBlock(ctx, change, TxSlots{}, txs2, nil)
check(TxSlots{}, txs2, "fork1 mined") check(TxSlots{}, txs2, "fork1 mined")
checkNotify(TxSlots{}, txs2, "fork1 mined") checkNotify(TxSlots{}, txs2, "fork1 mined")
@ -565,7 +565,7 @@ func FuzzOnNewBlocks(f *testing.F) {
{BlockHeight: 0, BlockHash: h1, Direction: remote.Direction_UNWIND, PrevBlockHeight: 1, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee}, {BlockHeight: 0, BlockHash: h1, Direction: remote.Direction_UNWIND, PrevBlockHeight: 1, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee},
}, },
} }
err = pool.OnNewBlock(ctx, change, txs2, TxSlots{}) err = pool.OnNewBlock(ctx, change, txs2, TxSlots{}, nil)
assert.NoError(err) assert.NoError(err)
check(txs2, TxSlots{}, "fork2") check(txs2, TxSlots{}, "fork2")
checkNotify(txs2, TxSlots{}, "fork2") checkNotify(txs2, TxSlots{}, "fork2")
@ -576,7 +576,7 @@ func FuzzOnNewBlocks(f *testing.F) {
{BlockHeight: 1, BlockHash: h22, PrevBlockHeight: 0, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee}, {BlockHeight: 1, BlockHash: h22, PrevBlockHeight: 0, PrevBlockHash: h1, ProtocolBaseFee: currentBaseFee},
}, },
} }
err = pool.OnNewBlock(ctx, change, TxSlots{}, txs3) err = pool.OnNewBlock(ctx, change, TxSlots{}, txs3, nil)
assert.NoError(err) assert.NoError(err)
check(TxSlots{}, txs3, "fork2 mined") check(TxSlots{}, txs3, "fork2 mined")
checkNotify(TxSlots{}, txs3, "fork2 mined") checkNotify(TxSlots{}, txs3, "fork2 mined")

View File

@ -15,3 +15,42 @@
*/ */
package txpool package txpool
import (
"container/heap"
"testing"
)
func BenchmarkName(b *testing.B) {
txs := make([]*metaTx, 10_000)
p := NewSubPool(BaseFeeSubPool)
for i := 0; i < len(txs); i++ {
txs[i] = &metaTx{Tx: &TxSlot{}}
}
for i := 0; i < len(txs); i++ {
p.UnsafeAdd(txs[i])
}
p.EnforceInvariants()
b.ResetTimer()
for i := 0; i < b.N; i++ {
txs[0].timestamp = 1
heap.Fix(p.best, txs[0].bestIndex)
heap.Fix(p.worst, txs[0].worstIndex)
}
}
func BenchmarkName2(b *testing.B) {
txs := make([]*metaTx, 10_000)
p := NewSubPool(BaseFeeSubPool)
for i := 0; i < len(txs); i++ {
txs[i] = &metaTx{Tx: &TxSlot{}}
}
for i := 0; i < len(txs); i++ {
p.UnsafeAdd(txs[i])
}
p.EnforceInvariants()
b.ResetTimer()
for i := 0; i < b.N; i++ {
}
}

View File

@ -55,7 +55,7 @@ type TxParseContext struct {
sig [65]byte sig [65]byte
withSender bool withSender bool
isProtected bool isProtected bool
reject func([]byte) bool checkHash func([]byte) error
cfg TxParsseConfig cfg TxParsseConfig
} }
@ -125,9 +125,10 @@ const (
const ParseTransactionErrorPrefix = "parse transaction payload" const ParseTransactionErrorPrefix = "parse transaction payload"
var ErrRejected = errors.New("rejected") var ErrRejected = errors.New("rejected")
var ErrAlreadyKnown = errors.New("already known")
func (ctx *TxParseContext) Reject(f func(hash []byte) bool) { ctx.reject = f } func (ctx *TxParseContext) Reject(f func(hash []byte) error) { ctx.checkHash = f }
func (ctx *TxParseContext) WithSender(v bool) { ctx.withSender = v } func (ctx *TxParseContext) WithSender(v bool) { ctx.withSender = v }
// ParseTransaction extracts all the information from the transactions's payload (RLP) necessary to build TxSlot // ParseTransaction extracts all the information from the transactions's payload (RLP) necessary to build TxSlot
// it also performs syntactic validation of the transactions // it also performs syntactic validation of the transactions
@ -371,8 +372,10 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int, slot *TxSlo
if !ctx.withSender { if !ctx.withSender {
return p, nil return p, nil
} }
if ctx.reject != nil && ctx.reject(slot.idHash[:32]) { if ctx.checkHash != nil {
return p, ErrRejected if err := ctx.checkHash(slot.idHash[:32]); err != nil {
return p, err
}
} }
// Computing sigHash (hash used to recover sender from the signature) // Computing sigHash (hash used to recover sender from the signature)