diff --git a/gointerfaces/consensus/consensus.pb.go b/gointerfaces/consensus/consensus.pb.go
index 4b9d8b240..167ac5e66 100644
--- a/gointerfaces/consensus/consensus.pb.go
+++ b/gointerfaces/consensus/consensus.pb.go
@@ -7,13 +7,12 @@ package consensus

 import (
-	"reflect"
-	"sync"
-
-	"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
-	"google.golang.org/protobuf/reflect/protoreflect"
-	"google.golang.org/protobuf/runtime/protoimpl"
-	"google.golang.org/protobuf/types/known/emptypb"
+	types "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	emptypb "google.golang.org/protobuf/types/known/emptypb"
+	reflect "reflect"
+	sync "sync"
 )

 const (
diff --git a/gointerfaces/remote/ethbackend.pb.go b/gointerfaces/remote/ethbackend.pb.go
index 928c58cc9..acc4fe5f8 100644
--- a/gointerfaces/remote/ethbackend.pb.go
+++ b/gointerfaces/remote/ethbackend.pb.go
@@ -7,13 +7,12 @@ package remote

 import (
-	"reflect"
-	"sync"
-
-	"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
-	"google.golang.org/protobuf/reflect/protoreflect"
-	"google.golang.org/protobuf/runtime/protoimpl"
-	"google.golang.org/protobuf/types/known/emptypb"
+	types "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	emptypb "google.golang.org/protobuf/types/known/emptypb"
+	reflect "reflect"
+	sync "sync"
 )

 const (
diff --git a/kv/tables.go b/kv/tables.go
index 83f779ff6..e48ce28c0 100644
--- a/kv/tables.go
+++ b/kv/tables.go
@@ -346,21 +346,13 @@ var ChaindataTables = []string{

 const (
 	RecentLocalTransaction = "RecentLocalTransaction" // sequence_u64 -> tx_hash
-	PoolSenderID = "PoolSenderID" // sender_20bytes -> sender_id_u64
-	PoolSenderIDToAdress = "PoolSenderIDToAddress" // sender_id_u64 -> sender_20bytes
-	PoolSender = "PoolSender" // sender_id_u64 -> nonce, balance
 	PoolTransaction = "PoolTransaction" // txHash -> sender_id_u64+tx_rlp
-	PoolStateEviction = "PoolStateEviction" // commit_id_u64 -> roaring([sender_id_u64]) - list of senders who had no transactions at this time, if after some time they still have no transactions - evict them.
 	PoolInfo = "PoolInfo" // option_key -> option_value
 )

 var TxPoolTables = []string{
 	RecentLocalTransaction,
-	PoolSenderID,
-	PoolSenderIDToAdress,
-	PoolSender,
 	PoolTransaction,
-	PoolStateEviction,
 	PoolInfo,
 }
 var SentryTables = []string{}
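Note: with PoolSenderID, PoolSender and PoolStateEviction gone, PoolTransaction becomes the only persisted sender record, and (per the pool.go hunks below) its value layout moves from an 8-byte sender_id_u64 prefix to a 20-byte sender address prefix. A minimal standalone sketch of the resulting encoding; the helper names are hypothetical, only the layout comes from this diff:

	package main

	import "fmt"

	// encodePoolTxValue builds a PoolTransaction value: 20-byte sender address, then raw tx RLP.
	func encodePoolTxValue(addr [20]byte, txRlp []byte) []byte {
		v := make([]byte, 20+len(txRlp))
		copy(v[:20], addr[:]) // address replaces the old 8-byte sender id
		copy(v[20:], txRlp)
		return v
	}

	// decodePoolTxValue mirrors `addr, txRlp := v[:20], v[20:]` in fromDB below.
	func decodePoolTxValue(v []byte) (addr, txRlp []byte) {
		return v[:20], v[20:]
	}

	func main() {
		var a [20]byte
		a[19] = 1
		v := encodePoolTxValue(a, []byte{0xc0})
		addr, rlp := decodePoolTxValue(v)
		fmt.Printf("addr=%x rlp=%x\n", addr, rlp)
	}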
PoolInfo = "PoolInfo" // option_key -> option_value ) var TxPoolTables = []string{ RecentLocalTransaction, - PoolSenderID, - PoolSenderIDToAdress, - PoolSender, PoolTransaction, - PoolStateEviction, PoolInfo, } var SentryTables = []string{} diff --git a/txpool/fetch.go b/txpool/fetch.go index e7c3ce021..71e556048 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -185,7 +185,7 @@ func (f *Fetch) receiveMessage(ctx context.Context, sentryClient sentry.SentryCl if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { continue } - log.Warn("[txpool.fetch] Handling incoming message", "msg", req.Id.String(), "err", err) + log.Warn("[txpool.fetch] Handling incoming message", "msg", req.Id.String(), "err", err, "rlp", fmt.Sprintf("%x", req.Data)) } if f.wg != nil { f.wg.Done() @@ -307,11 +307,11 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes }) switch req.Id { case sentry.MessageId_POOLED_TRANSACTIONS_65: - if _, _, err := ParsePooledTransactions66(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil { + if _, err := ParsePooledTransactions65(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil { return err } case sentry.MessageId_POOLED_TRANSACTIONS_66: - if _, err := ParsePooledTransactions65(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil { + if _, _, err := ParsePooledTransactions66(req.Data, 0, f.pooledTxsParseCtx, &txs); err != nil { return err } default: @@ -456,7 +456,7 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien diff[string(addr[:])] = sender{nonce: nonce, balance: balance} } if err := f.db.View(ctx, func(tx kv.Tx) error { - return f.pool.OnNewBlock(tx, diff, unwindTxs, minedTxs, req.ProtocolBaseFee, req.BlockHeight, gointerfaces.ConvertH256ToHash(req.BlockHash)) + return f.pool.OnNewBlock(diff, unwindTxs, minedTxs, req.ProtocolBaseFee, req.BlockHeight, gointerfaces.ConvertH256ToHash(req.BlockHash)) }); err != nil { log.Warn("onNewBlock", "err", err) } diff --git a/txpool/grpc_server.go b/txpool/grpc_server.go index 611519106..f58d3608f 100644 --- a/txpool/grpc_server.go +++ b/txpool/grpc_server.go @@ -25,7 +25,7 @@ var TxPoolAPIVersion = &types2.VersionReply{Major: 1, Minor: 0, Patch: 0} type txPool interface { GetRlp(tx kv.Tx, hash []byte) ([]byte, error) - AddLocals(ctx context.Context, newTxs TxSlots, tx kv.Tx) ([]DiscardReason, error) + AddLocals(ctx context.Context, newTxs TxSlots) ([]DiscardReason, error) DeprecatedForEach(_ context.Context, f func(rlp, sender []byte, t SubPoolType), tx kv.Tx) error CountContent() (int, int, int) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) @@ -123,7 +123,7 @@ func (s *GrpcServer) Add(ctx context.Context, in *txpool_proto.AddRequest) (*txp } reply := &txpool_proto.AddReply{Imported: make([]txpool_proto.ImportResult, len(in.RlpTxs)), Errors: make([]string, len(in.RlpTxs))} - discardReasons, err := s.txPool.AddLocals(ctx, slots, tx) + discardReasons, err := s.txPool.AddLocals(ctx, slots) if err != nil { return nil, err } diff --git a/txpool/mocks_test.go b/txpool/mocks_test.go index 620f6de40..56b02a7e6 100644 --- a/txpool/mocks_test.go +++ b/txpool/mocks_test.go @@ -31,7 +31,7 @@ var _ Pool = &PoolMock{} // IdHashKnownFunc: func(tx kv.Tx, hash []byte) (bool, error) { // panic("mock out the IdHashKnown method") // }, -// OnNewBlockFunc: func(tx kv.Tx, stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error { +// OnNewBlockFunc: func(stateChanges map[string]sender, unwindTxs TxSlots, 
diff --git a/txpool/mocks_test.go b/txpool/mocks_test.go
index 620f6de40..56b02a7e6 100644
--- a/txpool/mocks_test.go
+++ b/txpool/mocks_test.go
@@ -31,7 +31,7 @@ var _ Pool = &PoolMock{}
 //			IdHashKnownFunc: func(tx kv.Tx, hash []byte) (bool, error) {
 //				panic("mock out the IdHashKnown method")
 //			},
-//			OnNewBlockFunc: func(tx kv.Tx, stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error {
+//			OnNewBlockFunc: func(stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error {
 //				panic("mock out the OnNewBlock method")
 //			},
 //			StartedFunc: func() bool {
@@ -57,7 +57,7 @@ type PoolMock struct {
 	IdHashKnownFunc func(tx kv.Tx, hash []byte) (bool, error)

 	// OnNewBlockFunc mocks the OnNewBlock method.
-	OnNewBlockFunc func(tx kv.Tx, stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error
+	OnNewBlockFunc func(stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error

 	// StartedFunc mocks the Started method.
 	StartedFunc func() bool
@@ -92,8 +92,6 @@ type PoolMock struct {
 		}
 		// OnNewBlock holds details about calls to the OnNewBlock method.
 		OnNewBlock []struct {
-			// Tx is the tx argument value.
-			Tx kv.Tx
 			// StateChanges is the stateChanges argument value.
 			StateChanges map[string]sender
 			// UnwindTxs is the unwindTxs argument value.
@@ -264,9 +262,8 @@ func (mock *PoolMock) IdHashKnownCalls() []struct {
 }

 // OnNewBlock calls OnNewBlockFunc.
-func (mock *PoolMock) OnNewBlock(tx kv.Tx, stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error {
+func (mock *PoolMock) OnNewBlock(stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error {
 	callInfo := struct {
-		Tx           kv.Tx
 		StateChanges map[string]sender
 		UnwindTxs    TxSlots
 		MinedTxs     TxSlots
@@ -274,7 +271,6 @@ func (mock *PoolMock) OnNewBlock(tx kv.Tx, stateChanges map[string]sender, unwin
 		BlockHeight  uint64
 		BlockHash    [32]byte
 	}{
-		Tx:           tx,
 		StateChanges: stateChanges,
 		UnwindTxs:    unwindTxs,
 		MinedTxs:     minedTxs,
@@ -291,14 +287,13 @@ func (mock *PoolMock) OnNewBlock(tx kv.Tx, stateChanges map[string]sender, unwin
 		)
 		return errOut
 	}
-	return mock.OnNewBlockFunc(tx, stateChanges, unwindTxs, minedTxs, baseFee, blockHeight, blockHash)
+	return mock.OnNewBlockFunc(stateChanges, unwindTxs, minedTxs, baseFee, blockHeight, blockHash)
 }

 // OnNewBlockCalls gets all the calls that were made to OnNewBlock.
 // Check the length with:
 //     len(mockedPool.OnNewBlockCalls())
 func (mock *PoolMock) OnNewBlockCalls() []struct {
-	Tx           kv.Tx
 	StateChanges map[string]sender
 	UnwindTxs    TxSlots
 	MinedTxs     TxSlots
@@ -307,7 +302,6 @@ func (mock *PoolMock) OnNewBlockCalls() []struct {
 	BlockHash    [32]byte
 } {
 	var calls []struct {
-		Tx           kv.Tx
 		StateChanges map[string]sender
 		UnwindTxs    TxSlots
 		MinedTxs     TxSlots
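A short usage sketch of the regenerated mock, following the doc comment above (hypothetical test name; assumes the txpool test package):

	func TestOnNewBlockMockSketch(t *testing.T) {
		mockedPool := &PoolMock{
			OnNewBlockFunc: func(stateChanges map[string]sender, unwindTxs TxSlots, minedTxs TxSlots, baseFee uint64, blockHeight uint64, blockHash [32]byte) error {
				return nil
			},
		}
		if err := mockedPool.OnNewBlock(map[string]sender{}, TxSlots{}, TxSlots{}, 0, 1, [32]byte{}); err != nil {
			t.Fatal(err)
		}
		// check the length, as the generated comment suggests
		if len(mockedPool.OnNewBlockCalls()) != 1 {
			t.Fatal("expected exactly one recorded call")
		}
	}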
diff --git a/txpool/pool.go b/txpool/pool.go
index 441ab64ff..4742094cd 100644
--- a/txpool/pool.go
+++ b/txpool/pool.go
@@ -51,7 +51,6 @@ var (
 	cacheTotalCounter     = metrics.GetOrCreateCounter(`pool_cache_total`)
 	cacheHitCounter       = metrics.GetOrCreateCounter(`pool_cache_total{result="hit"}`)
 	writeToDbBytesCounter = metrics.GetOrCreateCounter(`pool_write_to_db_bytes`)
-	sendersEvictedCounter = metrics.GetOrCreateCounter(`pool_senders_evicted`)
 )

 const ASSERT = false
@@ -82,7 +81,7 @@ type Pool interface {
 	Started() bool
 	GetRlp(tx kv.Tx, hash []byte) ([]byte, error)
 	AddRemoteTxs(ctx context.Context, newTxs TxSlots)
-	OnNewBlock(tx kv.Tx, stateChanges map[string]sender, unwindTxs, minedTxs TxSlots, baseFee, blockHeight uint64, blockHash [32]byte) error
+	OnNewBlock(stateChanges map[string]sender, unwindTxs, minedTxs TxSlots, baseFee, blockHeight uint64, blockHash [32]byte) error

 	AddNewGoodPeer(peerID PeerID)
 }
@@ -176,7 +175,6 @@ type sendersBatch struct {
 	blockHeight atomic.Uint64
 	blockHash   atomic.String
 	senderID    uint64
-	commitID    uint64
 	senderIDs   map[string]uint64
 	senderInfo  map[uint64]*sender
 }
@@ -186,67 +184,25 @@ func newSendersCache() *sendersBatch {
 }

 //nolint
-func (sc *sendersBatch) idsCount(tx kv.Tx) (inMem int, inDb int, err error) {
-	c, err := tx.Cursor(kv.PoolSenderID)
-	if err != nil {
-		return 0, 0, err
-	}
-	inDB, err := c.Count()
-	if err != nil {
-		return 0, 0, err
-	}
-	return len(sc.senderIDs), int(inDB), nil
-}
+func (sc *sendersBatch) idsCount() (inMem int) { return len(sc.senderIDs) }

 //nolint
-func (sc *sendersBatch) infoCount(tx kv.Tx) (inMem int, inDb int, err error) {
-	c, err := tx.Cursor(kv.PoolSender)
-	if err != nil {
-		return 0, 0, err
-	}
-	inDB, err := c.Count()
-	if err != nil {
-		return 0, 0, err
-	}
-	return len(sc.senderInfo), int(inDB), nil
-}
-func (sc *sendersBatch) id(addr string, tx kv.Tx) (uint64, bool, error) {
+func (sc *sendersBatch) infoCount() (inMem int) { return len(sc.senderInfo) }
+func (sc *sendersBatch) id(addr string) (uint64, bool) {
 	id, ok := sc.senderIDs[addr]
-	if !ok {
-		v, err := tx.GetOne(kv.PoolSenderID, []byte(addr))
-		if err != nil {
-			return 0, false, err
-		}
-		if len(v) == 0 {
-			return 0, false, nil
-		}
-		id = binary.BigEndian.Uint64(v)
-	}
-	return id, true, nil
+	return id, ok
 }
-func (sc *sendersBatch) info(id uint64, tx kv.Tx, expectMiss bool) (*sender, error) {
+func (sc *sendersBatch) info(id uint64, expectMiss bool) *sender {
+	cacheTotalCounter.Inc()
 	info, ok := sc.senderInfo[id]
 	if ok {
 		cacheHitCounter.Inc()
-		return info, nil
+		return info
 	}
-	cacheTotalCounter.Inc()
-	encID := make([]byte, 8)
-	binary.BigEndian.PutUint64(encID, id)
-	v, err := tx.GetOne(kv.PoolSender, encID)
-	if err != nil {
-		return nil, err
+	if !expectMiss {
+		panic("all senders must be loaded in advance")
 	}
-	if len(v) == 0 {
-		if !expectMiss {
-			panic("all senders must be loaded in advance")
-		}
-		return nil, nil // don't fallback to core db, it will be manually done in right place
-	}
-	cacheHitCounter.Inc()
-	balance := uint256.NewInt(0)
-	balance.SetBytes(v[8:])
-	return newSender(binary.BigEndian.Uint64(v), *balance), nil
+	return nil
 }

 //nolint
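With the DB fallback gone, id and info are pure map lookups and info panics on an unexpected miss. That makes the preload contract explicit: resolve cache misses from the core DB before anything calls info(id, false). A sketch of that contract, using onNewTxs and loadFromCore as they appear in this diff (error handling abbreviated; assumes package txpool):

	func preloadSendersSketch(coreTx kv.Tx, sc *sendersBatch, newTxs TxSlots) error {
		// assigns sender ids and reports senders missing from the in-memory cache
		cacheMisses, err := sc.onNewTxs(newTxs)
		if err != nil {
			return err
		}
		if len(cacheMisses) > 0 {
			// goes to the core DB; per the comment in onNewBlock this must run
			// outside of the sendersBatch lock
			if err := sc.loadFromCore(coreTx, cacheMisses); err != nil {
				return err
			}
		}
		// only now is info(id, false) safe: every sender is guaranteed in memory
		return nil
	}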
@@ -257,11 +213,11 @@ func (sc *sendersBatch) printDebug(prefix string) {
 	}
 }

-func (sc *sendersBatch) onNewTxs(tx kv.Tx, newTxs TxSlots) (cacheMisses map[uint64]string, err error) {
-	if err := sc.ensureSenderIDOnNewTxs(tx, newTxs); err != nil {
+func (sc *sendersBatch) onNewTxs(newTxs TxSlots) (cacheMisses map[uint64]string, err error) {
+	if err := sc.ensureSenderIDOnNewTxs(newTxs); err != nil {
 		return nil, err
 	}
-	cacheMisses, err = sc.setTxSenderID(tx, newTxs)
+	cacheMisses, err = sc.setTxSenderID(newTxs)
 	if err != nil {
 		return nil, err
 	}
@@ -283,29 +239,26 @@ func (sc *sendersBatch) loadFromCore(coreTx kv.Tx, toLoad map[uint64]string) err
 	return nil
 }

-func (sc *sendersBatch) onNewBlock(tx kv.Tx, stateChanges map[string]sender, unwindTxs, minedTxs TxSlots, blockHeight uint64, blockHash [32]byte) error {
+func (sc *sendersBatch) onNewBlock(stateChanges map[string]sender, unwindTxs, minedTxs TxSlots, blockHeight uint64, blockHash [32]byte) error {
 	//TODO: if see non-continuous block heigh - load gap from changesets
 	sc.blockHeight.Store(blockHeight)
 	sc.blockHash.Store(string(blockHash[:]))

 	//`loadSenders` goes by network to core - and it must be outside of sendersBatch lock. But other methods must be locked
-	if err := sc.mergeStateChanges(tx, stateChanges, unwindTxs, minedTxs); err != nil {
+	if err := sc.mergeStateChanges(stateChanges, unwindTxs, minedTxs); err != nil {
 		return err
 	}
-	if _, err := sc.setTxSenderID(tx, unwindTxs); err != nil {
+	if _, err := sc.setTxSenderID(unwindTxs); err != nil {
 		return err
 	}
-	if _, err := sc.setTxSenderID(tx, minedTxs); err != nil {
+	if _, err := sc.setTxSenderID(minedTxs); err != nil {
 		return err
 	}
 	return nil
 }

-func (sc *sendersBatch) mergeStateChanges(tx kv.Tx, stateChanges map[string]sender, unwindedTxs, minedTxs TxSlots) error {
+func (sc *sendersBatch) mergeStateChanges(stateChanges map[string]sender, unwindedTxs, minedTxs TxSlots) error {
 	for addr, v := range stateChanges { // merge state changes
-		id, ok, err := sc.id(addr, tx)
-		if err != nil {
-			return err
-		}
+		id, ok := sc.id(addr)
 		if !ok {
 			sc.senderID++
 			id = sc.senderID
@@ -315,10 +268,7 @@ func (sc *sendersBatch) mergeStateChanges(tx kv.Tx, stateChanges map[string]send
 	}

 	for i := 0; i < unwindedTxs.senders.Len(); i++ {
-		id, ok, err := sc.id(string(unwindedTxs.senders.At(i)), tx)
-		if err != nil {
-			return err
-		}
+		id, ok := sc.id(string(unwindedTxs.senders.At(i)))
 		if !ok {
 			sc.senderID++
 			id = sc.senderID
@@ -332,10 +282,7 @@ func (sc *sendersBatch) mergeStateChanges(tx kv.Tx, stateChanges map[string]send
 	}

 	for i := 0; i < len(minedTxs.txs); i++ {
-		id, ok, err := sc.id(string(minedTxs.senders.At(i)), tx)
-		if err != nil {
-			return err
-		}
+		id, ok := sc.id(string(minedTxs.senders.At(i)))
 		if !ok {
 			sc.senderID++
 			id = sc.senderID
@@ -350,12 +297,9 @@ func (sc *sendersBatch) mergeStateChanges(tx kv.Tx, stateChanges map[string]send
 	return nil
 }

-func (sc *sendersBatch) ensureSenderIDOnNewTxs(tx kv.Tx, newTxs TxSlots) error {
+func (sc *sendersBatch) ensureSenderIDOnNewTxs(newTxs TxSlots) error {
 	for i := 0; i < len(newTxs.txs); i++ {
-		_, ok, err := sc.id(string(newTxs.senders.At(i)), tx)
-		if err != nil {
-			return err
-		}
+		_, ok := sc.id(string(newTxs.senders.At(i)))
 		if ok {
 			continue
 		}
@@ -365,26 +309,20 @@ func (sc *sendersBatch) ensureSenderIDOnNewTxs(tx kv.Tx, newTxs TxSlots) error {
 	return nil
 }

-func (sc *sendersBatch) setTxSenderID(tx kv.Tx, txs TxSlots) (map[uint64]string, error) {
+func (sc *sendersBatch) setTxSenderID(txs TxSlots) (map[uint64]string, error) {
 	toLoad := map[uint64]string{}
 	for i := range txs.txs {
 		addr := string(txs.senders.At(i))

 		// assign ID to each new sender
-		id, ok, err := sc.id(addr, tx)
-		if err != nil {
-			return nil, err
-		}
+		id, ok := sc.id(addr)
 		if !ok {
 			panic("not supported yet")
 		}
 		txs.txs[i].senderID = id

 		// load data from db if need
-		info, err := sc.info(txs.txs[i].senderID, tx, true)
-		if err != nil {
-			return nil, err
-		}
+		info := sc.info(txs.txs[i].senderID, true)
 		if info != nil {
 			continue
 		}
@@ -396,24 +334,11 @@ func (sc *sendersBatch) setTxSenderID(tx kv.Tx, txs TxSlots) (map[uint64]string,
 	}
 	return toLoad, nil
 }
-func (sc *sendersBatch) syncMissedStateDiff(ctx context.Context, tx kv.RwTx, coreTx kv.Tx, missedTo uint64) error {
+func (sc *sendersBatch) syncMissedStateDiff(ctx context.Context, coreTx kv.Tx, missedTo uint64) error {
 	dropLocalSendersCache := false
 	if missedTo > 0 && missedTo-sc.blockHeight.Load() > 1024 {
 		dropLocalSendersCache = true
 	}
-	lastCommitTimeV, err := tx.GetOne(kv.PoolInfo, SenderCommitTimeKey)
-	if err != nil {
-		return err
-	}
-	lastCommitTime := time.Time{}
-	if len(lastCommitTimeV) > 0 {
-		if err := lastCommitTime.UnmarshalBinary(lastCommitTimeV); err != nil {
-			return err
-		}
-		if time.Since(lastCommitTime) > 3*24*time.Hour {
-			dropLocalSendersCache = true
-		}
-	}
 	if coreTx != nil {
 		ok, err := isCanonical(coreTx, sc.blockHeight.Load(), []byte(sc.blockHash.Load()))
 		if err != nil {
@@ -425,9 +350,6 @@ func (sc *sendersBatch) syncMissedStateDiff(ctx context.Context, tx kv.RwTx, cor
 	}

 	if dropLocalSendersCache {
-		if err := tx.ClearBucket(kv.PoolSender); err != nil {
-			return err
-		}
 		sc.senderInfo = map[uint64]*sender{}
 	}
@@ -445,7 +367,7 @@ func (sc *sendersBatch) syncMissedStateDiff(ctx context.Context, tx kv.RwTx, cor
 	if err != nil {
 		return err
 	}
-	if err := sc.mergeStateChanges(tx, diff, TxSlots{}, TxSlots{}); err != nil {
+	if err := sc.mergeStateChanges(diff, TxSlots{}, TxSlots{}); err != nil {
 		return err
 	}
 	return nil
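syncMissedStateDiff keeps only the block-gap heuristic for invalidating the sender cache; the time-based check against SenderCommitTimeKey disappears together with the persisted cache. The surviving rule in isolation (the 1024 threshold comes from the diff; the function wrapper is a sketch):

	// drop the in-memory sender cache when the pool missed a large range of blocks,
	// e.g. cached height 10_000 and missedTo 12_000 -> gap of 2_000 > 1024 -> drop
	func shouldDropSendersCache(missedTo, cachedHeight uint64) bool {
		return missedTo > 0 && missedTo-cachedHeight > 1024
	}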
@@ -575,29 +497,22 @@ func (p *TxPool) printDebug(prefix string) {
 		(*p.queued.best)[i].Tx.printDebug(fmt.Sprintf("%s.queued : %b", prefix, (*p.queued.best)[i].subPool))
 	}
 }
-func (p *TxPool) logStats(tx kv.Tx) error {
+func (p *TxPool) logStats() {
 	protocolBaseFee, currentBaseFee := p.protocolBaseFee.Load(), p.currentBaseFee.Load()

 	p.lock.RLock()
 	defer p.lock.RUnlock()
-	idsInMem, idsInDb, err := p.senders.idsCount(tx)
-	if err != nil {
-		return err
-	}
-	infoInMem, infoInDb, err := p.senders.infoCount(tx)
-	if err != nil {
-		return err
-	}
+	idsInMem := p.senders.idsCount()
+	infoInMem := p.senders.infoCount()
 	var m runtime.MemStats
 	runtime.ReadMemStats(&m)
-	log.Info(fmt.Sprintf("[txpool] baseFee: %d, %dm; queuesSize: pending=%d/%d, baseFee=%d/%d, queued=%d/%d; sendersBatch: id=%d+%d,info=%d+%d, alloc=%dMb, sys=%dMb\n",
+	log.Info(fmt.Sprintf("[txpool] baseFee: %d, %dm; queuesSize: pending=%d/%d, baseFee=%d/%d, queued=%d/%d; sendersBatch: id=%d,info=%d, alloc=%dMb, sys=%dMb\n",
 		protocolBaseFee, currentBaseFee/1_000_000,
 		p.pending.Len(), PendingSubPoolLimit, p.baseFee.Len(), BaseFeeSubPoolLimit, p.queued.Len(), QueuedSubPoolLimit,
-		idsInMem, idsInDb, infoInMem, infoInDb,
+		idsInMem, infoInMem,
 		m.Alloc/1024/1024, m.Sys/1024/1024,
 	))
-	return nil
 }
 func (p *TxPool) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) {
 	p.lock.RLock()
@@ -692,8 +607,8 @@ func (p *TxPool) Best(n uint16, txs *TxsRlp, tx kv.Tx) error {
 			log.Warn("tx rlp not found")
 			continue
 		}
-		txs.Txs[i] = rlpTx[8:]
-		copy(txs.Senders.At(i), rlpTx[:8])
+		txs.Txs[i] = rlpTx[20:]
+		copy(txs.Senders.At(i), rlpTx[:20])
 		txs.IsLocal[i] = best[i].subPool&IsLocal > 0
 		/*
 			found := false
@@ -733,7 +648,6 @@ func (p *TxPool) DeprecatedForEach(_ context.Context, f func(rlp, sender []byte,
 	p.lock.RLock()
 	defer p.lock.RUnlock()
-	encID := make([]byte, 8)
 	p.byNonce.tree.Ascend(func(i btree.Item) bool {
 		mt := i.(*sortByNonce).metaTx
 		slot := mt.Tx
@@ -748,7 +662,7 @@ func (p *TxPool) DeprecatedForEach(_ context.Context, f func(rlp, sender []byte,
 				log.Error("[txpool] tx not found in db")
 				return false
 			}
-			slotRlp = v[8:]
+			slotRlp = v[20:]
 		}

 		var sender []byte
@@ -761,16 +675,7 @@ func (p *TxPool) DeprecatedForEach(_ context.Context, f func(rlp, sender []byte,
 			}
 		}
 		if !found {
-			binary.BigEndian.PutUint64(encID, slot.senderID)
-			v, err := tx.GetOne(kv.PoolSenderIDToAdress, encID)
-			if err != nil {
-				log.Error("[txpool] get sender from db", "err", err)
-				return false
-			}
-			if v == nil {
-				log.Error("[txpool] sender not found in db")
-				return false
-			}
+			return true
 		}
 		f(slotRlp, sender, mt.currentSubPool)
 		return true
@@ -789,7 +694,7 @@ func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs TxSlots) {
 		p.unprocessedRemoteTxs.Append(newTxs.txs[i], newTxs.senders.At(i), newTxs.isLocal[i])
 	}
 }
-func (p *TxPool) AddLocals(ctx context.Context, newTxs TxSlots, tx kv.Tx) ([]DiscardReason, error) {
+func (p *TxPool) AddLocals(ctx context.Context, newTxs TxSlots) ([]DiscardReason, error) {
 	p.lock.Lock()
 	defer p.lock.Unlock()

 	discardReasonsIndex := len(p.discardReasons)
@@ -797,8 +702,7 @@ func (p *TxPool) AddLocals(ctx context.Context, newTxs TxSlots, tx kv.Tx) ([]Dis
 	for i := range newTxs.isLocal {
 		newTxs.isLocal[i] = true
 	}
-
-	cacheMisses, err := p.senders.onNewTxs(tx, newTxs)
+	cacheMisses, err := p.senders.onNewTxs(newTxs)
 	if err != nil {
 		return nil, err
 	}
@@ -814,7 +718,7 @@ func (p *TxPool) AddLocals(ctx context.Context, newTxs TxSlots, tx kv.Tx) ([]Dis
 	if !p.Started() {
 		return nil, fmt.Errorf("pool not started yet")
 	}
-	if err := onNewTxs(tx, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
+	if err := onNewTxs(p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
 		return nil, err
 	}
@@ -840,7 +744,7 @@ func (p *TxPool) copyDiscardReasons(from int) []DiscardReason {
 	copy(cpy, p.discardReasons[from:])
 	return cpy
 }
-func (p *TxPool) processRemoteTxs(ctx context.Context, tx kv.Tx) error {
+func (p *TxPool) processRemoteTxs(ctx context.Context) error {
 	p.lock.RLock()
 	l := len(p.unprocessedRemoteTxs.txs)
 	p.lock.RUnlock()
@@ -854,7 +758,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context, tx kv.Tx) error {
 	defer p.lock.Unlock()

 	newTxs := *p.unprocessedRemoteTxs
-	cacheMisses, err := p.senders.onNewTxs(tx, newTxs)
+	cacheMisses, err := p.senders.onNewTxs(newTxs)
 	if err != nil {
 		return err
 	}
@@ -871,7 +775,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context, tx kv.Tx) error {
 		return fmt.Errorf("txpool not started yet")
 	}

-	if err := onNewTxs(tx, p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
+	if err := onNewTxs(p.senders, newTxs, p.protocolBaseFee.Load(), p.currentBaseFee.Load(), p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
 		return err
 	}
@@ -899,7 +803,7 @@ func (p *TxPool) processRemoteTxs(ctx context.Context, tx kv.Tx) error {
 	//log.Info("[txpool] on new txs", "amount", len(newTxs.txs), "in", time.Since(t))
 	return nil
 }
-func onNewTxs(tx kv.Tx, senders *sendersBatch, newTxs TxSlots, protocolBaseFee, currentBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error {
+func onNewTxs(senders *sendersBatch, newTxs TxSlots, protocolBaseFee, currentBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error {
 	for i := range newTxs.txs {
 		if newTxs.txs[i].senderID == 0 {
 			return fmt.Errorf("senderID can't be zero")
@@ -908,10 +812,7 @@ func onNewTxs(tx kv.Tx, senders *sendersBatch, newTxs TxSlots, protocolBaseFee,
 	changedSenders := unsafeAddToPendingPool(byNonce, newTxs, pending, baseFee, queued, byHash, discard)
 	for id := range changedSenders {
-		sender, err := senders.info(id, tx, false)
-		if err != nil {
-			return err
-		}
+		sender := senders.info(id, false)
 		onSenderChange(id, sender, byNonce, protocolBaseFee, currentBaseFee)
 	}
@@ -934,14 +835,14 @@ func (p *TxPool) setBaseFee(baseFee uint64) (uint64, uint64) {
 	return p.protocolBaseFee.Load(), p.currentBaseFee.Load()
 }

-func (p *TxPool) OnNewBlock(tx kv.Tx, stateChanges map[string]sender, unwindTxs, minedTxs TxSlots, baseFee, blockHeight uint64, blockHash [32]byte) error {
+func (p *TxPool) OnNewBlock(stateChanges map[string]sender, unwindTxs, minedTxs TxSlots, baseFee, blockHeight uint64, blockHash [32]byte) error {
 	defer newBlockTimer.UpdateDuration(time.Now())
 	p.lock.Lock()
 	defer p.lock.Unlock()

 	t := time.Now()
 	protocolBaseFee, baseFee := p.setBaseFee(baseFee)
-	if err := p.senders.onNewBlock(tx, stateChanges, unwindTxs, minedTxs, blockHeight, blockHash); err != nil {
+	if err := p.senders.onNewBlock(stateChanges, unwindTxs, minedTxs, blockHeight, blockHash); err != nil {
 		return err
 	}
 	//log.Debug("[txpool] new block", "unwinded", len(unwindTxs.txs), "mined", len(minedTxs.txs), "baseFee", baseFee, "blockHeight", blockHeight)
@@ -952,7 +853,7 @@ func (p *TxPool) OnNewBlock(tx kv.Tx, stateChanges map[string]sender, unwindTxs,
 		return err
 	}

-	if err := onNewBlock(tx, p.senders, unwindTxs, minedTxs.txs, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
+	if err := onNewBlock(p.senders, unwindTxs, minedTxs.txs, protocolBaseFee, baseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
 		return err
 	}
@@ -982,7 +883,7 @@ func (p *TxPool) discardLocked(mt *metaTx) {
 		p.isLocalHashLRU.Add(string(mt.Tx.idHash[:]), struct{}{})
 	}
 }
-func onNewBlock(tx kv.Tx, senders *sendersBatch, unwindTxs TxSlots, minedTxs []*TxSlot, protocolBaseFee, pendingBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error {
+func onNewBlock(senders *sendersBatch, unwindTxs TxSlots, minedTxs []*TxSlot, protocolBaseFee, pendingBaseFee uint64, pending *PendingPool, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error {
 	for i := range unwindTxs.txs {
 		if unwindTxs.txs[i].senderID == 0 {
 			return fmt.Errorf("onNewBlock.unwindTxs: senderID can't be zero")
@@ -1009,10 +910,7 @@ func onNewBlock(tx kv.Tx, senders *sendersBatch, unwindTxs TxSlots, minedTxs []*
 	// time (up to some "immutability threshold").
 	changedSenders := unsafeAddToPendingPool(byNonce, unwindTxs, pending, baseFee, queued, byHash, discard)
 	for id := range changedSenders {
-		sender, err := senders.info(id, tx, false)
-		if err != nil {
-			return err
-		}
+		sender := senders.info(id, false)
 		onSenderChange(id, sender, byNonce, protocolBaseFee, pendingBaseFee)
 	}
@@ -1507,7 +1405,6 @@ func (p *WorstQueue) Pop() interface{} {
 // promote/demote transactions
 // reorgs
 func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs chan Hashes, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func()) {
-	//db.Update(ctx, func(tx kv.RwTx) error { return tx.ClearBucket(kv.PooledSender) })
 	if err := db.Update(ctx, func(tx kv.RwTx) error {
 		return coreDB.View(ctx, func(coreTx kv.Tx) error {
 			return p.fromDB(ctx, tx, coreTx)
@@ -1515,9 +1412,7 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
 	}); err != nil {
 		log.Error("[txpool] restore from db", "err", err)
 	}
-	if err := db.View(ctx, func(tx kv.Tx) error { return p.logStats(tx) }); err != nil {
-		log.Error("[txpool] log stats", "err", err)
-	}
+	p.logStats()
 	//if ASSERT {
 	//	go func() {
 	//		if err := p.forceCheckState(ctx, db, coreDB); err != nil {
@@ -1543,11 +1438,9 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
 		case <-ctx.Done():
 			return
 		case <-logEvery.C:
-			if err := db.View(ctx, func(tx kv.Tx) error { return p.logStats(tx) }); err != nil {
-				log.Error("[txpool] log stats", "err", err)
-			}
+			p.logStats()
 		case <-processRemoteTxsEvery.C:
-			if err := db.View(ctx, func(tx kv.Tx) error { return p.processRemoteTxs(ctx, tx) }); err != nil {
+			if err := p.processRemoteTxs(ctx); err != nil {
 				if s, ok := status.FromError(err); ok && retryLater(s.Code()) {
 					continue
 				}
@@ -1559,14 +1452,13 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
 		case <-commitEvery.C:
 			if db != nil {
 				t := time.Now()
-				evicted, written, err := p.flush(db)
+				written, err := p.flush(db)
 				if err != nil {
 					log.Error("[txpool] flush is local history", "err", err)
 					continue
 				}
 				writeToDbBytesCounter.Set(written)
-				sendersEvictedCounter.Set(evicted)
-				log.Info("[txpool] flush", "written_kb", written/1024, "evicted", evicted, "in", time.Since(t))
+				log.Info("[txpool] Commit", "written_kb", written/1024, "in", time.Since(t))
 			}
 		case h := <-newTxs:
 			//TODO: maybe send TxSlots object instead of Hashes?
 			notifyMiningAboutNewSlots()
@@ -1620,6 +1512,7 @@ func coreProgress(coreTx kv.Tx) (uint64, error) {
 }

 //nolint
+/*
 func (p *TxPool) forceCheckState(ctx context.Context, db, coreDB kv.RoDB) error {
 	for {
 		if err := db.View(ctx, func(tx kv.Tx) error {
@@ -1655,6 +1548,7 @@ func (p *TxPool) forceCheckState(ctx context.Context, db, coreDB kv.RoDB) error
 		}
 	}
 }
+*/

 //nolint
 func copyBytes(b []byte) (copiedBytes []byte) {
@@ -1663,13 +1557,13 @@ func copyBytes(b []byte) (copiedBytes []byte) {
 	return
 }

-func (p *TxPool) flush(db kv.RwDB) (evicted, written uint64, err error) {
+func (p *TxPool) flush(db kv.RwDB) (written uint64, err error) {
 	defer writeToDbTimer.UpdateDuration(time.Now())
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	//it's important that write db tx is done inside lock, to make last writes visible for all read operations
 	if err := db.Update(context.Background(), func(tx kv.RwTx) error {
-		evicted, err = p.flushLocked(tx)
+		err = p.flushLocked(tx)
 		if err != nil {
 			return err
 		}
@@ -1679,18 +1573,18 @@ func (p *TxPool) flush(db kv.RwDB) (evicted, written uint64, err error) {
 		}
 		return nil
 	}); err != nil {
-		return 0, 0, err
+		return 0, err
 	}
-	return evicted, written, nil
+	return written, nil
 }

-func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) {
+func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
 	sendersWithoutTransactions := roaring64.New()
 	for i := 0; i < len(p.deletedTxs); i++ {
 		if p.byNonce.count(p.deletedTxs[i].Tx.senderID) == 0 {
 			sendersWithoutTransactions.Add(p.deletedTxs[i].Tx.senderID)
 		}
 		if err := tx.Delete(kv.PoolTransaction, p.deletedTxs[i].Tx.idHash[:], nil); err != nil {
-			return evicted, err
+			return err
 		}
 		p.deletedTxs[i] = nil // for gc
 	}
@@ -1698,12 +1592,12 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) {
 	txHashes := p.isLocalHashLRU.Keys()
 	encID := make([]byte, 8)
 	if err := tx.ClearBucket(kv.RecentLocalTransaction); err != nil {
-		return evicted, err
+		return err
 	}
 	for i := range txHashes {
 		binary.BigEndian.PutUint64(encID, uint64(i))
 		if err := tx.Append(kv.RecentLocalTransaction, encID, txHashes[i].([]byte)); err != nil {
-			return evicted, err
+			return err
 		}
 	}
@@ -1712,27 +1606,32 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) {
 		if metaTx.Tx.rlp == nil {
 			continue
 		}
-		v = ensureEnoughSize(v, 8+len(metaTx.Tx.rlp))
-		binary.BigEndian.PutUint64(v, metaTx.Tx.senderID)
-		copy(v[8:], metaTx.Tx.rlp)
+		v = ensureEnoughSize(v, 20+len(metaTx.Tx.rlp))
+		for addr, id := range p.senders.senderIDs { // no inverted index - tradeoff flush speed for memory usage
+			if id == metaTx.Tx.senderID {
+				copy(v[:20], addr)
+				break
+			}
+		}
+
 		if err := tx.Put(kv.PoolTransaction, []byte(txHash), v); err != nil {
-			return evicted, err
+			return err
 		}
 		metaTx.Tx.rlp = nil
 	}

 	binary.BigEndian.PutUint64(encID, p.protocolBaseFee.Load())
 	if err := tx.Put(kv.PoolInfo, PoolProtocolBaseFeeKey, encID); err != nil {
-		return evicted, err
+		return err
 	}
 	binary.BigEndian.PutUint64(encID, p.currentBaseFee.Load())
 	if err := tx.Put(kv.PoolInfo, PoolPendingBaseFeeKey, encID); err != nil {
-		return evicted, err
+		return err
 	}

-	evicted, err = p.senders.flush(tx, p.byNonce, sendersWithoutTransactions, p.cfg.EvictSendersAfterRounds)
+	err = p.senders.flush(tx)
 	if err != nil {
-		return evicted, err
+		return err
 	}

 	// clean - in-memory data structure as later as possible - because if during this Tx will happen error,
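The hunk above recovers the 20-byte address for each dirty transaction by scanning senderIDs linearly; the in-line comment names the tradeoff (no inverted id-to-addr index, so flushing costs O(senders) per tx, but no second map is held permanently). If flush time ever dominates, a throwaway reverse index built once per flush is the obvious alternative; a hedged sketch of that alternative, not what the diff does:

	// build a temporary id -> addr map once per flush; each lookup inside the
	// p.byHash loop then becomes O(1) instead of a scan over all senders
	idToAddr := make(map[uint64]string, len(p.senders.senderIDs))
	for addr, id := range p.senders.senderIDs {
		idToAddr[id] = addr
	}
	// ... inside the loop over p.byHash:
	// copy(v[:20], idToAddr[metaTx.Tx.senderID])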
@@ -1740,154 +1639,20 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) {
 	// failed write transaction must not create side-effects
 	p.deletedTxs = p.deletedTxs[:0]
 	p.discardReasons = p.discardReasons[:0]
-	return evicted, nil
+	return nil
 }

-func (sc *sendersBatch) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransactions *roaring64.Bitmap, evictAfterRounds uint64) (evicted uint64, err error) {
-	sc.commitID++
-
-	var justDeleted, justInserted []uint64
+func (sc *sendersBatch) flush(tx kv.RwTx) (err error) {
 	encID := make([]byte, 8)
-	for addr, id := range sc.senderIDs {
-		binary.BigEndian.PutUint64(encID, id)
-		currentV, err := tx.GetOne(kv.PoolSenderID, []byte(addr))
-		if err != nil {
-			return evicted, err
-		}
-		if currentV != nil && bytes.Equal(currentV, encID) {
-			continue
-		}
-		//fmt.Printf("Put: %d\n", id)
-		if err := tx.Put(kv.PoolSenderID, []byte(addr), encID); err != nil {
-			return evicted, err
-		}
-		if err := tx.Put(kv.PoolSenderIDToAdress, encID, []byte(addr)); err != nil {
-			return evicted, err
-		}
-		if ASSERT {
-			justInserted = append(justInserted, id)
-		}
-		if byNonce.count(id) == 0 {
-			sendersWithoutTransactions.Add(id)
-		}
-	}
-
-	if ASSERT {
-		sort.Slice(justInserted, func(i, j int) bool { return justInserted[i] < justInserted[j] })
-	}
-
-	v := make([]byte, 8, 8+32)
-	for id, info := range sc.senderInfo {
-		//if info.nonce == 0 && info.balance.IsZero() {
-		//	continue
-		//}
-		binary.BigEndian.PutUint64(encID, id)
-		binary.BigEndian.PutUint64(v, info.nonce)
-		v = append(v[:8], info.balance.Bytes()...)
-		enc, err := tx.GetOne(kv.PoolSender, encID)
-		if err != nil {
-			return evicted, err
-		}
-		if bytes.Equal(enc, v) {
-			continue
-		}
-		if err := tx.Put(kv.PoolSender, encID, v); err != nil {
-			return evicted, err
-		}
-	}
-	//fmt.Printf("justDeleted:%d, justInserted:%d\n", justDeleted, justInserted)
-
-	binary.BigEndian.PutUint64(encID, sc.commitID)
-	// Eviction logic. Store into db list of senders:
-	//  - which have discarded transactions at this commit
-	//  - but have no active transactions left
-	// after some time read old records from DB and if such senders still have no transactions - evict them
-	if sendersWithoutTransactions.GetCardinality() > 0 {
-		sendersWithoutTransactions.RunOptimize()
-		b, err := sendersWithoutTransactions.MarshalBinary()
-		if err != nil {
-			return 0, err
-		}
-		if err := tx.Append(kv.PoolStateEviction, encID, b); err != nil {
-			return evicted, err
-		}
-	}
-
-	c, err := tx.RwCursor(kv.PoolStateEviction)
-	if err != nil {
-		return evicted, err
-	}
-	defer c.Close()
-	for k, v, err := c.First(); k != nil; k, v, err = c.Next() {
-		if err != nil {
-			return evicted, err
-		}
-		if sc.commitID-binary.BigEndian.Uint64(k) < evictAfterRounds {
-			break
-		}
-		ids := roaring64.New()
-		if err := ids.UnmarshalBinary(v); err != nil {
-			return 0, err
-		}
-		for _, senderID := range ids.ToArray() {
-			if byNonce.count(senderID) > 0 {
-				continue
-			}
-			binary.BigEndian.PutUint64(encID, senderID)
-			addr, err := tx.GetOne(kv.PoolSenderIDToAdress, encID)
-			if err != nil {
-				return evicted, err
-			}
-			if addr == nil {
-				continue
-			}
-			if err := tx.Delete(kv.PoolSenderID, addr, nil); err != nil {
-				return evicted, err
-			}
-			if err := tx.Delete(kv.PoolSenderIDToAdress, encID, nil); err != nil {
-				return evicted, err
-			}
-			if err := tx.Delete(kv.PoolSender, encID, nil); err != nil {
-				return evicted, err
-			}
-			evicted++
-			if ASSERT {
-				justDeleted = append(justDeleted, senderID) //nolint
-			}
-		}
-		if err := c.DeleteCurrent(); err != nil {
-			return evicted, err
-		}
-	}
-
-	//fmt.Printf("justDeleted:%d, justInserted:%d\n", justDeleted, justInserted)
-
 	binary.BigEndian.PutUint64(encID, sc.blockHeight.Load())
 	if err := tx.Put(kv.PoolInfo, SenderCacheHeightKey, encID); err != nil {
-		return evicted, err
+		return err
 	}
 	if err := tx.Put(kv.PoolInfo, SenderCacheHashKey, []byte(sc.blockHash.Load())); err != nil {
-		return evicted, err
-	}
-	binary.BigEndian.PutUint64(encID, sc.senderID)
-	if err := tx.Put(kv.PoolInfo, SenderCacheIDKey, encID); err != nil {
-		return evicted, err
-	}
-	binary.BigEndian.PutUint64(encID, sc.commitID)
-	if err := tx.Put(kv.PoolInfo, SenderCommitIDKey, encID); err != nil {
-		return evicted, err
-	}
-	lastCommitTime, err := time.Now().MarshalBinary()
-	if err != nil {
-		return evicted, err
-	}
-	if err := tx.Put(kv.PoolInfo, SenderCommitTimeKey, lastCommitTime); err != nil {
-		return evicted, err
+		return err
 	}

-	sc.senderIDs = map[string]uint64{}
-	sc.senderInfo = map[uint64]*sender{}
-	return evicted, nil
+	return nil
 }

 func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error {
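Note how little the rewritten sendersBatch.flush persists: only the cache's block height and hash (SenderCacheHeightKey/SenderCacheHashKey). Unlike the removed version it also no longer resets senderIDs/senderInfo at the end, sender ids, nonces and balances now live purely in memory and are rebuilt after a restart, which is why the fuzz test below has to share them explicitly via `p2.senders = pool.senders // senders are not persisted`.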
@@ -1908,27 +1673,29 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error {
 	txs := TxSlots{}
 	parseCtx := NewTxParseContext()
 	parseCtx.WithSender(false)
+	i := 0
 	if err := tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error {
+		addr, txRlp := v[:20], v[20:]
 		txs.Resize(uint(i + 1))
 		txs.txs[i] = &TxSlot{}

-		_, err := parseCtx.ParseTransaction(v[8:], 0, txs.txs[i], nil)
+		_, err := parseCtx.ParseTransaction(txRlp, 0, txs.txs[i], nil)
 		if err != nil {
-			return fmt.Errorf("err: %w, rlp: %x\n", err, v[8:])
+			return fmt.Errorf("err: %w, rlp: %x\n", err, txRlp)
 		}
 		txs.txs[i].rlp = nil // means that we don't need store it in db anymore
-		txs.txs[i].senderID = binary.BigEndian.Uint64(v)
+		copy(txs.senders.At(i), addr)

-		senderAddr, err := tx.GetOne(kv.PoolSenderIDToAdress, v[:8])
-		if err != nil {
-			return err
+		id, ok := p.senders.senderIDs[string(addr)]
+		if !ok {
+			p.senders.senderID++
+			id = p.senders.senderID
+			p.senders.senderIDs[string(addr)] = id
 		}
-		if len(senderAddr) == 0 {
-			panic("must not happen")
-		}
-		copy(txs.senders.At(i), senderAddr)
-		//bkock num = binary.BigEndian.Uint64(v[8:])
+		txs.txs[i].senderID = id
+
 		_, isLocalTx := p.isLocalHashLRU.Get(string(k))
 		txs.isLocal[i] = isLocalTx
 		i++
@@ -1956,7 +1723,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error {
 			currentBaseFee = binary.BigEndian.Uint64(v)
 		}
 	}
-	cacheMisses, err := p.senders.onNewTxs(tx, txs)
+	cacheMisses, err := p.senders.onNewTxs(txs)
 	if err != nil {
 		return err
 	}
@@ -1965,7 +1732,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error {
 			return err
 		}
 	}
-	if err := onNewTxs(tx, p.senders, txs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
+	if err := onNewTxs(p.senders, txs, protocolBaseFee, currentBaseFee, p.pending, p.baseFee, p.queued, p.byNonce, p.byHash, p.discardLocked); err != nil {
 		return err
 	}
 	p.currentBaseFee.Store(currentBaseFee)
@@ -1993,26 +1760,8 @@ func (sc *sendersBatch) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) er
 			sc.blockHash.Store(string(v))
 		}
 	}
-	{
-		v, err := tx.GetOne(kv.PoolInfo, SenderCacheIDKey)
-		if err != nil {
-			return err
-		}
-		if len(v) > 0 {
-			sc.senderID = binary.BigEndian.Uint64(v)
-		}
-	}
-	{
-		v, err := tx.GetOne(kv.PoolInfo, SenderCommitIDKey)
-		if err != nil {
-			return err
-		}
-		if len(v) > 0 {
-			sc.commitID = binary.BigEndian.Uint64(v)
-		}
-	}
-	if err := sc.syncMissedStateDiff(ctx, tx, coreTx, 0); err != nil {
+	if err := sc.syncMissedStateDiff(ctx, coreTx, 0); err != nil {
 		return err
 	}
 	return nil
@@ -2056,9 +1805,6 @@ func changesets(ctx context.Context, from uint64, coreTx kv.Tx) (map[string]send
 	return diff, nil
 }

-var SenderCommitTimeKey = []byte("sender_commit_time")
-var SenderCacheIDKey = []byte("sender_cache_id")
-var SenderCommitIDKey = []byte("sender_commit_id")
 var SenderCacheHeightKey = []byte("sender_cache_block_height")
 var SenderCacheHashKey = []byte("sender_cache_block_hash")
 var PoolPendingBaseFeeKey = []byte("pending_base_fee")
diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go
index 74f99d3a2..857aaef60 100644
--- a/txpool/pool_fuzz_test.go
+++ b/txpool/pool_fuzz_test.go
@@ -295,7 +295,7 @@ func splitDataset(in TxSlots) (TxSlots, TxSlots, TxSlots, TxSlots) {
 	return p1, p2, p3, p4
 }

-func FuzzOnNewBlocks12(f *testing.F) {
+func FuzzOnNewBlocks(f *testing.F) {
 	var u64 = [1 * 4]byte{1}
 	var senderAddr = [1 + 1 + 1]byte{1}
 	f.Add(u64[:], u64[:], u64[:], u64[:], senderAddr[:], 12)
@@ -494,13 +494,7 @@
 		}
 		prevTotal = pending.Len() + baseFee.Len() + queued.Len()
 	}
-	checkDB := func(tx kv.Tx) {
-		c1, _ := tx.Cursor(kv.PoolSenderID)
-		c2, _ := tx.Cursor(kv.PoolSenderIDToAdress)
-		count1, _ := c1.Count()
-		count2, _ := c2.Count()
-		assert.Equal(count1, count2)
-	}
+	//TODO: check that id=>addr and addr=>id mappings have same len

 	//fmt.Printf("-------\n")
 	tx, err := db.BeginRw(context.Background())
@@ -510,48 +504,47 @@
 	// go to first fork
 	//fmt.Printf("ll1: %d,%d,%d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len())
 	txs1, txs2, p2pReceived, txs3 := splitDataset(txs)
-	err = pool.OnNewBlock(tx, map[string]sender{}, txs1, TxSlots{}, currentBaseFee, 1, [32]byte{})
+	err = pool.OnNewBlock(map[string]sender{}, txs1, TxSlots{}, currentBaseFee, 1, [32]byte{})
 	assert.NoError(err)
 	check(txs1, TxSlots{}, "fork1")
 	checkNotify(txs1, TxSlots{}, "fork1")

 	_, _, _ = p2pReceived, txs2, txs3
-	err = pool.OnNewBlock(tx, map[string]sender{}, TxSlots{}, txs2, currentBaseFee, 1, [32]byte{})
+	err = pool.OnNewBlock(map[string]sender{}, TxSlots{}, txs2, currentBaseFee, 1, [32]byte{})
 	check(TxSlots{}, txs2, "fork1 mined")
 	checkNotify(TxSlots{}, txs2, "fork1 mined")

 	// unwind everything and switch to new fork (need unwind mined now)
-	err = pool.OnNewBlock(tx, map[string]sender{}, txs2, TxSlots{}, currentBaseFee, 2, [32]byte{})
+	err = pool.OnNewBlock(map[string]sender{}, txs2, TxSlots{}, currentBaseFee, 2, [32]byte{})
 	assert.NoError(err)
 	check(txs2, TxSlots{}, "fork2")
 	checkNotify(txs2, TxSlots{}, "fork2")

-	err = pool.OnNewBlock(tx, map[string]sender{}, TxSlots{}, txs3, currentBaseFee, 2, [32]byte{})
+	err = pool.OnNewBlock(map[string]sender{}, TxSlots{}, txs3, currentBaseFee, 2, [32]byte{})
 	assert.NoError(err)
 	check(TxSlots{}, txs3, "fork2 mined")
 	checkNotify(TxSlots{}, txs3, "fork2 mined")

 	// add some remote txs from p2p
 	pool.AddRemoteTxs(context.Background(), p2pReceived)
-	err = pool.processRemoteTxs(context.Background(), tx)
+	err = pool.processRemoteTxs(context.Background())
 	assert.NoError(err)
 	check(p2pReceived, TxSlots{}, "p2pmsg1")
 	checkNotify(p2pReceived, TxSlots{}, "p2pmsg1")

-	senderIdsBeforeFlush := len(pool.senders.senderIDs)
-	senderInfoBeforeFlush := len(pool.senders.senderInfo)
+	//senderIdsBeforeFlush := len(pool.senders.senderIDs)
+	//senderInfoBeforeFlush := len(pool.senders.senderInfo)

-	_, err = pool.flushLocked(tx) // we don't test eviction here, because dedicated test exists
+	err = pool.flushLocked(tx) // we don't test eviction here, because dedicated test exists
 	require.NoError(err)
 	check(p2pReceived, TxSlots{}, "after_flush")
-	checkDB(tx)
 	//checkNotify(p2pReceived, TxSlots{}, "after_flush")

 	p2, err := New(ch, nil, DefaultConfig)
 	assert.NoError(err)
+	p2.senders = pool.senders // senders are not persisted
 	err = p2.fromDB(context.Background(), tx, nil)
 	require.NoError(err)
-	checkDB(tx)
 	for _, txn := range p2.byHash {
 		assert.Nil(txn.Tx.rlp)
 	}
@@ -559,15 +552,13 @@
 	check(txs2, TxSlots{}, "fromDB")
 	//checkNotify(txs2, TxSlots{}, "fromDB")
-	assert.Equal(pool.senders.senderID, p2.senders.senderID)
-	assert.Equal(pool.senders.blockHeight.Load(), p2.senders.blockHeight.Load())
+	//assert.Equal(pool.senders.senderID, p2.senders.senderID)
+	//assert.Equal(pool.senders.blockHeight.Load(), p2.senders.blockHeight.Load())

-	idsCountAfterFlush, idsCountInDbAfterFlush, err := p2.senders.idsCount(tx)
-	assert.NoError(err)
-	assert.LessOrEqual(senderIdsBeforeFlush, idsCountAfterFlush+idsCountInDbAfterFlush)
-	infoCountAfterFlush, infoCountInDbAfterFlush, err := p2.senders.infoCount(tx)
-	assert.NoError(err)
-	assert.LessOrEqual(senderInfoBeforeFlush, infoCountAfterFlush+infoCountInDbAfterFlush)
+	//idsCountAfterFlush := p2.senders.idsCount()
+	//assert.LessOrEqual(senderIdsBeforeFlush, idsCountAfterFlush)
+	//infoCountAfterFlush := p2.senders.infoCount()
+	//assert.LessOrEqual(senderInfoBeforeFlush, infoCountAfterFlush)

 	assert.Equal(pool.pending.Len(), p2.pending.Len())
 	assert.Equal(pool.baseFee.Len(), p2.baseFee.Len())
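The removed checkDB compared the sizes of the PoolSenderID and PoolSenderIDToAdress tables; its in-memory replacement is left as a TODO in the hunk above. One way to implement it, as a sketch (assumes test-package access to the senderIDs map): build the reverse mapping and compare lengths, which catches a duplicate id collapsing two addresses:

	func checkSenderMapsSketch(t *testing.T, senderIDs map[string]uint64) {
		idToAddr := make(map[uint64]string, len(senderIDs))
		for addr, id := range senderIDs {
			idToAddr[id] = addr
		}
		if len(idToAddr) != len(senderIDs) {
			t.Fatalf("addr=>id and id=>addr mappings differ: %d vs %d", len(senderIDs), len(idToAddr))
		}
	}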
diff --git a/txpool/pool_test.go b/txpool/pool_test.go
index 384581fb1..831f3eb42 100644
--- a/txpool/pool_test.go
+++ b/txpool/pool_test.go
@@ -21,7 +21,6 @@ import (
 	"testing"

 	"github.com/RoaringBitmap/roaring/roaring64"
-	"github.com/google/btree"
 	"github.com/holiman/uint256"
 	"github.com/ledgerwatch/erigon-lib/kv/memdb"
 	"github.com/stretchr/testify/require"
@@ -31,7 +30,6 @@ func TestSenders(t *testing.T) {
 	t.Run("evict_all_on_next_round", func(t *testing.T) {
 		senders, require := newSendersCache(), require.New(t)
 		_, tx := memdb.NewTestPoolTx(t)
-		byNonce := &ByNonce{btree.New(16)}
 		changed := roaring64.New()

 		senders.senderIDs[fmt.Sprintf("%020x", 1)] = 1
@@ -40,20 +38,16 @@ func TestSenders(t *testing.T) {
 		senders.senderInfo[2] = newSender(1, *uint256.NewInt(1))
 		changed.AddMany([]uint64{1, 2})

-		evicted, err := senders.flush(tx, byNonce, changed, 1)
+		err := senders.flush(tx)
 		require.NoError(err)
-		require.Zero(evicted)

 		changed.Clear()
-		evicted, err = senders.flush(tx, byNonce, changed, 1)
+		err = senders.flush(tx)
 		require.NoError(err)
-
-		require.Equal(2, int(evicted))
 	})
 	t.Run("evict_even_if_used_in_current_round_but_no_txs", func(t *testing.T) {
 		senders, require := newSendersCache(), require.New(t)
 		_, tx := memdb.NewTestPoolTx(t)
-		byNonce := &ByNonce{btree.New(16)}

 		senders.senderInfo[1] = newSender(1, *uint256.NewInt(1))
 		senders.senderIDs[fmt.Sprintf("%020x", 1)] = 1
@@ -62,17 +56,15 @@ func TestSenders(t *testing.T) {
 		changed := roaring64.New()
 		changed.AddMany([]uint64{1, 2})

-		evicted, err := senders.flush(tx, byNonce, changed, 1)
+		err := senders.flush(tx)
 		require.NoError(err)
-		require.Zero(evicted)

 		senders.senderInfo[1] = newSender(1, *uint256.NewInt(1)) // means used in current round, but still has 0 transactions
 		senders.senderIDs[fmt.Sprintf("%020x", 1)] = 1
 		changed.Clear()
 		changed.AddMany([]uint64{1})
-		evicted, err = senders.flush(tx, byNonce, changed, 1)
+		err = senders.flush(tx)
 		require.NoError(err)
-		require.Equal(2, int(evicted))
 	})
 }
diff --git a/txpool/types_fuzz_test.go b/txpool/types_fuzz_test.go
index d5952210f..fbe7191b3 100644
--- a/txpool/types_fuzz_test.go
+++ b/txpool/types_fuzz_test.go
@@ -20,7 +20,7 @@ import (
 //	log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
 //}

-func FuzzParseTx1(f *testing.F) {
+func FuzzParseTx(f *testing.F) {
 	f.Add([]byte{1}, 0)
 	f.Fuzz(func(t *testing.T, in []byte, pos int) {
 		t.Parallel()