Pool: no state persistance (#62)

This commit is contained in:
Alex Sharov 2021-09-08 19:21:13 +07:00 committed by GitHub
parent c4efc0ea3a
commit 0b4e528fac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 144 additions and 431 deletions

View File

@ -7,13 +7,12 @@
package consensus
import (
"reflect"
"sync"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/runtime/protoimpl"
"google.golang.org/protobuf/types/known/emptypb"
types "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
emptypb "google.golang.org/protobuf/types/known/emptypb"
reflect "reflect"
sync "sync"
)
const (

View File

@ -7,13 +7,12 @@
package remote
import (
"reflect"
"sync"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/runtime/protoimpl"
"google.golang.org/protobuf/types/known/emptypb"
types "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
emptypb "google.golang.org/protobuf/types/known/emptypb"
reflect "reflect"
sync "sync"
)
const (

View File

@ -346,21 +346,13 @@ var ChaindataTables = []string{
const (
RecentLocalTransaction = "RecentLocalTransaction" // sequence_u64 -> tx_hash
PoolSenderID = "PoolSenderID" // sender_20bytes -> sender_id_u64
PoolSenderIDToAdress = "PoolSenderIDToAddress" // sender_id_u64 -> sender_20bytes
PoolSender = "PoolSender" // sender_id_u64 -> nonce, balance
PoolTransaction = "PoolTransaction" // txHash -> sender_id_u64+tx_rlp
PoolStateEviction = "PoolStateEviction" // commit_id_u64 -> roaring([sender_id_u64]) - list of senders who had no transactions at this time, if after some time they still have no transactions - evict them.
PoolInfo = "PoolInfo" // option_key -> option_value
)
var TxPoolTables = []string{
RecentLocalTransaction,
PoolSenderID,
PoolSenderIDToAdress,
PoolSender,
PoolTransaction,
PoolStateEviction,
PoolInfo,
}
var SentryTables = []string{}

View File

@ -185,7 +185,7 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
continue
}
log.Warn("[txpool.fetch] Handling incoming message", "msg", req.Id.String(), "err", err)
log.Warn("[txpool.fetch] Handling incoming message", "msg", req.Id.String(), "err", err, "rlp", fmt.Sprintf("%x", req.Data))
}
if f.wg != nil {
f.wg.Done()
@ -307,11 +307,11 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
})
switch req.Id {
case sentry.MessageId_POOLED_TRANSACTIONS_65:
if _, _, err := ParsePooledTransactions66(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil {
if _, err := ParsePooledTransactions65(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil {
return err
}
case sentry.MessageId_POOLED_TRANSACTIONS_66:
if _, err := ParsePooledTransactions65(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil {
if _, _, err := ParsePooledTransactions66(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil {
return err
}
default:
@ -456,7 +456,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
diff[string(addr[:])] = sender{nonce: nonce, balance: balance}
}
if err := f.db.View(ctx, func(tx kv.Tx) error {
return f.pool.OnNewBlock(tx, diff, unwindTxs, minedTxs, req.ProtocolBaseFee, req.BlockHeight, gointerfaces.ConvertH256ToHash(req.BlockHash))
return f.pool.OnNewBlock(diff, unwindTxs, minedTxs, req.ProtocolBaseFee, req.BlockHeight, gointerfaces.ConvertH256ToHash(req.BlockHash))
}); err != nil {
log.Warn("onNewBlock", "err", err)
}

View File

@ -25,7 +25,7 @@ var TxPoolAPIVersion = &types2.VersionReply{Major: 1, Minor: 0, Patch: 0}
type txPool interface {
GetRlp(tx kv.Tx, hash []byte) ([]byte, error)
AddLocals(ctx context.Context, newTxs TxSlots, tx kv.Tx) ([]DiscardReason, error)
AddLocals(ctx context.Context, newTxs TxSlots) ([]DiscardReason, error)
DeprecatedForEach(_ context.Context, f func(rlp, sender []byte, t SubPoolType), tx kv.Tx) error
CountContent() (int, int, int)
IdHashKnown(tx kv.Tx, hash []byte) (bool, error)
@ -123,7 +123,7 @@ func (s *GrpcServer) Add(ctx context.Context, in *txpool_proto.AddRequest) (*txp
}
reply := &txpool_proto.AddReply{Imported: make([]txpool_proto.ImportResult, len(in.RlpTxs)), Errors: make([]string, len(in.RlpTxs))}
discardReasons, err := s.txPool.AddLocals(ctx, slots, tx)
discardReasons, err := s.txPool.AddLocals(ctx, slots)
if err != nil {
return nil, err
}

View File

@ -31,7 +31,7 @@ var _ Pool = &PoolMock{}
// IdHashKnownFunc: func(tx kv.Tx, hash []byte) (bool, error) {
// panic("mock out the IdHashKnown method")
// },
// OnNewBlockFunc: func(tx kv.Tx, stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error {
// OnNewBlockFunc: func(stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error {
// panic("mock out the OnNewBlock method")
// },
// StartedFunc: func() bool {
@ -57,7 +57,7 @@ type PoolMock struct {
IdHashKnownFunc func(tx kv.Tx, hash []byte) (bool, error)
// OnNewBlockFunc mocks the OnNewBlock method.
OnNewBlockFunc func(tx kv.Tx, stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error
OnNewBlockFunc func(stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error
// StartedFunc mocks the Started method.
StartedFunc func() bool
@ -92,8 +92,6 @@ type PoolMock struct {
}
// OnNewBlock holds details about calls to the OnNewBlock method.
OnNewBlock []struct {
// Tx is the tx argument value.
Tx kv.Tx
// StateChanges is the stateChanges argument value.
StateChanges map[string]sender
// UnwindTxs is the unwindTxs argument value.
@ -264,9 +262,8 @@ func (mock *PoolMock) IdHashKnownCalls() []struct {
}
// OnNewBlock calls OnNewBlockFunc.
func (mock *PoolMock) OnNewBlock(tx kv.Tx, stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error {
func (mock *PoolMock) OnNewBlock(stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error {
callInfo := struct {
Tx kv.Tx
StateChanges map[string]sender
UnwindTxs TxSlots
MinedTxs TxSlots
@ -274,7 +271,6 @@ func (mock *PoolMock) OnNewBlock(tx kv.Tx, stateChanges map[string]sender, unwin
BlockHeight uint64
BlockHash [32]byte
}{
Tx: tx,
StateChanges: stateChanges,
UnwindTxs: unwindTxs,
MinedTxs: minedTxs,
@ -291,14 +287,13 @@ func (mock *PoolMock) OnNewBlock(tx kv.Tx, stateChanges map[string]sender, unwin
)
return errOut
}
return mock.OnNewBlockFunc(tx, stateChanges, unwindTxs, minedTxs, baseFee, blockHeight, blockHash)
return mock.OnNewBlockFunc(stateChanges, unwindTxs, minedTxs, baseFee, blockHeight, blockHash)
}
// OnNewBlockCalls gets all the calls that were made to OnNewBlock.
// Check the length with:
// len(mockedPool.OnNewBlockCalls())
func (mock *PoolMock) OnNewBlockCalls() []struct {
Tx kv.Tx
StateChanges map[string]sender
UnwindTxs TxSlots
MinedTxs TxSlots
@ -307,7 +302,6 @@ func (mock *PoolMock) OnNewBlockCalls() []struct {
BlockHash [32]byte
} {
var calls []struct {
Tx kv.Tx
StateChanges map[string]sender
UnwindTxs TxSlots
MinedTxs TxSlots

View File

@ -51,7 +51,6 @@ var (
cacheTotalCounter = metrics.GetOrCreateCounter(`pool_cache_total`)
cacheHitCounter = metrics.GetOrCreateCounter(`pool_cache_total{result="hit"}`)
writeToDbBytesCounter = metrics.GetOrCreateCounter(`pool_write_to_db_bytes`)
sendersEvictedCounter = metrics.GetOrCreateCounter(`pool_senders_evicted`)
)
const ASSERT = false
@ -82,7 +81,7 @@ type Pool interface {
Started() bool
GetRlp(tx kv.Tx, hash []byte) ([]byte, error)
AddRemoteTxs(ctx context.Context, newTxs TxSlots)
OnNewBlock(tx kv.Tx, stateChanges map[string]sender, unwindTxs, minedTxs TxSlots, baseFee, blockHeight uint64, blockHash [32]byte) error
OnNewBlock(stateChanges map[string]sender, unwindTxs, minedTxs TxSlots, baseFee, blockHeight uint64, blockHash [32]byte) error
AddNewGoodPeer(peerID PeerID)
}
@ -176,7 +175,6 @@ type sendersBatch struct {
blockHeight atomic.Uint64
blockHash atomic.String
senderID uint64
commitID uint64
senderIDs map[string]uint64
senderInfo map[uint64]*sender
}
@ -186,67 +184,25 @@ func newSendersCache() *sendersBatch {
}
//nolint
func (sc *sendersBatch) idsCount(tx kv.Tx) (inMem int, inDb int, err error) {
c, err := tx.Cursor(kv.PoolSenderID)
if err != nil {
return 0, 0, err
}
inDB, err := c.Count()
if err != nil {
return 0, 0, err
}
return len(sc.senderIDs), int(inDB), nil
}
func (sc *sendersBatch) idsCount() (inMem int) { return len(sc.senderIDs) }
//nolint
func (sc *sendersBatch) infoCount(tx kv.Tx) (inMem int, inDb int, err error) {
c, err := tx.Cursor(kv.PoolSender)
if err != nil {
return 0, 0, err
}
inDB, err := c.Count()
if err != nil {
return 0, 0, err
}
return len(sc.senderInfo), int(inDB), nil
}
func (sc *sendersBatch) id(addr string, tx kv.Tx) (uint64, bool, error) {
func (sc *sendersBatch) infoCount() (inMem int) { return len(sc.senderInfo) }
func (sc *sendersBatch) id(addr string) (uint64, bool) {
id, ok := sc.senderIDs[addr]
if !ok {
v, err := tx.GetOne(kv.PoolSenderID, []byte(addr))
if err != nil {
return 0, false, err
}
if len(v) == 0 {
return 0, false, nil
}
id = binary.BigEndian.Uint64(v)
}
return id, true, nil
return id, ok
}
func (sc *sendersBatch) info(id uint64, tx kv.Tx, expectMiss bool) (*sender, error) {
func (sc *sendersBatch) info(id uint64, expectMiss bool) *sender {
cacheTotalCounter.Inc()
info, ok := sc.senderInfo[id]
if ok {
cacheHitCounter.Inc()
return info, nil
return info
}
cacheTotalCounter.Inc()
encID := make([]byte, 8)
binary.BigEndian.PutUint64(encID, id)
v, err := tx.GetOne(kv.PoolSender, encID)
if err != nil {
return nil, err
if !expectMiss {
panic("all senders must be loaded in advance")
}
if len(v) == 0 {
if !expectMiss {
panic("all senders must be loaded in advance")
}
return nil, nil // don't fallback to core db, it will be manually done in right place
}
cacheHitCounter.Inc()
balance := uint256.NewInt(0)
balance.SetBytes(v[8:])
return newSender(binary.BigEndian.Uint64(v), *balance), nil
return nil
}
//nolint
@ -257,11 +213,11 @@ func (sc *sendersBatch) printDebug(prefix string) {
}
}
func (sc *sendersBatch) onNewTxs(tx kv.Tx, newTxs TxSlots) (cacheMisses map[uint64]string, err error) {
if err := sc.ensureSenderIDOnNewTxs(tx, newTxs); err != nil {
func (sc *sendersBatch) onNewTxs(newTxs TxSlots) (cacheMisses map[uint64]string, err error) {
if err := sc.ensureSenderIDOnNewTxs(newTxs); err != nil {
return nil, err
}
cacheMisses, err = sc.setTxSenderID(tx, newTxs)
cacheMisses, err = sc.setTxSenderID(newTxs)
if err != nil {
return nil, err
}
@ -283,29 +239,26 @@ func (sc *sendersBatch) loadFromCore(coreTx kv.Tx, toLoad map[uint64]string) err
return nil
}
func (sc *sendersBatch) onNewBlock(tx kv.Tx, stateChanges map[string]sender, unwindTxs, minedTxs TxSlots, blockHeight uint64, blockHash [32]byte) error {
func (sc *sendersBatch) onNewBlock(stateChanges map[string]sender, unwindTxs, minedTxs TxSlots, blockHeight uint64, blockHash [32]byte) error {
//TODO: if see non-continuous block heigh - load gap from changesets
sc.blockHeight.Store(blockHeight)
sc.blockHash.Store(string(blockHash[:]))
//`loadSenders` goes by network to core - and it must be outside of sendersBatch lock. But other methods must be locked
if err := sc.mergeStateChanges(tx, stateChanges, unwindTxs, minedTxs); err != nil {
if err := sc.mergeStateChanges(stateChanges, unwindTxs, minedTxs); err != nil {
return err
}
if _, err := sc.setTxSenderID(tx, unwindTxs); err != nil {
if _, err := sc.setTxSenderID(unwindTxs); err != nil {
return err
}
if _, err := sc.setTxSenderID(tx, minedTxs); err != nil {
if _, err := sc.setTxSenderID(minedTxs); err != nil {
return err
}
return nil
}
func (sc *sendersBatch) mergeStateChanges(tx kv.Tx, stateChanges map[string]sender, unwindedTxs, minedTxs TxSlots) error {
func (sc *sendersBatch) mergeStateChanges(stateChanges map[string]sender, unwindedTxs, minedTxs TxSlots) error {
for addr, v := range stateChanges { // merge state changes
id, ok, err := sc.id(addr, tx)
if err != nil {
return err
}
id, ok := sc.id(addr)
if !ok {
sc.senderID++
id = sc.senderID
@ -315,10 +268,7 @@ func (sc *sendersBatch) mergeStateChanges(tx kv.Tx, stateChanges map[string]send
}
for i := 0; i < unwindedTxs.senders.Len(); i++ {
id, ok, err := sc.id(string(unwindedTxs.senders.At(i)), tx)
if err != nil {
return err
}
id, ok := sc.id(string(unwindedTxs.senders.At(i)))
if !ok {
sc.senderID++
id = sc.senderID
@ -332,10 +282,7 @@ func (sc *sendersBatch) mergeStateChanges(tx kv.Tx, stateChanges map[string]send
}
for i := 0; i < len(minedTxs.txs); i++ {
id, ok, err := sc.id(string(minedTxs.senders.At(i)), tx)
if err != nil {
return err
}
id, ok := sc.id(string(minedTxs.senders.At(i)))
if !ok {
sc.senderID++
id = sc.senderID
@ -350,12 +297,9 @@ func (sc *sendersBatch) mergeStateChanges(tx kv.Tx, stateChanges map[string]send
return nil
}
func (sc *sendersBatch) ensureSenderIDOnNewTxs(tx kv.Tx, newTxs TxSlots) error {
func (sc *sendersBatch) ensureSenderIDOnNewTxs(newTxs TxSlots) error {
for i := 0; i < len(newTxs.txs); i++ {
_, ok, err := sc.id(string(newTxs.senders.At(i)), tx)
if err != nil {
return err
}
_, ok := sc.id(string(newTxs.senders.At(i)))
if ok {
continue
}
@ -365,26 +309,20 @@ func (sc *sendersBatch) ensureSenderIDOnNewTxs(tx kv.Tx, newTxs TxSlots) error {
return nil
}
func (sc *sendersBatch) setTxSenderID(tx kv.Tx, txs TxSlots) (map[uint64]string, error) {
func (sc *sendersBatch) setTxSenderID(txs TxSlots) (map[uint64]string, error) {
toLoad := map[uint64]string{}
for i := range txs.txs {
addr := string(txs.senders.At(i))
// assign ID to each new sender
id, ok, err := sc.id(addr, tx)
if err != nil {
return nil, err
}
id, ok := sc.id(addr)
if !ok {
panic("not supported yet")
}
txs.txs[i].senderID = id
// load data from db if need
info, err := sc.info(txs.txs[i].senderID, tx, true)
if err != nil {
return nil, err
}
info := sc.info(txs.txs[i].senderID, true)
if info != nil {
continue
}
@ -396,24 +334,11 @@ func (sc *sendersBatch) setTxSenderID(tx kv.Tx, txs TxSlots) (map[uint64]string,
}
return toLoad, nil
}
func (sc *sendersBatch) syncMissedStateDiff(ctx context.Context, tx kv.RwTx, coreTx kv.Tx, missedTo uint64) error {
func (sc *sendersBatch) syncMissedStateDiff(ctx context.Context, coreTx kv.Tx, missedTo uint64) error {
dropLocalSendersCache := false
if missedTo > 0 && missedTo-sc.blockHeight.Load() > 1024 {
dropLocalSendersCache = true
}
lastCommitTimeV, err := tx.GetOne(kv.PoolInfo, SenderCommitTimeKey)
if err != nil {
return err
}
lastCommitTime := time.Time{}
if len(lastCommitTimeV) > 0 {
if err := lastCommitTime.UnmarshalBinary(lastCommitTimeV); err != nil {
return err
}
if time.Since(lastCommitTime) > 3*24*time.Hour {
dropLocalSendersCache = true
}
}
if coreTx != nil {
ok, err := isCanonical(coreTx, sc.blockHeight.Load(), []byte(sc.blockHash.Load()))
if err != nil {
@ -425,9 +350,6 @@ func (sc *sendersBatch) syncMissedStateDiff(ctx context.Context, tx kv.RwTx, cor
}
if dropLocalSendersCache {
if err := tx.ClearBucket(kv.PoolSender); err != nil {
return err
}
sc.senderInfo = map[uint64]*sender{}
}
@ -445,7 +367,7 @@ func (sc *sendersBatch) syncMissedStateDiff(ctx context.Context, tx kv.RwTx, cor
if err != nil {
return err
}
if err := sc.mergeStateChanges(tx, diff, TxSlots{}, TxSlots{}); err != nil {
if err := sc.mergeStateChanges(diff, TxSlots{}, TxSlots{}); err != nil {
return err
}
return nil
@ -575,29 +497,22 @@ func (p *TxPool) printDebug(prefix string) {
(*p.queued.best)[i].Tx.printDebug(fmt.Sprintf("%s.queued : %b", prefix, (*p.queued.best)[i].subPool))
}
}
func (p *TxPool) logStats(tx kv.Tx) error {
func (p *TxPool) logStats() {
protocolBaseFee, currentBaseFee := p.protocolBaseFee.Load(), p.currentBaseFee.Load()
p.lock.RLock()
defer p.lock.RUnlock()
idsInMem, idsInDb, err := p.senders.idsCount(tx)
if err != nil {
return err
}
infoInMem, infoInDb, err := p.senders.infoCount(tx)
if err != nil {
return err
}
idsInMem := p.senders.idsCount()
infoInMem := p.senders.infoCount()
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info(fmt.Sprintf("[txpool] baseFee: %d, %dm; queuesSize: pending=%d/%d, baseFee=%d/%d, queued=%d/%d; sendersBatch: id=%d+%d,info=%d+%d, alloc=%dMb, sys=%dMb\n",
log.Info(fmt.Sprintf("[txpool] baseFee: %d, %dm; queuesSize: pending=%d/%d, baseFee=%d/%d, queued=%d/%d; sendersBatch: id=%d,info=%d, alloc=%dMb, sys=%dMb\n",
protocolBaseFee, currentBaseFee/1_000_000,
p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.queued.Len(), QueuedSubPoolLimit,
idsInMem, idsInDb, infoInMem, infoInDb,
idsInMem, infoInMem,
m.Alloc/1024/1024, m.Sys/1024/1024,
))
return nil
}
func (p *TxPool) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) {
p.lock.RLock()
@ -692,8 +607,8 @@ func (p *TxPool) Best(n uint16, txs *TxsRlp, tx kv.Tx) error {
log.Warn("tx rlp not found")
continue
}
txs.Txs[i] = rlpTx[8:]
copy(txs.Senders.At(i), rlpTx[:8])
txs.Txs[i] = rlpTx[20:]
copy(txs.Senders.At(i), rlpTx[:20])
txs.IsLocal[i] = best[i].subPool&IsLocal > 0
/*
found := false
@ -733,7 +648,6 @@ func (p *TxPool) DeprecatedForEach(_ context.Context, f func(rlp, sender []byte,
p.lock.RLock()
defer p.lock.RUnlock()
encID := make([]byte, 8)
p.byNonce.tree.Ascend(func(i btree.Item) bool {
mt := i.(*sortByNonce).metaTx
slot := mt.Tx
@ -748,7 +662,7 @@ func (p *TxPool) DeprecatedForEach(_ context.Context, f func(rlp, sender []byte,
log.Error("[txpool] tx not found in db")
return false
}
slotRlp = v[8:]
slotRlp = v[20:]
}
var sender []byte
@ -761,16 +675,7 @@ func (p *TxPool) DeprecatedForEach(_ context.Context, f func(rlp, sender []byte,
}
}
if !found {
binary.BigEndian.PutUint64(encID, slot.senderID)
v, err := tx.GetOne(kv.PoolSenderIDToAdress, encID)
if err != nil {
log.Error("[txpool] get sender from db", "err", err)
return false
}
if v == nil {
log.Error("[txpool] sender not found in db")
return false
}
return true
}
f(slotRlp, sender, mt.currentSubPool)
return true
@ -789,7 +694,7 @@ func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs TxSlots) {
p.unprocessedRemoteTxs.Append(newTxs.txs[i], newTxs.senders.At(i), newTxs.isLocal[i])
}
}
func (p *TxPool) AddLocals(ctx context.Context, newTxs TxSlots, tx kv.Tx) ([]DiscardReason, error) {
func (p *TxPool) AddLocals(ctx context.Context, newTxs TxSlots) ([]DiscardReason, error) {
p.lock.Lock()
defer p.lock.Unlock()
discardReasonsIndex := len(p.discardReasons)
@ -797,8 +702,7 @@ func (p *TxPool) AddLocals(ctx context.Context, newTxs TxSlots, tx kv.Tx) ([]Dis
for i := range newTxs.isLocal {
newTxs.isLocal[i] = true
}
cacheMisses, err := p.senders.onNewTxs(tx, newTxs)
cacheMisses, err := p.senders.onNewTxs(newTxs)
if err != nil {
return nil, err
}
@ -814,7 +718,7 @@ func (p *TxPool) AddLocals(ctx context.Context, newTxs TxSlots, tx kv.Tx) ([]Dis
if !p.Started() {
return nil, fmt.Errorf("pool not started yet")
}
if err := onNewTxs(tx, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
if err := onNewTxs(p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
return nil, err
}
@ -840,7 +744,7 @@ func (p *TxPool) copyDiscardReasons(from int) []DiscardReason {
copy(cpy, p.discardReasons[from:])
return cpy
}
func (p *TxPool) processRemoteTxs(ctx context.Context, tx kv.Tx) error {
func (p *TxPool) processRemoteTxs(ctx context.Context) error {
p.lock.RLock()
l := len(p.unprocessedRemoteTxs.txs)
p.lock.RUnlock()
@ -854,7 +758,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context, tx kv.Tx) error {
defer p.lock.Unlock()
newTxs := *p.unprocessedRemoteTxs
cacheMisses, err := p.senders.onNewTxs(tx, newTxs)
cacheMisses, err := p.senders.onNewTxs(newTxs)
if err != nil {
return err
}
@ -871,7 +775,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context, tx kv.Tx) error {
return fmt.Errorf("txpool not started yet")
}
if err := onNewTxs(tx, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
if err := onNewTxs(p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
return err
}
@ -899,7 +803,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context, tx kv.Tx) error {
//log.Info("[txpool] on new txs", "amount", len(newTxs.txs), "in", time.Since(t))
return nil
}
func onNewTxs(tx kv.Tx, senders *sendersBatch, newTxs TxSlots, protocolBaseFee, currentBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error {
func onNewTxs(senders *sendersBatch, newTxs TxSlots, protocolBaseFee, currentBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error {
for i := range newTxs.txs {
if newTxs.txs[i].senderID == 0 {
return fmt.Errorf("senderID can't be zero")
@ -908,10 +812,7 @@ func onNewTxs(tx kv.Tx, senders *sendersBatch, newTxs TxSlots, protocolBaseFee,
changedSenders := unsafeAddToPendingPool(byNonce, newTxs, pending, baseFee, queued, byHash, discard)
for id := range changedSenders {
sender, err := senders.info(id, tx, false)
if err != nil {
return err
}
sender := senders.info(id, false)
onSenderChange(id, sender, byNonce, protocolBaseFee, currentBaseFee)
}
@ -934,14 +835,14 @@ func (p *TxPool) setBaseFee(baseFee uint64) (uint64, uint64) {
return p.protocolBaseFee.Load(), p.currentBaseFee.Load()
}
func (p *TxPool) OnNewBlock(tx kv.Tx, stateChanges map[string]sender, unwindTxs, minedTxs TxSlots, baseFee, blockHeight uint64, blockHash [32]byte) error {
func (p *TxPool) OnNewBlock(stateChanges map[string]sender, unwindTxs, minedTxs TxSlots, baseFee, blockHeight uint64, blockHash [32]byte) error {
defer newBlockTimer.UpdateDuration(time.Now())
p.lock.Lock()
defer p.lock.Unlock()
t := time.Now()
protocolBaseFee, baseFee := p.setBaseFee(baseFee)
if err := p.senders.onNewBlock(tx, stateChanges, unwindTxs, minedTxs, blockHeight, blockHash); err != nil {
if err := p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, blockHeight, blockHash); err != nil {
return err
}
//log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "baseFee", baseFee, "blockHeight", blockHeight)
@ -952,7 +853,7 @@ func (p *TxPool) OnNewBlock(tx kv.Tx, stateChanges map[string]sender, unwindTxs,
return err
}
if err := onNewBlock(tx, p.senders, unwindTxs, minedTxs.txs, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
if err := onNewBlock(p.senders, unwindTxs, minedTxs.txs, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
return err
}
@ -982,7 +883,7 @@ func (p *TxPool) discardLocked(mt *metaTx) {
p.isLocalHashLRU.Add(string(mt.Tx.idHash[:]), struct{}{})
}
}
func onNewBlock(tx kv.Tx, senders *sendersBatch, unwindTxs TxSlots, minedTxs []*TxSlot, protocolBaseFee, pendingBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error {
func onNewBlock(senders *sendersBatch, unwindTxs TxSlots, minedTxs []*TxSlot, protocolBaseFee, pendingBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error {
for i := range unwindTxs.txs {
if unwindTxs.txs[i].senderID == 0 {
return fmt.Errorf("onNewBlock.unwindTxs: senderID can't be zero")
@ -1009,10 +910,7 @@ func onNewBlock(tx kv.Tx, senders *sendersBatch, unwindTxs TxSlots, minedTxs []*
// time (up to some "immutability threshold").
changedSenders := unsafeAddToPendingPool(byNonce, unwindTxs, pending, baseFee, queued, byHash, discard)
for id := range changedSenders {
sender, err := senders.info(id, tx, false)
if err != nil {
return err
}
sender := senders.info(id, false)
onSenderChange(id, sender, byNonce, protocolBaseFee, pendingBaseFee)
}
@ -1507,7 +1405,6 @@ func (p *WorstQueue) Pop() interface{} {
// promote/demote transactions
// reorgs
func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs chan Hashes, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func()) {
//db.Update(ctx, func(tx kv.RwTx) error { return tx.ClearBucket(kv.PooledSender) })
if err := db.Update(ctx, func(tx kv.RwTx) error {
return coreDB.View(ctx, func(coreTx kv.Tx) error {
return p.fromDB(ctx, tx, coreTx)
@ -1515,9 +1412,7 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
}); err != nil {
log.Error("[txpool] restore from db", "err", err)
}
if err := db.View(ctx, func(tx kv.Tx) error { return p.logStats(tx) }); err != nil {
log.Error("[txpool] log stats", "err", err)
}
p.logStats()
//if ASSERT {
// go func() {
// if err := p.forceCheckState(ctx, db, coreDB); err != nil {
@ -1543,11 +1438,9 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
case <-ctx.Done():
return
case <-logEvery.C:
if err := db.View(ctx, func(tx kv.Tx) error { return p.logStats(tx) }); err != nil {
log.Error("[txpool] log stats", "err", err)
}
p.logStats()
case <-processRemoteTxsEvery.C:
if err := db.View(ctx, func(tx kv.Tx) error { return p.processRemoteTxs(ctx, tx) }); err != nil {
if err := p.processRemoteTxs(ctx); err != nil {
if s, ok := status.FromError(err); ok && retryLater(s.Code()) {
continue
}
@ -1559,14 +1452,13 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
case <-commitEvery.C:
if db != nil {
t := time.Now()
evicted, written, err := p.flush(db)
written, err := p.flush(db)
if err != nil {
log.Error("[txpool] flush is local history", "err", err)
continue
}
writeToDbBytesCounter.Set(written)
sendersEvictedCounter.Set(evicted)
log.Info("[txpool] flush", "written_kb", written/1024, "evicted", evicted, "in", time.Since(t))
log.Info("[txpool] Commit", "written_kb", written/1024, "in", time.Since(t))
}
case h := <-newTxs: //TODO: maybe send TxSlots object instead of Hashes?
notifyMiningAboutNewSlots()
@ -1620,6 +1512,7 @@ func coreProgress(coreTx kv.Tx) (uint64, error) {
}
//nolint
/*
func (p *TxPool) forceCheckState(ctx context.Context, db, coreDB kv.RoDB) error {
for {
if err := db.View(ctx, func(tx kv.Tx) error {
@ -1655,6 +1548,7 @@ func (p *TxPool) forceCheckState(ctx context.Context, db, coreDB kv.RoDB) error
}
}
}
*/
//nolint
func copyBytes(b []byte) (copiedBytes []byte) {
@ -1663,13 +1557,13 @@ func copyBytes(b []byte) (copiedBytes []byte) {
return
}
func (p *TxPool) flush(db kv.RwDB) (evicted, written uint64, err error) {
func (p *TxPool) flush(db kv.RwDB) (written uint64, err error) {
defer writeToDbTimer.UpdateDuration(time.Now())
p.lock.Lock()
defer p.lock.Unlock()
//it's important that write db tx is done inside lock, to make last writes visible for all read operations
if err := db.Update(context.Background(), func(tx kv.RwTx) error {
evicted, err = p.flushLocked(tx)
err = p.flushLocked(tx)
if err != nil {
return err
}
@ -1679,18 +1573,18 @@ func (p *TxPool) flush(db kv.RwDB) (evicted, written uint64, err error) {
}
return nil
}); err != nil {
return 0, 0, err
return 0, err
}
return evicted, written, nil
return written, nil
}
func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) {
func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
sendersWithoutTransactions := roaring64.New()
for i := 0; i < len(p.deletedTxs); i++ {
if p.byNonce.count(p.deletedTxs[i].Tx.senderID) == 0 {
sendersWithoutTransactions.Add(p.deletedTxs[i].Tx.senderID)
}
if err := tx.Delete(kv.PoolTransaction, p.deletedTxs[i].Tx.idHash[:], nil); err != nil {
return evicted, err
return err
}
p.deletedTxs[i] = nil // for gc
}
@ -1698,12 +1592,12 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) {
txHashes := p.isLocalHashLRU.Keys()
encID := make([]byte, 8)
if err := tx.ClearBucket(kv.RecentLocalTransaction); err != nil {
return evicted, err
return err
}
for i := range txHashes {
binary.BigEndian.PutUint64(encID, uint64(i))
if err := tx.Append(kv.RecentLocalTransaction, encID, txHashes[i].([]byte)); err != nil {
return evicted, err
return err
}
}
@ -1712,27 +1606,32 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) {
if metaTx.Tx.rlp == nil {
continue
}
v = ensureEnoughSize(v, 8+len(metaTx.Tx.rlp))
binary.BigEndian.PutUint64(v, metaTx.Tx.senderID)
copy(v[8:], metaTx.Tx.rlp)
v = ensureEnoughSize(v, 20+len(metaTx.Tx.rlp))
for addr, id := range p.senders.senderIDs { // no inverted index - tradeoff flush speed for memory usage
if id == metaTx.Tx.senderID {
copy(v[:20], addr)
break
}
}
if err := tx.Put(kv.PoolTransaction, []byte(txHash), v); err != nil {
return evicted, err
return err
}
metaTx.Tx.rlp = nil
}
binary.BigEndian.PutUint64(encID, p.protocolBaseFee.Load())
if err := tx.Put(kv.PoolInfo, PoolProtocolBaseFeeKey, encID); err != nil {
return evicted, err
return err
}
binary.BigEndian.PutUint64(encID, p.currentBaseFee.Load())
if err := tx.Put(kv.PoolInfo, PoolPendingBaseFeeKey, encID); err != nil {
return evicted, err
return err
}
evicted, err = p.senders.flush(tx, p.byNonce, sendersWithoutTransactions, p.cfg.EvictSendersAfterRounds)
err = p.senders.flush(tx)
if err != nil {
return evicted, err
return err
}
// clean - in-memory data structure as later as possible - because if during this Tx will happen error,
@ -1740,154 +1639,20 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) {
// failed write transaction must not create side-effects
p.deletedTxs = p.deletedTxs[:0]
p.discardReasons = p.discardReasons[:0]
return evicted, nil
return nil
}
func (sc *sendersBatch) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransactions *roaring64.Bitmap, evictAfterRounds uint64) (evicted uint64, err error) {
sc.commitID++
var justDeleted, justInserted []uint64
func (sc *sendersBatch) flush(tx kv.RwTx) (err error) {
encID := make([]byte, 8)
for addr, id := range sc.senderIDs {
binary.BigEndian.PutUint64(encID, id)
currentV, err := tx.GetOne(kv.PoolSenderID, []byte(addr))
if err != nil {
return evicted, err
}
if currentV != nil && bytes.Equal(currentV, encID) {
continue
}
//fmt.Printf("Put: %d\n", id)
if err := tx.Put(kv.PoolSenderID, []byte(addr), encID); err != nil {
return evicted, err
}
if err := tx.Put(kv.PoolSenderIDToAdress, encID, []byte(addr)); err != nil {
return evicted, err
}
if ASSERT {
justInserted = append(justInserted, id)
}
if byNonce.count(id) == 0 {
sendersWithoutTransactions.Add(id)
}
}
if ASSERT {
sort.Slice(justInserted, func(i, j int) bool { return justInserted[i] < justInserted[j] })
}
v := make([]byte, 8, 8+32)
for id, info := range sc.senderInfo {
//if info.nonce == 0 && info.balance.IsZero() {
// continue
//}
binary.BigEndian.PutUint64(encID, id)
binary.BigEndian.PutUint64(v, info.nonce)
v = append(v[:8], info.balance.Bytes()...)
enc, err := tx.GetOne(kv.PoolSender, encID)
if err != nil {
return evicted, err
}
if bytes.Equal(enc, v) {
continue
}
if err := tx.Put(kv.PoolSender, encID, v); err != nil {
return evicted, err
}
}
//fmt.Printf("justDeleted:%d, justInserted:%d\n", justDeleted, justInserted)
binary.BigEndian.PutUint64(encID, sc.commitID)
// Eviction logic. Store into db list of senders:
// - which have discarded transactions at this commit
// - but have no active transactions left
// after some time read old records from DB and if such senders still have no transactions - evict them
if sendersWithoutTransactions.GetCardinality() > 0 {
sendersWithoutTransactions.RunOptimize()
b, err := sendersWithoutTransactions.MarshalBinary()
if err != nil {
return 0, err
}
if err := tx.Append(kv.PoolStateEviction, encID, b); err != nil {
return evicted, err
}
}
c, err := tx.RwCursor(kv.PoolStateEviction)
if err != nil {
return evicted, err
}
defer c.Close()
for k, v, err := c.First(); k != nil; k, v, err = c.Next() {
if err != nil {
return evicted, err
}
if sc.commitID-binary.BigEndian.Uint64(k) < evictAfterRounds {
break
}
ids := roaring64.New()
if err := ids.UnmarshalBinary(v); err != nil {
return 0, err
}
for _, senderID := range ids.ToArray() {
if byNonce.count(senderID) > 0 {
continue
}
binary.BigEndian.PutUint64(encID, senderID)
addr, err := tx.GetOne(kv.PoolSenderIDToAdress, encID)
if err != nil {
return evicted, err
}
if addr == nil {
continue
}
if err := tx.Delete(kv.PoolSenderID, addr, nil); err != nil {
return evicted, err
}
if err := tx.Delete(kv.PoolSenderIDToAdress, encID, nil); err != nil {
return evicted, err
}
if err := tx.Delete(kv.PoolSender, encID, nil); err != nil {
return evicted, err
}
evicted++
if ASSERT {
justDeleted = append(justDeleted, senderID) //nolint
}
}
if err := c.DeleteCurrent(); err != nil {
return evicted, err
}
}
//fmt.Printf("justDeleted:%d, justInserted:%d\n", justDeleted, justInserted)
binary.BigEndian.PutUint64(encID, sc.blockHeight.Load())
if err := tx.Put(kv.PoolInfo, SenderCacheHeightKey, encID); err != nil {
return evicted, err
return err
}
if err := tx.Put(kv.PoolInfo, SenderCacheHashKey, []byte(sc.blockHash.Load())); err != nil {
return evicted, err
}
binary.BigEndian.PutUint64(encID, sc.senderID)
if err := tx.Put(kv.PoolInfo, SenderCacheIDKey, encID); err != nil {
return evicted, err
}
binary.BigEndian.PutUint64(encID, sc.commitID)
if err := tx.Put(kv.PoolInfo, SenderCommitIDKey, encID); err != nil {
return evicted, err
}
lastCommitTime, err := time.Now().MarshalBinary()
if err != nil {
return evicted, err
}
if err := tx.Put(kv.PoolInfo, SenderCommitTimeKey, lastCommitTime); err != nil {
return evicted, err
return err
}
sc.senderIDs = map[string]uint64{}
sc.senderInfo = map[uint64]*sender{}
return evicted, nil
return nil
}
func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error {
@ -1908,27 +1673,29 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error {
txs := TxSlots{}
parseCtx := NewTxParseContext()
parseCtx.WithSender(false)
i := 0
if err := tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error {
addr, txRlp := v[:20], v[20:]
txs.Resize(uint(i + 1))
txs.txs[i] = &TxSlot{}
_, err := parseCtx.ParseTransaction(v[8:], 0, txs.txs[i], nil)
_, err := parseCtx.ParseTransaction(txRlp, 0, txs.txs[i], nil)
if err != nil {
return fmt.Errorf("err: %w, rlp: %x\n", err, v[8:])
return fmt.Errorf("err: %w, rlp: %x\n", err, txRlp)
}
txs.txs[i].rlp = nil // means that we don't need store it in db anymore
txs.txs[i].senderID = binary.BigEndian.Uint64(v)
copy(txs.senders.At(i), addr)
senderAddr, err := tx.GetOne(kv.PoolSenderIDToAdress, v[:8])
if err != nil {
return err
id, ok := p.senders.senderIDs[string(addr)]
if !ok {
p.senders.senderID++
id = p.senders.senderID
p.senders.senderIDs[string(addr)] = id
}
if len(senderAddr) == 0 {
panic("must not happen")
}
copy(txs.senders.At(i), senderAddr)
//bkock num = binary.BigEndian.Uint64(v[8:])
txs.txs[i].senderID = id
binary.BigEndian.Uint64(v)
_, isLocalTx := p.isLocalHashLRU.Get(string(k))
txs.isLocal[i] = isLocalTx
i++
@ -1956,7 +1723,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error {
currentBaseFee = binary.BigEndian.Uint64(v)
}
}
cacheMisses, err := p.senders.onNewTxs(tx, txs)
cacheMisses, err := p.senders.onNewTxs(txs)
if err != nil {
return err
}
@ -1965,7 +1732,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error {
return err
}
}
if err := onNewTxs(tx, p.senders, txs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
if err := onNewTxs(p.senders, txs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
return err
}
p.currentBaseFee.Store(currentBaseFee)
@ -1993,26 +1760,8 @@ func (sc *sendersBatch) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) er
sc.blockHash.Store(string(v))
}
}
{
v, err := tx.GetOne(kv.PoolInfo, SenderCacheIDKey)
if err != nil {
return err
}
if len(v) > 0 {
sc.senderID = binary.BigEndian.Uint64(v)
}
}
{
v, err := tx.GetOne(kv.PoolInfo, SenderCommitIDKey)
if err != nil {
return err
}
if len(v) > 0 {
sc.commitID = binary.BigEndian.Uint64(v)
}
}
if err := sc.syncMissedStateDiff(ctx, tx, coreTx, 0); err != nil {
if err := sc.syncMissedStateDiff(ctx, coreTx, 0); err != nil {
return err
}
return nil
@ -2056,9 +1805,6 @@ func changesets(ctx context.Context, from uint64, coreTx kv.Tx) (map[string]send
return diff, nil
}
var SenderCommitTimeKey = []byte("sender_commit_time")
var SenderCacheIDKey = []byte("sender_cache_id")
var SenderCommitIDKey = []byte("sender_commit_id")
var SenderCacheHeightKey = []byte("sender_cache_block_height")
var SenderCacheHashKey = []byte("sender_cache_block_hash")
var PoolPendingBaseFeeKey = []byte("pending_base_fee")

View File

@ -295,7 +295,7 @@ func splitDataset(in TxSlots) (TxSlots, TxSlots, TxSlots, TxSlots) {
return p1, p2, p3, p4
}
func FuzzOnNewBlocks12(f *testing.F) {
func FuzzOnNewBlocks(f *testing.F) {
var u64 = [1 * 4]byte{1}
var senderAddr = [1 + 1 + 1]byte{1}
f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], 12)
@ -494,13 +494,7 @@ func FuzzOnNewBlocks12(f *testing.F) {
}
prevTotal = pending.Len() + baseFee.Len() + queued.Len()
}
checkDB := func(tx kv.Tx) {
c1, _ := tx.Cursor(kv.PoolSenderID)
c2, _ := tx.Cursor(kv.PoolSenderIDToAdress)
count1, _ := c1.Count()
count2, _ := c2.Count()
assert.Equal(count1, count2)
}
//TODO: check that id=>addr and addr=>id mappings have same len
//fmt.Printf("-------\n")
tx, err := db.BeginRw(context.Background())
@ -510,48 +504,47 @@ func FuzzOnNewBlocks12(f *testing.F) {
// go to first fork
//fmt.Printf("ll1: %d,%d,%d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len())
txs1, txs2, p2pReceived, txs3 := splitDataset(txs)
err = pool.OnNewBlock(tx, map[string]sender{}, txs1, TxSlots{}, currentBaseFee, 1, [32]byte{})
err = pool.OnNewBlock(map[string]sender{}, txs1, TxSlots{}, currentBaseFee, 1, [32]byte{})
assert.NoError(err)
check(txs1, TxSlots{}, "fork1")
checkNotify(txs1, TxSlots{}, "fork1")
_, _, _ = p2pReceived, txs2, txs3
err = pool.OnNewBlock(tx, map[string]sender{}, TxSlots{}, txs2, currentBaseFee, 1, [32]byte{})
err = pool.OnNewBlock(map[string]sender{}, TxSlots{}, txs2, currentBaseFee, 1, [32]byte{})
check(TxSlots{}, txs2, "fork1 mined")
checkNotify(TxSlots{}, txs2, "fork1 mined")
// unwind everything and switch to new fork (need unwind mined now)
err = pool.OnNewBlock(tx, map[string]sender{}, txs2, TxSlots{}, currentBaseFee, 2, [32]byte{})
err = pool.OnNewBlock(map[string]sender{}, txs2, TxSlots{}, currentBaseFee, 2, [32]byte{})
assert.NoError(err)
check(txs2, TxSlots{}, "fork2")
checkNotify(txs2, TxSlots{}, "fork2")
err = pool.OnNewBlock(tx, map[string]sender{}, TxSlots{}, txs3, currentBaseFee, 2, [32]byte{})
err = pool.OnNewBlock(map[string]sender{}, TxSlots{}, txs3, currentBaseFee, 2, [32]byte{})
assert.NoError(err)
check(TxSlots{}, txs3, "fork2 mined")
checkNotify(TxSlots{}, txs3, "fork2 mined")
// add some remote txs from p2p
pool.AddRemoteTxs(context.Background(), p2pReceived)
err = pool.processRemoteTxs(context.Background(), tx)
err = pool.processRemoteTxs(context.Background())
assert.NoError(err)
check(p2pReceived, TxSlots{}, "p2pmsg1")
checkNotify(p2pReceived, TxSlots{}, "p2pmsg1")
senderIdsBeforeFlush := len(pool.senders.senderIDs)
senderInfoBeforeFlush := len(pool.senders.senderInfo)
//senderIdsBeforeFlush := len(pool.senders.senderIDs)
//senderInfoBeforeFlush := len(pool.senders.senderInfo)
_, err = pool.flushLocked(tx) // we don't test eviction here, because dedicated test exists
err = pool.flushLocked(tx) // we don't test eviction here, because dedicated test exists
require.NoError(err)
check(p2pReceived, TxSlots{}, "after_flush")
checkDB(tx)
//checkNotify(p2pReceived, TxSlots{}, "after_flush")
p2, err := New(ch, nil, DefaultConfig)
assert.NoError(err)
p2.senders = pool.senders // senders are not persisted
err = p2.fromDB(context.Background(), tx, nil)
require.NoError(err)
checkDB(tx)
for _, txn := range p2.byHash {
assert.Nil(txn.Tx.rlp)
}
@ -559,15 +552,13 @@ func FuzzOnNewBlocks12(f *testing.F) {
check(txs2, TxSlots{}, "fromDB")
//checkNotify(txs2, TxSlots{}, "fromDB")
assert.Equal(pool.senders.senderID, p2.senders.senderID)
assert.Equal(pool.senders.blockHeight.Load(), p2.senders.blockHeight.Load())
//assert.Equal(pool.senders.senderID, p2.senders.senderID)
//assert.Equal(pool.senders.blockHeight.Load(), p2.senders.blockHeight.Load())
idsCountAfterFlush, idsCountInDbAfterFlush, err := p2.senders.idsCount(tx)
assert.NoError(err)
assert.LessOrEqual(senderIdsBeforeFlush, idsCountAfterFlush+idsCountInDbAfterFlush)
infoCountAfterFlush, infoCountInDbAfterFlush, err := p2.senders.infoCount(tx)
assert.NoError(err)
assert.LessOrEqual(senderInfoBeforeFlush, infoCountAfterFlush+infoCountInDbAfterFlush)
//idsCountAfterFlush := p2.senders.idsCount()
//assert.LessOrEqual(senderIdsBeforeFlush, idsCountAfterFlush)
//infoCountAfterFlush := p2.senders.infoCount()
//assert.LessOrEqual(senderInfoBeforeFlush, infoCountAfterFlush)
assert.Equal(pool.pending.Len(), p2.pending.Len())
assert.Equal(pool.baseFee.Len(), p2.baseFee.Len())

View File

@ -21,7 +21,6 @@ import (
"testing"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/google/btree"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/stretchr/testify/require"
@ -31,7 +30,6 @@ func TestSenders(t *testing.T) {
t.Run("evict_all_on_next_round", func(t *testing.T) {
senders, require := newSendersCache(), require.New(t)
_, tx := memdb.NewTestPoolTx(t)
byNonce := &ByNonce{btree.New(16)}
changed := roaring64.New()
senders.senderIDs[fmt.Sprintf("%020x", 1)] = 1
@ -40,20 +38,16 @@ func TestSenders(t *testing.T) {
senders.senderInfo[2] = newSender(1, *uint256.NewInt(1))
changed.AddMany([]uint64{1, 2})
evicted, err := senders.flush(tx, byNonce, changed, 1)
err := senders.flush(tx)
require.NoError(err)
require.Zero(evicted)
changed.Clear()
evicted, err = senders.flush(tx, byNonce, changed, 1)
err = senders.flush(tx)
require.NoError(err)
require.Equal(2, int(evicted))
})
t.Run("evict_even_if_used_in_current_round_but_no_txs", func(t *testing.T) {
senders, require := newSendersCache(), require.New(t)
_, tx := memdb.NewTestPoolTx(t)
byNonce := &ByNonce{btree.New(16)}
senders.senderInfo[1] = newSender(1, *uint256.NewInt(1))
senders.senderIDs[fmt.Sprintf("%020x", 1)] = 1
@ -62,17 +56,15 @@ func TestSenders(t *testing.T) {
changed := roaring64.New()
changed.AddMany([]uint64{1, 2})
evicted, err := senders.flush(tx, byNonce, changed, 1)
err := senders.flush(tx)
require.NoError(err)
require.Zero(evicted)
senders.senderInfo[1] = newSender(1, *uint256.NewInt(1)) // means used in current round, but still has 0 transactions
senders.senderIDs[fmt.Sprintf("%020x", 1)] = 1
changed.Clear()
changed.AddMany([]uint64{1})
evicted, err = senders.flush(tx, byNonce, changed, 1)
err = senders.flush(tx)
require.NoError(err)
require.Equal(2, int(evicted))
})
}

View File

@ -20,7 +20,7 @@ import (
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
//}
func FuzzParseTx1(f *testing.F) {
func FuzzParseTx(f *testing.F) {
f.Add([]byte{1}, 0)
f.Fuzz(func(t *testing.T, in []byte, pos int) {
t.Parallel()