From b178ce65e62b95faf61b59afd93d266f3b74c10b Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 5 Aug 2021 15:30:42 +0700 Subject: [PATCH 01/12] test: add real re-org and p2p txs --- txpool/types.go | 85 +++++++++++++++++++++++--------------------- txpool/types_test.go | 3 +- 2 files changed, 46 insertions(+), 42 deletions(-) diff --git a/txpool/types.go b/txpool/types.go index 1a863905d..e2c3b1fb5 100644 --- a/txpool/types.go +++ b/txpool/types.go @@ -101,11 +101,14 @@ const ParseTransactionErrorPrefix = "parse transaction payload" // ParseTransaction extracts all the information from the transactions's payload (RLP) necessary to build TxSlot // it also performs syntactic validation of the transactions -func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSlot, sender [20]byte, p int, err error) { +func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int, slot *TxSlot, sender []byte) (p int, err error) { if len(payload) == 0 { - return nil, sender, 0, fmt.Errorf("%s: empty rlp", ParseTransactionErrorPrefix) + return 0, fmt.Errorf("%s: empty rlp", ParseTransactionErrorPrefix) } - slot = &TxSlot{rlp: payload} + if len(sender) != 20 { + return 0, fmt.Errorf("%s: expect sender buffer of len 20", ParseTransactionErrorPrefix) + } + slot.rlp = payload // Compute transaction hash ctx.keccak1.Reset() ctx.keccak2.Reset() @@ -113,10 +116,10 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSl // therefore we assign the first returned value of Prefix function (list) to legacy variable dataPos, dataLen, legacy, err := rlp.Prefix(payload, pos) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: size Prefix: %v", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: size Prefix: %v", ParseTransactionErrorPrefix, err) } if dataPos+dataLen != len(payload) { - return nil, sender, 0, fmt.Errorf("%s: transaction must be either 1 list or 1 string", ParseTransactionErrorPrefix) + return 0, fmt.Errorf("%s: transaction must be either 1 list or 1 string", ParseTransactionErrorPrefix) } p = dataPos @@ -125,22 +128,22 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSl if !legacy { txType = int(payload[p]) if _, err = ctx.keccak1.Write(payload[p : p+1]); err != nil { - return nil, sender, 0, fmt.Errorf("%s: computing idHash (hashing type Prefix): %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: computing idHash (hashing type Prefix): %w", ParseTransactionErrorPrefix, err) } if _, err = ctx.keccak2.Write(payload[p : p+1]); err != nil { - return nil, sender, 0, fmt.Errorf("%s: computing signHash (hashing type Prefix): %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: computing signHash (hashing type Prefix): %w", ParseTransactionErrorPrefix, err) } p++ if p >= len(payload) { - return nil, sender, 0, fmt.Errorf("%s: unexpected end of payload after txType", ParseTransactionErrorPrefix) + return 0, fmt.Errorf("%s: unexpected end of payload after txType", ParseTransactionErrorPrefix) } dataPos, dataLen, err = rlp.List(payload, p) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: envelope Prefix: %v", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: envelope Prefix: %v", ParseTransactionErrorPrefix, err) } // Hash the envelope, not the full payload if _, err = ctx.keccak1.Write(payload[p : dataPos+dataLen]); err != nil { - return nil, sender, 0, fmt.Errorf("%s: computing idHash (hashing the envelope): %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: computing idHash (hashing the envelope): %w", ParseTransactionErrorPrefix, err) } p = dataPos } @@ -150,20 +153,20 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSl if !legacy { dataPos, dataLen, err = rlp.String(payload, p) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: chainId len: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: chainId len: %w", ParseTransactionErrorPrefix, err) } p = dataPos + dataLen } // Next follows the nonce, which we need to parse p, slot.nonce, err = rlp.U64(payload, p) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: nonce: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: nonce: %w", ParseTransactionErrorPrefix, err) } // Next follows gas price or tip // Although consensus rules specify that tip can be up to 256 bit long, we narrow it to 64 bit p, slot.tip, err = rlp.U64(payload, p) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: tip: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: tip: %w", ParseTransactionErrorPrefix, err) } // Next follows feeCap, but only for dynamic fee transactions, for legacy transaction, it is // equal to tip @@ -173,21 +176,21 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSl // Although consensus rules specify that feeCap can be up to 256 bit long, we narrow it to 64 bit p, slot.feeCap, err = rlp.U64(payload, p) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: feeCap: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: feeCap: %w", ParseTransactionErrorPrefix, err) } } // Next follows gas p, slot.gas, err = rlp.U64(payload, p) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: gas: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: gas: %w", ParseTransactionErrorPrefix, err) } // Next follows the destrination address (if present) dataPos, dataLen, err = rlp.String(payload, p) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: to len: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: to len: %w", ParseTransactionErrorPrefix, err) } if dataLen != 0 && dataLen != 20 { - return nil, sender, 0, fmt.Errorf("%s: unexpected length of to field: %d", ParseTransactionErrorPrefix, dataLen) + return 0, fmt.Errorf("%s: unexpected length of to field: %d", ParseTransactionErrorPrefix, dataLen) } // Only note if To field is empty or not slot.creation = dataLen == 0 @@ -195,12 +198,12 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSl // Next follows value p, err = rlp.U256(payload, p, &slot.value) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: value: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: value: %w", ParseTransactionErrorPrefix, err) } // Next goes data, but we are only interesting in its length dataPos, dataLen, err = rlp.String(payload, p) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: data len: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: data len: %w", ParseTransactionErrorPrefix, err) } slot.dataLen = dataLen p = dataPos + dataLen @@ -208,42 +211,42 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSl if !legacy { dataPos, dataLen, err = rlp.List(payload, p) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: access list len: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: access list len: %w", ParseTransactionErrorPrefix, err) } tuplePos := dataPos var tupleLen int for tuplePos < dataPos+dataLen { tuplePos, tupleLen, err = rlp.List(payload, tuplePos) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: tuple len: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: tuple len: %w", ParseTransactionErrorPrefix, err) } var addrPos int addrPos, err = rlp.StringOfLen(payload, tuplePos, 20) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: tuple addr len: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: tuple addr len: %w", ParseTransactionErrorPrefix, err) } slot.alAddrCount++ var storagePos, storageLen int storagePos, storageLen, err = rlp.List(payload, addrPos+20) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: storage key list len: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: storage key list len: %w", ParseTransactionErrorPrefix, err) } skeyPos := storagePos for skeyPos < storagePos+storageLen { skeyPos, err = rlp.StringOfLen(payload, skeyPos, 32) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: tuple storage key len: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: tuple storage key len: %w", ParseTransactionErrorPrefix, err) } slot.alStorCount++ skeyPos += 32 } if skeyPos != storagePos+storageLen { - return nil, sender, 0, fmt.Errorf("%s: extraneous space in the tuple after storage key list", ParseTransactionErrorPrefix) + return 0, fmt.Errorf("%s: extraneous space in the tuple after storage key list", ParseTransactionErrorPrefix) } tuplePos += tupleLen } if tuplePos != dataPos+dataLen { - return nil, sender, 0, fmt.Errorf("%s: extraneous space in the access list after all tuples", ParseTransactionErrorPrefix) + return 0, fmt.Errorf("%s: extraneous space in the access list after all tuples", ParseTransactionErrorPrefix) } p = dataPos + dataLen } @@ -256,7 +259,7 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSl if legacy { p, err = rlp.U256(payload, p, &ctx.chainId) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: V: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: V: %w", ParseTransactionErrorPrefix, err) } // Compute chainId from V if ctx.chainId.Eq(&ctx.n27) || ctx.chainId.Eq(&ctx.n28) { @@ -280,27 +283,27 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSl var v uint64 p, v, err = rlp.U64(payload, p) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: V: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: V: %w", ParseTransactionErrorPrefix, err) } if v > 1 { - return nil, sender, 0, fmt.Errorf("%s: V is loo large: %d", ParseTransactionErrorPrefix, v) + return 0, fmt.Errorf("%s: V is loo large: %d", ParseTransactionErrorPrefix, v) } vByte = byte(v) } // Next follows R of the signature p, err = rlp.U256(payload, p, &ctx.r) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: R: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: R: %w", ParseTransactionErrorPrefix, err) } // New follows S of the signature p, err = rlp.U256(payload, p, &ctx.s) if err != nil { - return nil, sender, 0, fmt.Errorf("%s: S: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: S: %w", ParseTransactionErrorPrefix, err) } // For legacy transactions, hash the full payload if legacy { if _, err = ctx.keccak1.Write(payload[pos:p]); err != nil { - return nil, sender, 0, fmt.Errorf("%s: computing idHash: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: computing idHash: %w", ParseTransactionErrorPrefix, err) } } //ctx.keccak1.Sum(slot.idHash[:0]) @@ -310,25 +313,25 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSl if sigHashLen < 56 { ctx.buf[0] = byte(sigHashLen) + 192 if _, err := ctx.keccak2.Write(ctx.buf[:1]); err != nil { - return nil, sender, 0, fmt.Errorf("%s: computing signHash (hashing len Prefix): %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: computing signHash (hashing len Prefix): %w", ParseTransactionErrorPrefix, err) } } else { beLen := (bits.Len(sigHashLen) + 7) / 8 binary.BigEndian.PutUint64(ctx.buf[1:], uint64(sigHashLen)) ctx.buf[8-beLen] = byte(beLen) + 247 if _, err := ctx.keccak2.Write(ctx.buf[8-beLen : 9]); err != nil { - return nil, sender, 0, fmt.Errorf("%s: computing signHash (hashing len Prefix): %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: computing signHash (hashing len Prefix): %w", ParseTransactionErrorPrefix, err) } } if _, err = ctx.keccak2.Write(payload[sigHashPos:sigHashEnd]); err != nil { - return nil, sender, 0, fmt.Errorf("%s: computing signHash: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: computing signHash: %w", ParseTransactionErrorPrefix, err) } if legacy { if chainIdLen > 0 { if chainIdBits <= 7 { ctx.buf[0] = byte(ctx.chainId.Uint64()) if _, err := ctx.keccak2.Write(ctx.buf[:1]); err != nil { - return nil, sender, 0, fmt.Errorf("%s: computing signHash (hashing legacy chainId): %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: computing signHash (hashing legacy chainId): %w", ParseTransactionErrorPrefix, err) } } else { binary.BigEndian.PutUint64(ctx.buf[1:9], ctx.chainId[3]) @@ -337,14 +340,14 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSl binary.BigEndian.PutUint64(ctx.buf[25:33], ctx.chainId[0]) ctx.buf[32-chainIdLen] = 128 + byte(chainIdLen) if _, err = ctx.keccak2.Write(ctx.buf[32-chainIdLen : 33]); err != nil { - return nil, sender, 0, fmt.Errorf("%s: computing signHash (hashing legacy chainId): %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: computing signHash (hashing legacy chainId): %w", ParseTransactionErrorPrefix, err) } } // Encode two zeros ctx.buf[0] = 128 ctx.buf[1] = 128 if _, err := ctx.keccak2.Write(ctx.buf[:2]); err != nil { - return nil, sender, 0, fmt.Errorf("%s: computing signHash (hashing zeros after legacy chainId): %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: computing signHash (hashing zeros after legacy chainId): %w", ParseTransactionErrorPrefix, err) } } } @@ -362,17 +365,17 @@ func (ctx *TxParseContext) ParseTransaction(payload []byte, pos int) (slot *TxSl ctx.sig[64] = vByte // recover sender if _, err = secp256k1.RecoverPubkeyWithContext(ctx.recCtx, ctx.sighash[:], ctx.sig[:], ctx.buf[:0]); err != nil { - return nil, sender, 0, fmt.Errorf("%s: recovering sender from signature: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: recovering sender from signature: %w", ParseTransactionErrorPrefix, err) } //apply keccak to the public key ctx.keccak2.Reset() if _, err = ctx.keccak2.Write(ctx.buf[1:65]); err != nil { - return nil, sender, 0, fmt.Errorf("%s: computing sender from public key: %w", ParseTransactionErrorPrefix, err) + return 0, fmt.Errorf("%s: computing sender from public key: %w", ParseTransactionErrorPrefix, err) } // squeeze the hash of the public key //ctx.keccak2.Sum(ctx.buf[:0]) _, _ = ctx.keccak2.(io.Reader).Read(ctx.buf[:32]) //take last 20 bytes as address copy(sender[:], ctx.buf[12:32]) - return slot, sender, p, nil + return p, nil } diff --git a/txpool/types_test.go b/txpool/types_test.go index 63a2c589f..7306eb87a 100644 --- a/txpool/types_test.go +++ b/txpool/types_test.go @@ -63,7 +63,8 @@ func TestParseTransactionRLP(t *testing.T) { require := require.New(t) var err error payload := decodeHex(tt.payloadStr) - tx, txSender, parseEnd, err := ctx.ParseTransaction(payload, 0) + tx, txSender := &TxSlot{}, [20]byte{} + parseEnd, err := ctx.ParseTransaction(payload, 0, tx, txSender[:]) require.NoError(err) require.Equal(len(payload), parseEnd) if tt.signHashStr != "" { From 7616ab3cde4313876cc9bb11a3a78aa9db2c4c5b Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 5 Aug 2021 16:35:29 +0700 Subject: [PATCH 02/12] addresses type --- txpool/pool.go | 4 ++-- txpool/types.go | 7 ++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 90ec2b64c..685945016 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -295,7 +295,7 @@ func (p *TxPool) OnNewBlock(unwindTxs, minedTxs TxSlots, protocolBaseFee, blockB } func setTxSenderID(senderIDs map[string]uint64, senderInfo map[uint64]*senderInfo, txs TxSlots) { for i := range txs.txs { - id, ok := senderIDs[string(txs.senders[i*20:(i+1)*20])] + id, ok := senderIDs[string(txs.senders.At(i))] if !ok { for i := range senderInfo { //TODO: create field for it? if id < i { @@ -303,7 +303,7 @@ func setTxSenderID(senderIDs map[string]uint64, senderInfo map[uint64]*senderInf } } id++ - senderIDs[string(txs.senders[i*20:(i+1)*20])] = id + senderIDs[string(txs.senders.At(i))] = id } txs.txs[i].senderID = id } diff --git a/txpool/types.go b/txpool/types.go index e2c3b1fb5..5ec45dcb4 100644 --- a/txpool/types.go +++ b/txpool/types.go @@ -37,6 +37,11 @@ type Hashes []byte // flatten list of 32-byte hashes func (h Hashes) At(i int) []byte { return h[i*32 : (i+1)*32] } func (h Hashes) Len() int { return len(h) / 32 } +type Addresses []byte // flatten list of 20-byte addresses + +func (h Addresses) At(i int) []byte { return h[i*20 : (i+1)*20] } +func (h Addresses) Len() int { return len(h) / 20 } + // TxContext is object that is required to parse transactions and turn transaction payload into TxSlot objects // usage of TxContext helps avoid extra memory allocations type TxParseContext struct { @@ -87,7 +92,7 @@ type TxSlot struct { type TxSlots struct { txs []*TxSlot - senders []byte // plain 20-byte addresses + senders Addresses isLocal []bool } From aef904971c0487d10fa3a1fbed44bb8dad7c540a Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 5 Aug 2021 16:45:58 +0700 Subject: [PATCH 03/12] validate TxSlots --- txpool/pool.go | 11 +++++++++++ txpool/pool_fuzz_test.go | 4 +++- txpool/types.go | 10 ++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/txpool/pool.go b/txpool/pool.go index 685945016..b96cd26f6 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -202,6 +202,10 @@ func (p *TxPool) OnNewPeer(peerID PeerID) { p.recentlyConnectedPeers.AddPeer(pee func (p *TxPool) OnNewTxs(newTxs TxSlots) error { p.lock.Lock() defer p.lock.Unlock() + if err := newTxs.Valid(); err != nil { + return err + } + protocolBaseFee, blockBaseFee := p.protocolBaseFee.Load(), p.blockBaseFee.Load() if protocolBaseFee == 0 || blockBaseFee == 0 { return fmt.Errorf("non-zero base fee") @@ -267,6 +271,13 @@ func onNewTxs(senderInfo map[uint64]*senderInfo, newTxs TxSlots, protocolBaseFee func (p *TxPool) OnNewBlock(unwindTxs, minedTxs TxSlots, protocolBaseFee, blockBaseFee uint64) error { p.lock.Lock() defer p.lock.Unlock() + if err := unwindTxs.Valid(); err != nil { + return err + } + if err := minedTxs.Valid(); err != nil { + return err + } + p.protocolBaseFee.Store(protocolBaseFee) p.blockBaseFee.Store(blockBaseFee) diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index fb1ebd5fe..425304bc6 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -217,6 +217,8 @@ func FuzzOnNewBlocks6(f *testing.F) { t.Skip() } assert := assert.New(t) + err := txs.Valid() + assert.NoError(err) ch := make(chan Hashes, 100) pool := New(ch) @@ -333,7 +335,7 @@ func FuzzOnNewBlocks6(f *testing.F) { // go to first fork unwindTxs, minedTxs1, p2pReceived, minedTxs2 := splitDataset(txs) - err := pool.OnNewBlock(unwindTxs, minedTxs1, protocolBaseFee, blockBaseFee) + err = pool.OnNewBlock(unwindTxs, minedTxs1, protocolBaseFee, blockBaseFee) assert.NoError(err) check(unwindTxs, minedTxs1) select { diff --git a/txpool/types.go b/txpool/types.go index 5ec45dcb4..cb4069b46 100644 --- a/txpool/types.go +++ b/txpool/types.go @@ -96,6 +96,16 @@ type TxSlots struct { isLocal []bool } +func (s TxSlots) Valid() error { + if len(s.txs) != len(s.isLocal) { + return fmt.Errorf("TxSlots: expect equal len of isLocal=%d and txs=%d", len(s.isLocal), len(s.txs)) + } + if len(s.txs) != s.senders.Len() { + return fmt.Errorf("TxSlots: expect equal len of senders=%d and txs=%d", s.senders.Len(), len(s.txs)) + } + return nil +} + const ( LegacyTxType int = 0 AccessListTxType int = 1 From 730cd96e9bf7891f3f7bb896cc2e7cc2eb6c66a2 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 5 Aug 2021 18:38:37 +0700 Subject: [PATCH 04/12] help fuzzer --- txpool/pool_fuzz_test.go | 135 +++++++++++++++++++++------------------ 1 file changed, 72 insertions(+), 63 deletions(-) diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index 425304bc6..3780cd829 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -4,7 +4,9 @@ package txpool import ( + "bytes" "encoding/binary" + "fmt" "testing" "github.com/google/btree" @@ -83,7 +85,7 @@ func FuzzTwoQueue(f *testing.F) { } func u64Slice(in []byte) ([]uint64, bool) { - if len(in)%8 != 0 { + if len(in) < 8 { return nil, false } res := make([]uint64, len(in)/8) @@ -93,7 +95,7 @@ func u64Slice(in []byte) ([]uint64, bool) { return res, true } func u256Slice(in []byte) ([]uint256.Int, bool) { - if len(in)%32 != 0 { + if len(in) < 32 { return nil, false } res := make([]uint256.Int, len(in)/32) @@ -103,32 +105,26 @@ func u256Slice(in []byte) ([]uint256.Int, bool) { return res, true } -func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawSender, rawSenderNonce, rawSenderBalance []byte) (sendersInfo map[uint64]*senderInfo, senderIDs map[string]uint64, txs TxSlots, ok bool) { - if len(rawTxNonce) < 8 || len(rawValues) < 32 || len(rawTips) < 8 || len(rawSender) < 20 || len(rawSenderNonce) < 8 || len(rawSenderBalance) < 32 { - return nil, nil, txs, false - } - if len(rawTxNonce) != len(rawTips) { - return nil, nil, txs, false - } - if len(rawTxNonce)*32/8 != len(rawValues) { - return nil, nil, txs, false - } - if len(rawSenderNonce)*20/8 != len(rawSender) { - return nil, nil, txs, false - } - if len(rawSenderNonce)*32/8 != len(rawSenderBalance) { - return nil, nil, txs, false - } - senderNonce, ok := u64Slice(rawSenderNonce) - if !ok { - return nil, nil, txs, false - } - for i := 0; i < len(senderNonce); i++ { - if senderNonce[i] == 0 { - return nil, nil, txs, false +func parseSenders(rawSenders []byte) (senders Addresses, nonces []uint64, balances []uint256.Int) { + for i := 0; i < len(rawSenders)-(20+8+32-1); i += 20 + 8 + 32 { + senders = append(senders, rawSenders[i:i+20]...) + nonce := binary.BigEndian.Uint64(rawSenders[i+20:]) + if nonce == 0 { + nonce = 1 } + nonces = append(nonces, nonce) + balance := uint256.NewInt(0) + balance.SetBytes(rawSenders[i+20+8 : i+20+8+32]) + balances = append(balances, *balance) } + return +} +func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawSender []byte) (sendersInfo map[uint64]*senderInfo, senderIDs map[string]uint64, txs TxSlots, ok bool) { + if len(rawTxNonce) < 8 || len(rawValues) < 32 || len(rawTips) < 8 || len(rawSender) < 20+8+32 { + return nil, nil, txs, false + } + senders, senderNonce, senderBalance := parseSenders(rawSender) txNonce, ok := u64Slice(rawTxNonce) if !ok { return nil, nil, txs, false @@ -141,27 +137,21 @@ func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawSender, rawSenderNonc if !ok { return nil, nil, txs, false } - senderBalance, ok := u256Slice(rawSenderBalance) - if !ok { - return nil, nil, txs, false - } sendersInfo = map[uint64]*senderInfo{} senderIDs = map[string]uint64{} for i := 0; i < len(senderNonce); i++ { senderID := uint64(i + 1) //non-zero expected - sendersInfo[senderID] = newSenderInfo(senderNonce[i], senderBalance[i]) - senderIDs[string(rawSender[i*20:(i+1)*20])] = senderID + sendersInfo[senderID] = newSenderInfo(senderNonce[i], senderBalance[i%len(senderBalance)]) + senderIDs[string(senders.At(i%senders.Len()))] = senderID } - sendersAmount := len(sendersInfo) for i := range txNonce { txs.txs = append(txs.txs, &TxSlot{ nonce: txNonce[i], - value: values[i], - tip: tips[i], + value: values[i%len(values)], + tip: tips[i%len(tips)], }) - senderN := i % sendersAmount - txs.senders = append(txs.senders, rawSender[senderN*20:(senderN+1)*20]...) + txs.senders = append(txs.senders, senders.At(i%senders.Len())...) txs.isLocal = append(txs.isLocal, false) } @@ -197,25 +187,26 @@ func splitDataset(in TxSlots) (TxSlots, TxSlots, TxSlots, TxSlots) { return p1, p2, p3, p4 } -func FuzzOnNewBlocks6(f *testing.F) { +func FuzzOnNewBlocks7(f *testing.F) { var u64 = [8 * 4]byte{1} - var u256 = [32 * 4]byte{1} - f.Add(u64[:], u64[:], u64[:], u64[:], u256[:], u256[:], 123, 456) - f.Add(u64[:], u64[:], u64[:], u64[:], u256[:], u256[:], 78, 100) - f.Add(u64[:], u64[:], u64[:], u64[:], u256[:], u256[:], 100_000, 101_000) - f.Fuzz(func(t *testing.T, txNonce, values, tips, sender, senderNonce, senderBalance []byte, protocolBaseFee, blockBaseFee uint64) { + var sender = [20 + 8 + 32]byte{1} + f.Add(u64[:], u64[:], u64[:], sender[:], 123, 456) + f.Add(u64[:], u64[:], u64[:], sender[:], 78, 100) + f.Add(u64[:], u64[:], u64[:], sender[:], 100_000, 101_000) + f.Fuzz(func(t *testing.T, txNonce, values, tips, sender []byte, protocolBaseFee, blockBaseFee uint64) { t.Parallel() if protocolBaseFee == 0 || blockBaseFee == 0 { t.Skip() } - if len(txNonce)%(8*4) != 0 || len(txNonce) != len(tips) { + if len(sender) < 20+8+32 { t.Skip() } - senders, senderIDs, txs, ok := poolsFromFuzzBytes(txNonce, values, tips, sender, senderNonce, senderBalance) + senders, senderIDs, txs, ok := poolsFromFuzzBytes(txNonce, values, tips, sender) if !ok { t.Skip() } + assert := assert.New(t) err := txs.Valid() assert.NoError(err) @@ -226,6 +217,9 @@ func FuzzOnNewBlocks6(f *testing.F) { pool.senderIDs = senderIDs check := func(unwindTxs, minedTxs TxSlots) { pending, baseFee, queued := pool.pending, pool.baseFee, pool.queued + if pending.Len() > 0 || baseFee.Len() > 0 || queued.Len() > 0 { + fmt.Printf("len: %d,%d,%d\n", pending.Len(), baseFee.Len(), queued.Len()) + } best, worst := pending.Best(), pending.Worst() assert.LessOrEqual(pending.Len(), PendingSubPoolLimit) @@ -333,39 +327,54 @@ func FuzzOnNewBlocks6(f *testing.F) { } } + checkNotify := func(unwindTxs, minedTxs TxSlots) { + select { + case newHashes := <-ch: + //assert.Equal(len(unwindTxs.txs), newHashes.Len()) + assert.Greater(len(newHashes), 0) + for i := 0; i < newHashes.Len(); i++ { + foundInUnwind := false + foundInMined := false + newHash := newHashes.At(i) + for j := range unwindTxs.txs { + if bytes.Equal(unwindTxs.txs[j].idHash[:], newHash) { + foundInUnwind = true + break + } + } + for j := range minedTxs.txs { + if bytes.Equal(unwindTxs.txs[j].idHash[:], newHash) { + foundInMined = true + break + } + } + assert.True(foundInUnwind) + assert.False(foundInMined) + } + default: + + //TODO: no notifications - means pools must be empty (unchanged) + } + } + // go to first fork unwindTxs, minedTxs1, p2pReceived, minedTxs2 := splitDataset(txs) err = pool.OnNewBlock(unwindTxs, minedTxs1, protocolBaseFee, blockBaseFee) assert.NoError(err) check(unwindTxs, minedTxs1) - select { - case newHashes := <-ch: - assert.Greater(len(newHashes), 0) - //TODO: all notified hashes must be in given list - default: - //TODO: no notifications - means pools must be empty (unchanged) - } - //assert.Equal(len(unwindTxs.txs), newHashes.Len()) + checkNotify(unwindTxs, minedTxs1) // unwind everything and switch to new fork (need unwind mined now) err = pool.OnNewBlock(minedTxs1, minedTxs2, protocolBaseFee, blockBaseFee) assert.NoError(err) check(minedTxs1, minedTxs2) - select { - case newHashes := <-ch: - assert.Greater(len(newHashes), 0) - default: - } + checkNotify(minedTxs1, minedTxs2) // add some remote txs from p2p err = pool.OnNewTxs(p2pReceived) assert.NoError(err) - check(TxSlots{}, p2pReceived) - select { - case newHashes := <-ch: - assert.Greater(len(newHashes), 0) - default: - } + check(p2pReceived, TxSlots{}) + checkNotify(p2pReceived, TxSlots{}) }) } From 98587462250bf6bc0c71b0c88f32aa9764db56d5 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 5 Aug 2021 19:41:18 +0700 Subject: [PATCH 05/12] 111 --- txpool/pool.go | 4 +-- txpool/pool_fuzz_test.go | 67 ++++++++++++++++++++++++++++++---------- 2 files changed, 52 insertions(+), 19 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index b96cd26f6..1cc2f8358 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -372,8 +372,8 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [ queued.EnforceInvariants() promote(pending, baseFee, queued, func(i *MetaTx) { - //fmt.Printf("del1 nonce: %d, %t\n", i.Tx.senderID, senderInfo[i.Tx.senderID].nonce < i.Tx.nonce) - //fmt.Printf("del2 balance: %x,%x,%x\n", i.Tx.value, i.Tx.tip, senderInfo[i.Tx.senderID].balance) + fmt.Printf("del1 nonce: %d, %d,%d\n", i.Tx.senderID, senderInfo[i.Tx.senderID].nonce, i.Tx.nonce) + fmt.Printf("del2 balance: %d,%d,%d\n", i.Tx.value.Uint64(), i.Tx.tip, senderInfo[i.Tx.senderID].balance.Uint64()) delete(byHash, string(i.Tx.idHash[:])) senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i}) if i.SubPool&IsLocal != 0 { diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index 3780cd829..9b80ae918 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -94,42 +94,75 @@ func u64Slice(in []byte) ([]uint64, bool) { } return res, true } -func u256Slice(in []byte) ([]uint256.Int, bool) { - if len(in) < 32 { +func u8Slice(in []byte) ([]uint64, bool) { + if len(in) < 1 { return nil, false } - res := make([]uint256.Int, len(in)/32) + res := make([]uint64, len(in)) for i := 0; i < len(res); i++ { - res[i].SetBytes(in[i*32 : (i+1)*32]) + res[i] = uint64(in[i]) + } + return res, true +} +func u16Slice(in []byte) ([]uint64, bool) { + if len(in) < 2 { + return nil, false + } + res := make([]uint64, len(in)/2) + for i := 0; i < len(res); i++ { + res[i] = uint64(binary.BigEndian.Uint16(in[i*2:])) + } + return res, true +} +func u256Slice(in []byte) ([]uint256.Int, bool) { + if len(in) < 1 { + return nil, false + } + res := make([]uint256.Int, len(in)) + for i := 0; i < len(res); i++ { + res[i].SetUint64(uint64(in[i])) } return res, true } -func parseSenders(rawSenders []byte) (senders Addresses, nonces []uint64, balances []uint256.Int) { - for i := 0; i < len(rawSenders)-(20+8+32-1); i += 20 + 8 + 32 { - senders = append(senders, rawSenders[i:i+20]...) - nonce := binary.BigEndian.Uint64(rawSenders[i+20:]) +func parseSenders(in []byte) (senders Addresses, nonces []uint64, balances []uint256.Int) { + zeroes := [19]byte{} + for i := 0; i < len(in)-(1+1+1-1); i += 1 + 1 + 1 { + senders = append(senders, zeroes[:]...) + senders = append(senders, in[i:i+1]...) + nonce := uint64(in[i+1]) if nonce == 0 { nonce = 1 } nonces = append(nonces, nonce) - balance := uint256.NewInt(0) - balance.SetBytes(rawSenders[i+20+8 : i+20+8+32]) - balances = append(balances, *balance) + balances = append(balances, *uint256.NewInt(uint64(in[i+1+1]))) + } + return +} + +func parseTxs(in []byte) (nonces, tips []uint64, values []uint256.Int) { + for i := 0; i < len(in)-(1+1+1-1); i += 1 + 1 + 1 { + nonce := uint64(in[i]) + if nonce == 0 { + nonce = 1 + } + nonces = append(nonces, nonce) + tips = append(tips, uint64(in[i+1])) + values = append(values, *uint256.NewInt(uint64(in[i+1+1]))) } return } func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawSender []byte) (sendersInfo map[uint64]*senderInfo, senderIDs map[string]uint64, txs TxSlots, ok bool) { - if len(rawTxNonce) < 8 || len(rawValues) < 32 || len(rawTips) < 8 || len(rawSender) < 20+8+32 { + if len(rawTxNonce) < 1 || len(rawValues) < 1 || len(rawTips) < 1 || len(rawSender) < 1+1+1 { return nil, nil, txs, false } senders, senderNonce, senderBalance := parseSenders(rawSender) - txNonce, ok := u64Slice(rawTxNonce) + txNonce, ok := u8Slice(rawTxNonce) if !ok { return nil, nil, txs, false } - tips, ok := u64Slice(rawTips) + tips, ok := u8Slice(rawTips) if !ok { return nil, nil, txs, false } @@ -188,8 +221,8 @@ func splitDataset(in TxSlots) (TxSlots, TxSlots, TxSlots, TxSlots) { } func FuzzOnNewBlocks7(f *testing.F) { - var u64 = [8 * 4]byte{1} - var sender = [20 + 8 + 32]byte{1} + var u64 = [1 * 4]byte{1} + var sender = [1 + 1 + 1]byte{1} f.Add(u64[:], u64[:], u64[:], sender[:], 123, 456) f.Add(u64[:], u64[:], u64[:], sender[:], 78, 100) f.Add(u64[:], u64[:], u64[:], sender[:], 100_000, 101_000) @@ -198,7 +231,7 @@ func FuzzOnNewBlocks7(f *testing.F) { if protocolBaseFee == 0 || blockBaseFee == 0 { t.Skip() } - if len(sender) < 20+8+32 { + if len(sender) < 1+1+1 { t.Skip() } From 3eb8a99b598f0db878348d4543592216504910f8 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 5 Aug 2021 19:42:02 +0700 Subject: [PATCH 06/12] 111 --- txpool/pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 1cc2f8358..ce21a7b66 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -372,8 +372,8 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [ queued.EnforceInvariants() promote(pending, baseFee, queued, func(i *MetaTx) { - fmt.Printf("del1 nonce: %d, %d,%d\n", i.Tx.senderID, senderInfo[i.Tx.senderID].nonce, i.Tx.nonce) - fmt.Printf("del2 balance: %d,%d,%d\n", i.Tx.value.Uint64(), i.Tx.tip, senderInfo[i.Tx.senderID].balance.Uint64()) + //fmt.Printf("del1 nonce: %d, %d,%d\n", i.Tx.senderID, senderInfo[i.Tx.senderID].nonce, i.Tx.nonce) + //fmt.Printf("del2 balance: %d,%d,%d\n", i.Tx.value.Uint64(), i.Tx.tip, senderInfo[i.Tx.senderID].balance.Uint64()) delete(byHash, string(i.Tx.idHash[:])) senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i}) if i.SubPool&IsLocal != 0 { From ee64806c90b779b62b41b2cdcbc224a989fa2b23 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 5 Aug 2021 20:39:08 +0700 Subject: [PATCH 07/12] finally tests catch something --- txpool/pool.go | 8 +++--- txpool/pool_fuzz_test.go | 59 ++++++++++++++++++++++++---------------- 2 files changed, 40 insertions(+), 27 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index ce21a7b66..146cd44cd 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -465,7 +465,7 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, blockBaseFee uint64) { // this transaction will never be included into this particular chain. it.MetaTx.SubPool &^= EnoughFeeCapProtocol if it.MetaTx.Tx.feeCap >= protocolBaseFee { - it.MetaTx.SubPool &= EnoughFeeCapProtocol + it.MetaTx.SubPool |= EnoughFeeCapProtocol } // 2. Absence of nonce gaps. Set to 1 for transactions whose nonce is N, state nonce for @@ -473,7 +473,7 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, blockBaseFee uint64) { // sender. Set to 0 is the transaction's nonce is divided from the state nonce by one or more nonce gaps. it.MetaTx.SubPool &^= NoNonceGaps if prevNonce == -1 || uint64(prevNonce)+1 == it.MetaTx.Tx.nonce { - it.MetaTx.SubPool &= NoNonceGaps + it.MetaTx.SubPool |= NoNonceGaps } prevNonce = int(it.Tx.nonce) @@ -485,7 +485,7 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, blockBaseFee uint64) { // transactions will be able to pay for gas. it.MetaTx.SubPool &^= EnoughBalance if sender.balance.Gt(accumulatedSenderSpent) || sender.balance.Eq(accumulatedSenderSpent) { - it.MetaTx.SubPool &= EnoughBalance + it.MetaTx.SubPool |= EnoughBalance } accumulatedSenderSpent.Add(accumulatedSenderSpent, needBalance) // already deleted all transactions with nonce <= sender.nonce @@ -493,7 +493,7 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, blockBaseFee uint64) { // baseFee of the currently pending block. Set to 0 otherwise. it.MetaTx.SubPool &^= EnoughFeeCapBlock if it.MetaTx.Tx.feeCap >= blockBaseFee { - it.MetaTx.SubPool &= EnoughFeeCapBlock + it.MetaTx.SubPool |= EnoughFeeCapBlock } // 5. Local transaction. Set to 1 if transaction is local. diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index 9b80ae918..fcb666101 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -100,7 +100,7 @@ func u8Slice(in []byte) ([]uint64, bool) { } res := make([]uint64, len(in)) for i := 0; i < len(res); i++ { - res[i] = uint64(in[i]) + res[i] = uint64(in[i] % 32) } return res, true } @@ -120,17 +120,17 @@ func u256Slice(in []byte) ([]uint256.Int, bool) { } res := make([]uint256.Int, len(in)) for i := 0; i < len(res); i++ { - res[i].SetUint64(uint64(in[i])) + res[i].SetUint64(uint64(in[i] % 32)) } return res, true } func parseSenders(in []byte) (senders Addresses, nonces []uint64, balances []uint256.Int) { - zeroes := [19]byte{} + zeroes := [20]byte{} for i := 0; i < len(in)-(1+1+1-1); i += 1 + 1 + 1 { + zeroes[19] = in[i] % 8 senders = append(senders, zeroes[:]...) - senders = append(senders, in[i:i+1]...) - nonce := uint64(in[i+1]) + nonce := uint64(in[i+1] % 8) if nonce == 0 { nonce = 1 } @@ -153,8 +153,8 @@ func parseTxs(in []byte) (nonces, tips []uint64, values []uint256.Int) { return } -func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawSender []byte) (sendersInfo map[uint64]*senderInfo, senderIDs map[string]uint64, txs TxSlots, ok bool) { - if len(rawTxNonce) < 1 || len(rawValues) < 1 || len(rawTips) < 1 || len(rawSender) < 1+1+1 { +func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawFeeCap, rawSender []byte) (sendersInfo map[uint64]*senderInfo, senderIDs map[string]uint64, txs TxSlots, ok bool) { + if len(rawTxNonce) < 1 || len(rawValues) < 1 || len(rawTips) < 1 || len(rawFeeCap) < 1 || len(rawSender) < 1+1+1 { return nil, nil, txs, false } senders, senderNonce, senderBalance := parseSenders(rawSender) @@ -162,6 +162,10 @@ func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawSender []byte) (sende if !ok { return nil, nil, txs, false } + feeCap, ok := u8Slice(rawFeeCap) + if !ok { + return nil, nil, txs, false + } tips, ok := u8Slice(rawTips) if !ok { return nil, nil, txs, false @@ -180,9 +184,10 @@ func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawSender []byte) (sende } for i := range txNonce { txs.txs = append(txs.txs, &TxSlot{ - nonce: txNonce[i], - value: values[i%len(values)], - tip: tips[i%len(tips)], + nonce: txNonce[i], + value: values[i%len(values)], + tip: tips[i%len(tips)], + feeCap: feeCap[i%len(feeCap)], }) txs.senders = append(txs.senders, senders.At(i%senders.Len())...) txs.isLocal = append(txs.isLocal, false) @@ -220,14 +225,19 @@ func splitDataset(in TxSlots) (TxSlots, TxSlots, TxSlots, TxSlots) { return p1, p2, p3, p4 } -func FuzzOnNewBlocks7(f *testing.F) { +func FuzzOnNewBlocks10(f *testing.F) { var u64 = [1 * 4]byte{1} var sender = [1 + 1 + 1]byte{1} - f.Add(u64[:], u64[:], u64[:], sender[:], 123, 456) - f.Add(u64[:], u64[:], u64[:], sender[:], 78, 100) - f.Add(u64[:], u64[:], u64[:], sender[:], 100_000, 101_000) - f.Fuzz(func(t *testing.T, txNonce, values, tips, sender []byte, protocolBaseFee, blockBaseFee uint64) { + f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 1, 2) + f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 3, 4) + f.Add(u64[:], u64[:], u64[:], u64[:], sender[:], 10, 12) + f.Fuzz(func(t *testing.T, txNonce, values, tips, feeCap, sender []byte, protocolBaseFee1, blockBaseFee1 uint8) { t.Parallel() + if protocolBaseFee1 > 4 || blockBaseFee1 > 4 { + t.Skip() + } + protocolBaseFee, blockBaseFee := uint64(protocolBaseFee1), uint64(blockBaseFee1) + protocolBaseFeeU256, blockBaseFeeU256 := uint256.NewInt(protocolBaseFee), uint256.NewInt(blockBaseFee) if protocolBaseFee == 0 || blockBaseFee == 0 { t.Skip() } @@ -235,7 +245,7 @@ func FuzzOnNewBlocks7(f *testing.F) { t.Skip() } - senders, senderIDs, txs, ok := poolsFromFuzzBytes(txNonce, values, tips, sender) + senders, senderIDs, txs, ok := poolsFromFuzzBytes(txNonce, values, tips, feeCap, sender) if !ok { t.Skip() } @@ -270,8 +280,9 @@ func FuzzOnNewBlocks7(f *testing.F) { need := uint256.NewInt(i.gas) need = need.Mul(need, uint256.NewInt(i.feeCap)) - assert.GreaterOrEqual(uint256.NewInt(protocolBaseFee), need.Add(need, &i.value)) - assert.GreaterOrEqual(uint256.NewInt(blockBaseFee), need.Add(need, &i.value)) + need = need.Add(need, &i.value) + assert.True(need.Lt(protocolBaseFeeU256) || need.Eq(protocolBaseFeeU256)) + assert.True(need.Lt(blockBaseFeeU256) || need.Eq(blockBaseFeeU256)) // side data structures must have all txs assert.True(senders[i.senderID].txNonce2Tx.Has(&nonce2TxItem{tx})) @@ -300,14 +311,15 @@ func FuzzOnNewBlocks7(f *testing.F) { iterateSubPoolUnordered(baseFee, func(tx *MetaTx) { i := tx.Tx assert.GreaterOrEqual(i.nonce, senders[i.senderID].nonce) - if tx.SubPool&EnoughBalance > 0 { + if tx.SubPool&EnoughBalance != 0 { assert.True(tx.SenderHasEnoughBalance) } need := uint256.NewInt(i.gas) need = need.Mul(need, uint256.NewInt(i.feeCap)) - assert.GreaterOrEqual(uint256.NewInt(protocolBaseFee), need.Add(need, &i.value)) - assert.GreaterOrEqual(uint256.NewInt(blockBaseFee), need.Add(need, &i.value)) + need = need.Add(need, &i.value) + assert.True(need.Lt(protocolBaseFeeU256) || need.Eq(protocolBaseFeeU256)) + assert.True(need.Lt(blockBaseFeeU256) || need.Eq(blockBaseFeeU256)) assert.True(senders[i.senderID].txNonce2Tx.Has(&nonce2TxItem{tx})) _, ok = pool.byHash[string(i.idHash[:])] @@ -330,8 +342,9 @@ func FuzzOnNewBlocks7(f *testing.F) { need := uint256.NewInt(i.gas) need = need.Mul(need, uint256.NewInt(i.feeCap)) - assert.GreaterOrEqual(uint256.NewInt(protocolBaseFee), need.Add(need, &i.value)) - assert.GreaterOrEqual(uint256.NewInt(blockBaseFee), need.Add(need, &i.value)) + need = need.Add(need, &i.value) + assert.True(need.Lt(protocolBaseFeeU256) || need.Eq(protocolBaseFeeU256)) + assert.True(need.Lt(blockBaseFeeU256) || need.Eq(blockBaseFeeU256)) assert.True(senders[i.senderID].txNonce2Tx.Has(&nonce2TxItem{tx})) _, ok = pool.byHash[string(i.idHash[:])] From e965c0981d24cf20151b5e7165166ae8bbc67744 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Thu, 5 Aug 2021 22:48:56 +0700 Subject: [PATCH 08/12] fixes --- txpool/pool.go | 5 +++-- txpool/pool_fuzz_test.go | 8 +++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 146cd44cd..78bb33a2f 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -358,7 +358,7 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [ //TODO: also check if sender is in list of local-senders i.SubPool |= IsLocal } - delete(byHash, string(i.Tx.idHash[:])) + byHash[string(i.Tx.idHash[:])] = i }) } @@ -483,11 +483,12 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, blockBaseFee uint64) { // nonces N+1 ... M is no more than B. Set to 0 otherwise. In other words, this bit is // set if there is currently a guarantee that the transaction and all its required prior // transactions will be able to pay for gas. + accumulatedSenderSpent = accumulatedSenderSpent.Add(accumulatedSenderSpent, needBalance) // already deleted all transactions with nonce <= sender.nonce it.MetaTx.SubPool &^= EnoughBalance if sender.balance.Gt(accumulatedSenderSpent) || sender.balance.Eq(accumulatedSenderSpent) { it.MetaTx.SubPool |= EnoughBalance + fmt.Printf("a3: %b,%d,%d,%d,%t\n", it.MetaTx.SubPool, accumulatedSenderSpent.Uint64(), sender.balance.Uint64(), needBalance.Uint64(), it.MetaTx.SenderHasEnoughBalance) } - accumulatedSenderSpent.Add(accumulatedSenderSpent, needBalance) // already deleted all transactions with nonce <= sender.nonce // 4. Dynamic fee requirement. Set to 1 if feeCap of the transaction is no less than // baseFee of the currently pending block. Set to 0 otherwise. diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index fcb666101..7359e5aed 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -12,6 +12,7 @@ import ( "github.com/google/btree" "github.com/holiman/uint256" "github.com/stretchr/testify/assert" + "go.uber.org/atomic" ) // https://blog.golang.org/fuzz-beta @@ -153,6 +154,8 @@ func parseTxs(in []byte) (nonces, tips []uint64, values []uint256.Int) { return } +var txId atomic.Uint64 + func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawFeeCap, rawSender []byte) (sendersInfo map[uint64]*senderInfo, senderIDs map[string]uint64, txs TxSlots, ok bool) { if len(rawTxNonce) < 1 || len(rawValues) < 1 || len(rawTips) < 1 || len(rawFeeCap) < 1 || len(rawSender) < 1+1+1 { return nil, nil, txs, false @@ -183,6 +186,7 @@ func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawFeeCap, rawSender []b senderIDs[string(senders.At(i%senders.Len()))] = senderID } for i := range txNonce { + txId.Inc() txs.txs = append(txs.txs, &TxSlot{ nonce: txNonce[i], value: values[i%len(values)], @@ -191,6 +195,7 @@ func poolsFromFuzzBytes(rawTxNonce, rawValues, rawTips, rawFeeCap, rawSender []b }) txs.senders = append(txs.senders, senders.At(i%senders.Len())...) txs.isLocal = append(txs.isLocal, false) + binary.BigEndian.PutUint64(txs.txs[i].idHash[:], txId.Load()) } return sendersInfo, senderIDs, txs, true @@ -312,6 +317,7 @@ func FuzzOnNewBlocks10(f *testing.F) { i := tx.Tx assert.GreaterOrEqual(i.nonce, senders[i.senderID].nonce) if tx.SubPool&EnoughBalance != 0 { + assert.True(tx.SenderHasEnoughBalance) } @@ -389,7 +395,7 @@ func FuzzOnNewBlocks10(f *testing.F) { } } for j := range minedTxs.txs { - if bytes.Equal(unwindTxs.txs[j].idHash[:], newHash) { + if bytes.Equal(minedTxs.txs[j].idHash[:], newHash) { foundInMined = true break } From 922e21e8a1d8c543deb564eb28f7bf3309d55859 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 6 Aug 2021 10:36:44 +0700 Subject: [PATCH 09/12] fixes for many bugs --- txpool/pool.go | 34 +++++------ txpool/pool_fuzz_test.go | 126 ++++++++++++++++++++------------------- 2 files changed, 82 insertions(+), 78 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 78bb33a2f..f8e479c4d 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -245,7 +245,7 @@ func onNewTxs(senderInfo map[uint64]*senderInfo, newTxs TxSlots, protocolBaseFee //TODO: also check if sender is in list of local-senders i.SubPool |= IsLocal } - delete(byHash, string(i.Tx.idHash[:])) + byHash[string(i.Tx.idHash[:])] = i }) for i := range senderInfo { @@ -350,17 +350,14 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [ // they effective lose their priority over the "remote" transactions. In order to prevent that, // somehow the fact that certain transactions were local, needs to be remembered for some // time (up to some "immutability threshold"). - if len(unwindTxs.txs) > 0 { - //TODO: restore isLocal flag in unwindTxs - unsafeAddToPool(senderInfo, unwindTxs, pending, func(i *MetaTx) { - //fmt.Printf("add: %d,%d\n", i.Tx.senderID, i.Tx.nonce) - if _, ok := localsHistory.Get(i.Tx.idHash); ok { - //TODO: also check if sender is in list of local-senders - i.SubPool |= IsLocal - } - byHash[string(i.Tx.idHash[:])] = i - }) - } + unsafeAddToPool(senderInfo, unwindTxs, pending, func(i *MetaTx) { + //fmt.Printf("add: %d,%d\n", i.Tx.senderID, i.Tx.nonce) + if _, ok := localsHistory.Get(i.Tx.idHash); ok { + //TODO: also check if sender is in list of local-senders + i.SubPool |= IsLocal + } + byHash[string(i.Tx.idHash[:])] = i + }) for i := range senderInfo { // TODO: aggregate changed senders before call this func @@ -442,6 +439,9 @@ func unsafeAddToPool(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, to *S continue } } + //if sender.nonce > tx.nonce { + // continue + //} beforeAdd(mt) sender.txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{mt}) to.UnsafeAdd(mt, PendingSubPool) @@ -449,7 +449,7 @@ func unsafeAddToPool(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, to *S } func onSenderChange(sender *senderInfo, protocolBaseFee, blockBaseFee uint64) { - prevNonce := -1 + prevNonce := sender.nonce accumulatedSenderSpent := uint256.NewInt(0) sender.txNonce2Tx.Ascend(func(i btree.Item) bool { it := i.(*nonce2TxItem) @@ -459,12 +459,13 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, blockBaseFee uint64) { needBalance.Mul(uint256.NewInt(it.MetaTx.Tx.gas), uint256.NewInt(it.MetaTx.Tx.feeCap)) needBalance.Add(&it.MetaTx.NeedBalance, &it.MetaTx.Tx.value) it.MetaTx.SenderHasEnoughBalance = sender.balance.Gt(needBalance) || sender.balance.Eq(needBalance) - // 1. Minimum fee requirement. Set to 1 if feeCap of the transaction is no less than in-protocol // parameter of minimal base fee. Set to 0 if feeCap is less than minimum base fee, which means // this transaction will never be included into this particular chain. it.MetaTx.SubPool &^= EnoughFeeCapProtocol if it.MetaTx.Tx.feeCap >= protocolBaseFee { + //fmt.Printf("alex1: %d,%d,%d,%d\n", it.MetaTx.NeedBalance.Uint64(), it.MetaTx.Tx.gas, it.MetaTx.Tx.feeCap, it.MetaTx.Tx.value.Uint64()) + //fmt.Printf("alex2: %d,%t\n", sender.balance.Uint64(), it.MetaTx.SenderHasEnoughBalance) it.MetaTx.SubPool |= EnoughFeeCapProtocol } @@ -472,10 +473,10 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, blockBaseFee uint64) { // the sender is M, and there are transactions for all nonces between M and N from the same // sender. Set to 0 is the transaction's nonce is divided from the state nonce by one or more nonce gaps. it.MetaTx.SubPool &^= NoNonceGaps - if prevNonce == -1 || uint64(prevNonce)+1 == it.MetaTx.Tx.nonce { + if uint64(prevNonce)+1 == it.MetaTx.Tx.nonce { it.MetaTx.SubPool |= NoNonceGaps + prevNonce = it.Tx.nonce } - prevNonce = int(it.Tx.nonce) // 3. Sufficient balance for gas. Set to 1 if the balance of sender's account in the // state is B, nonce of the sender in the state is M, nonce of the transaction is N, and the @@ -487,7 +488,6 @@ func onSenderChange(sender *senderInfo, protocolBaseFee, blockBaseFee uint64) { it.MetaTx.SubPool &^= EnoughBalance if sender.balance.Gt(accumulatedSenderSpent) || sender.balance.Eq(accumulatedSenderSpent) { it.MetaTx.SubPool |= EnoughBalance - fmt.Printf("a3: %b,%d,%d,%d,%t\n", it.MetaTx.SubPool, accumulatedSenderSpent.Uint64(), sender.balance.Uint64(), needBalance.Uint64(), it.MetaTx.SenderHasEnoughBalance) } // 4. Dynamic fee requirement. Set to 1 if feeCap of the transaction is no less than diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index 7359e5aed..0993c0a79 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -6,7 +6,6 @@ package txpool import ( "bytes" "encoding/binary" - "fmt" "testing" "github.com/google/btree" @@ -242,7 +241,7 @@ func FuzzOnNewBlocks10(f *testing.F) { t.Skip() } protocolBaseFee, blockBaseFee := uint64(protocolBaseFee1), uint64(blockBaseFee1) - protocolBaseFeeU256, blockBaseFeeU256 := uint256.NewInt(protocolBaseFee), uint256.NewInt(blockBaseFee) + //protocolBaseFeeU256, blockBaseFeeU256 := uint256.NewInt(protocolBaseFee), uint256.NewInt(blockBaseFee) if protocolBaseFee == 0 || blockBaseFee == 0 { t.Skip() } @@ -263,111 +262,116 @@ func FuzzOnNewBlocks10(f *testing.F) { pool := New(ch) pool.senderInfo = senders pool.senderIDs = senderIDs - check := func(unwindTxs, minedTxs TxSlots) { + check := func(unwindTxs, minedTxs TxSlots, msg string) { pending, baseFee, queued := pool.pending, pool.baseFee, pool.queued - if pending.Len() > 0 || baseFee.Len() > 0 || queued.Len() > 0 { - fmt.Printf("len: %d,%d,%d\n", pending.Len(), baseFee.Len(), queued.Len()) - } + //if pending.Len() > 0 || baseFee.Len() > 0 || queued.Len() > 0 { + // fmt.Printf("len: %d,%d,%d\n", pending.Len(), baseFee.Len(), queued.Len()) + //} best, worst := pending.Best(), pending.Worst() assert.LessOrEqual(pending.Len(), PendingSubPoolLimit) - assert.False(worst != nil && best == nil) - assert.False(worst == nil && best != nil) + assert.False(worst != nil && best == nil, msg) + assert.False(worst == nil && best != nil, msg) if worst != nil && worst.SubPool < 0b11110 { t.Fatalf("pending worst too small %b", worst.SubPool) } iterateSubPoolUnordered(pending, func(tx *MetaTx) { i := tx.Tx - assert.GreaterOrEqual(i.nonce, senders[i.senderID].nonce) + if tx.SubPool&NoNonceGaps > 0 { + assert.GreaterOrEqual(i.nonce, senders[i.senderID].nonce, msg) + } if tx.SubPool&EnoughBalance > 0 { assert.True(tx.SenderHasEnoughBalance) } - - need := uint256.NewInt(i.gas) - need = need.Mul(need, uint256.NewInt(i.feeCap)) - need = need.Add(need, &i.value) - assert.True(need.Lt(protocolBaseFeeU256) || need.Eq(protocolBaseFeeU256)) - assert.True(need.Lt(blockBaseFeeU256) || need.Eq(blockBaseFeeU256)) + if tx.SubPool&EnoughFeeCapProtocol > 0 { + assert.LessOrEqual(protocolBaseFee, tx.Tx.feeCap, msg) + } + if tx.SubPool&EnoughFeeCapBlock > 0 { + assert.LessOrEqual(blockBaseFee, tx.Tx.feeCap, msg) + } // side data structures must have all txs - assert.True(senders[i.senderID].txNonce2Tx.Has(&nonce2TxItem{tx})) + assert.True(senders[i.senderID].txNonce2Tx.Has(&nonce2TxItem{tx}), msg) _, ok = pool.byHash[string(i.idHash[:])] assert.True(ok) // pools can't have more then 1 tx with same SenderID+Nonce + iterateSubPoolUnordered(baseFee, func(mtx2 *MetaTx) { + tx2 := mtx2.Tx + assert.False(tx2.senderID == i.senderID && tx2.nonce == i.nonce, msg) + }) iterateSubPoolUnordered(queued, func(mtx2 *MetaTx) { tx2 := mtx2.Tx - assert.False(tx2.senderID == i.senderID && tx2.nonce == i.nonce) - }) - iterateSubPoolUnordered(pending, func(mtx2 *MetaTx) { - tx2 := mtx2.Tx - assert.False(tx2.senderID == i.senderID && tx2.nonce == i.nonce) + assert.False(tx2.senderID == i.senderID && tx2.nonce == i.nonce, msg) }) }) best, worst = baseFee.Best(), baseFee.Worst() - assert.False(worst != nil && best == nil) - assert.False(worst == nil && best != nil) - assert.LessOrEqual(baseFee.Len(), BaseFeeSubPoolLimit) + assert.False(worst != nil && best == nil, msg) + assert.False(worst == nil && best != nil, msg) + assert.LessOrEqual(baseFee.Len(), BaseFeeSubPoolLimit, msg) if worst != nil && worst.SubPool < 0b11100 { t.Fatalf("baseFee worst too small %b", worst.SubPool) } iterateSubPoolUnordered(baseFee, func(tx *MetaTx) { i := tx.Tx - assert.GreaterOrEqual(i.nonce, senders[i.senderID].nonce) + if tx.SubPool&NoNonceGaps > 0 { + assert.GreaterOrEqual(i.nonce, senders[i.senderID].nonce, msg) + } if tx.SubPool&EnoughBalance != 0 { - - assert.True(tx.SenderHasEnoughBalance) + assert.True(tx.SenderHasEnoughBalance, msg) + } + if tx.SubPool&EnoughFeeCapProtocol > 0 { + assert.LessOrEqual(protocolBaseFee, tx.Tx.feeCap, msg) + } + if tx.SubPool&EnoughFeeCapBlock > 0 { + assert.LessOrEqual(blockBaseFee, tx.Tx.feeCap, msg) } - need := uint256.NewInt(i.gas) - need = need.Mul(need, uint256.NewInt(i.feeCap)) - need = need.Add(need, &i.value) - assert.True(need.Lt(protocolBaseFeeU256) || need.Eq(protocolBaseFeeU256)) - assert.True(need.Lt(blockBaseFeeU256) || need.Eq(blockBaseFeeU256)) - - assert.True(senders[i.senderID].txNonce2Tx.Has(&nonce2TxItem{tx})) + assert.True(senders[i.senderID].txNonce2Tx.Has(&nonce2TxItem{tx}), msg) _, ok = pool.byHash[string(i.idHash[:])] - assert.True(ok) + assert.True(ok, msg) }) best, worst = queued.Best(), queued.Worst() assert.LessOrEqual(queued.Len(), QueuedSubPoolLimit) - assert.False(worst != nil && best == nil) - assert.False(worst == nil && best != nil) + assert.False(worst != nil && best == nil, msg) + assert.False(worst == nil && best != nil, msg) if worst != nil && worst.SubPool < 0b10000 { t.Fatalf("queued worst too small %b", worst.SubPool) } iterateSubPoolUnordered(queued, func(tx *MetaTx) { i := tx.Tx - assert.GreaterOrEqual(i.nonce, senders[i.senderID].nonce) + if tx.SubPool&NoNonceGaps > 0 { + assert.GreaterOrEqual(i.nonce, senders[i.senderID].nonce, msg) + } if tx.SubPool&EnoughBalance > 0 { - assert.True(tx.SenderHasEnoughBalance) + assert.True(tx.SenderHasEnoughBalance, msg) + } + if tx.SubPool&EnoughFeeCapProtocol > 0 { + assert.LessOrEqual(protocolBaseFee, tx.Tx.feeCap, msg) + } + if tx.SubPool&EnoughFeeCapBlock > 0 { + assert.LessOrEqual(blockBaseFee, tx.Tx.feeCap, msg) } - need := uint256.NewInt(i.gas) - need = need.Mul(need, uint256.NewInt(i.feeCap)) - need = need.Add(need, &i.value) - assert.True(need.Lt(protocolBaseFeeU256) || need.Eq(protocolBaseFeeU256)) - assert.True(need.Lt(blockBaseFeeU256) || need.Eq(blockBaseFeeU256)) - - assert.True(senders[i.senderID].txNonce2Tx.Has(&nonce2TxItem{tx})) + assert.True(senders[i.senderID].txNonce2Tx.Has(&nonce2TxItem{tx}), msg) _, ok = pool.byHash[string(i.idHash[:])] - assert.True(ok) + assert.True(ok, msg) }) // all txs in side data structures must be in some queue for _, txn := range pool.byHash { - assert.True(txn.bestIndex >= 0) - assert.True(txn.worstIndex >= 0) + assert.True(txn.bestIndex >= 0, msg) + assert.True(txn.worstIndex >= 0, msg) } for i := range senders { //assert.True(senders[i].txNonce2Tx.Len() > 0) senders[i].txNonce2Tx.Ascend(func(i btree.Item) bool { mt := i.(*nonce2TxItem).MetaTx - assert.True(mt.worstIndex >= 0) - assert.True(mt.bestIndex >= 0) + assert.True(mt.worstIndex >= 0, msg) + assert.True(mt.bestIndex >= 0, msg) return true }) } @@ -375,11 +379,11 @@ func FuzzOnNewBlocks10(f *testing.F) { // mined txs must be removed for i := range minedTxs.txs { _, ok = pool.byHash[string(minedTxs.txs[i].idHash[:])] - assert.False(ok) + assert.False(ok, msg) } } - checkNotify := func(unwindTxs, minedTxs TxSlots) { + checkNotify := func(unwindTxs, minedTxs TxSlots, msg string) { select { case newHashes := <-ch: //assert.Equal(len(unwindTxs.txs), newHashes.Len()) @@ -400,8 +404,8 @@ func FuzzOnNewBlocks10(f *testing.F) { break } } - assert.True(foundInUnwind) - assert.False(foundInMined) + assert.True(foundInUnwind, msg) + assert.False(foundInMined, msg) } default: @@ -413,20 +417,20 @@ func FuzzOnNewBlocks10(f *testing.F) { unwindTxs, minedTxs1, p2pReceived, minedTxs2 := splitDataset(txs) err = pool.OnNewBlock(unwindTxs, minedTxs1, protocolBaseFee, blockBaseFee) assert.NoError(err) - check(unwindTxs, minedTxs1) - checkNotify(unwindTxs, minedTxs1) + check(unwindTxs, minedTxs1, "fork1") + checkNotify(unwindTxs, minedTxs1, "fork1") // unwind everything and switch to new fork (need unwind mined now) err = pool.OnNewBlock(minedTxs1, minedTxs2, protocolBaseFee, blockBaseFee) assert.NoError(err) - check(minedTxs1, minedTxs2) - checkNotify(minedTxs1, minedTxs2) + check(minedTxs1, minedTxs2, "fork2") + checkNotify(minedTxs1, minedTxs2, "fork2") // add some remote txs from p2p err = pool.OnNewTxs(p2pReceived) assert.NoError(err) - check(p2pReceived, TxSlots{}) - checkNotify(p2pReceived, TxSlots{}) + check(p2pReceived, TxSlots{}, "p2pmsg1") + checkNotify(p2pReceived, TxSlots{}, "p2pmsg1") }) } From caf413008925199092a5dcc511f57983985a49d0 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 6 Aug 2021 11:06:03 +0700 Subject: [PATCH 10/12] del from pools on replace-by-nonce case --- txpool/pool.go | 48 +++++++++++++++++++++++++++++++++++----- txpool/pool_fuzz_test.go | 2 +- 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index f8e479c4d..6ce635411 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -246,6 +246,21 @@ func onNewTxs(senderInfo map[uint64]*senderInfo, newTxs TxSlots, protocolBaseFee i.SubPool |= IsLocal } byHash[string(i.Tx.idHash[:])] = i + replaced := senderInfo[i.Tx.senderID].txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{i}) + if replaced != nil { + replacedMT := replaced.(*nonce2TxItem).MetaTx + delete(byHash, string(replacedMT.Tx.idHash[:])) + switch replacedMT.currentSubPool { + case PendingSubPool: + pending.UnsafeRemove(replacedMT) + case BaseFeeSubPool: + baseFee.UnsafeRemove(replacedMT) + case QueuedSubPool: + queued.UnsafeRemove(replacedMT) + default: + //already removed + } + } }) for i := range senderInfo { @@ -357,6 +372,21 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [ i.SubPool |= IsLocal } byHash[string(i.Tx.idHash[:])] = i + replaced := senderInfo[i.Tx.senderID].txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{i}) + if replaced != nil { + replacedMT := replaced.(*nonce2TxItem).MetaTx + delete(byHash, string(replacedMT.Tx.idHash[:])) + switch replacedMT.currentSubPool { + case PendingSubPool: + pending.UnsafeRemove(replacedMT) + case BaseFeeSubPool: + baseFee.UnsafeRemove(replacedMT) + case QueuedSubPool: + queued.UnsafeRemove(replacedMT) + default: + //already removed + } + } }) for i := range senderInfo { @@ -371,6 +401,7 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [ promote(pending, baseFee, queued, func(i *MetaTx) { //fmt.Printf("del1 nonce: %d, %d,%d\n", i.Tx.senderID, senderInfo[i.Tx.senderID].nonce, i.Tx.nonce) //fmt.Printf("del2 balance: %d,%d,%d\n", i.Tx.value.Uint64(), i.Tx.tip, senderInfo[i.Tx.senderID].balance.Uint64()) + fmt.Printf("del: %d, %x\n", i.Tx.nonce, i.Tx.idHash) delete(byHash, string(i.Tx.idHash[:])) senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i}) if i.SubPool&IsLocal != 0 { @@ -439,11 +470,7 @@ func unsafeAddToPool(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, to *S continue } } - //if sender.nonce > tx.nonce { - // continue - //} beforeAdd(mt) - sender.txNonce2Tx.ReplaceOrInsert(&nonce2TxItem{mt}) to.UnsafeAdd(mt, PendingSubPool) } } @@ -633,13 +660,22 @@ func (p *SubPool) Add(i *MetaTx, subPoolType SubPoolType) { // UnsafeRemove - does break Heap invariants, but it has O(1) instead of O(log(n)) complexity. // Must manually call heap.Init after such changes. // Make sense to batch unsafe changes -func (p *SubPool) UnsafeRemove(i *MetaTx) *MetaTx { +func (p *SubPool) UnsafeRemove(i *MetaTx) { + if p.Len() == 0 { + return + } + if p.Len() == 1 && i.bestIndex == 0 { + p.worst.Pop() + p.best.Pop() + return + } + fmt.Printf("remove: %d,%d\n", p.Len(), i.bestIndex) // manually call funcs instead of heap.Pop p.worst.Swap(i.worstIndex, p.worst.Len()-1) p.worst.Pop() p.best.Swap(i.bestIndex, p.best.Len()-1) p.best.Pop() - return i + return } func (p *SubPool) UnsafeAdd(i *MetaTx, subPoolType SubPoolType) { i.currentSubPool = subPoolType diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index 0993c0a79..cd114f2ff 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -356,7 +356,7 @@ func FuzzOnNewBlocks10(f *testing.F) { assert.LessOrEqual(blockBaseFee, tx.Tx.feeCap, msg) } - assert.True(senders[i.senderID].txNonce2Tx.Has(&nonce2TxItem{tx}), msg) + assert.True(senders[i.senderID].txNonce2Tx.Has(&nonce2TxItem{tx}), "%s, %d, %x", msg, tx.Tx.nonce, tx.Tx.idHash) _, ok = pool.byHash[string(i.idHash[:])] assert.True(ok, msg) }) From db8cbe62445e4179d6934705956b10b450cc3a81 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 6 Aug 2021 11:42:41 +0700 Subject: [PATCH 11/12] more fixes --- txpool/pool.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 6ce635411..301d46ae0 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -240,7 +240,7 @@ func onNewTxs(senderInfo map[uint64]*senderInfo, newTxs TxSlots, protocolBaseFee } } - unsafeAddToPool(senderInfo, newTxs, queued, func(i *MetaTx) { + unsafeAddToPool(senderInfo, newTxs, queued, QueuedSubPool, func(i *MetaTx) { if _, ok := localsHistory.Get(i.Tx.idHash); ok { //TODO: also check if sender is in list of local-senders i.SubPool |= IsLocal @@ -365,7 +365,7 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [ // they effective lose their priority over the "remote" transactions. In order to prevent that, // somehow the fact that certain transactions were local, needs to be remembered for some // time (up to some "immutability threshold"). - unsafeAddToPool(senderInfo, unwindTxs, pending, func(i *MetaTx) { + unsafeAddToPool(senderInfo, unwindTxs, pending, PendingSubPool, func(i *MetaTx) { //fmt.Printf("add: %d,%d\n", i.Tx.senderID, i.Tx.nonce) if _, ok := localsHistory.Get(i.Tx.idHash); ok { //TODO: also check if sender is in list of local-senders @@ -401,7 +401,6 @@ func onNewBlock(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, minedTxs [ promote(pending, baseFee, queued, func(i *MetaTx) { //fmt.Printf("del1 nonce: %d, %d,%d\n", i.Tx.senderID, senderInfo[i.Tx.senderID].nonce, i.Tx.nonce) //fmt.Printf("del2 balance: %d,%d,%d\n", i.Tx.value.Uint64(), i.Tx.tip, senderInfo[i.Tx.senderID].balance.Uint64()) - fmt.Printf("del: %d, %x\n", i.Tx.nonce, i.Tx.idHash) delete(byHash, string(i.Tx.idHash[:])) senderInfo[i.Tx.senderID].txNonce2Tx.Delete(&nonce2TxItem{i}) if i.SubPool&IsLocal != 0 { @@ -456,7 +455,7 @@ func removeMined(senderInfo map[uint64]*senderInfo, minedTxs []*TxSlot, pending, } // unwind -func unsafeAddToPool(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, to *SubPool, beforeAdd func(tx *MetaTx)) { +func unsafeAddToPool(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, to *SubPool, subPoolType SubPoolType, beforeAdd func(tx *MetaTx)) { for i, tx := range unwindTxs.txs { sender, ok := senderInfo[tx.senderID] if !ok { @@ -471,7 +470,7 @@ func unsafeAddToPool(senderInfo map[uint64]*senderInfo, unwindTxs TxSlots, to *S } } beforeAdd(mt) - to.UnsafeAdd(mt, PendingSubPool) + to.UnsafeAdd(mt, subPoolType) } } @@ -669,7 +668,6 @@ func (p *SubPool) UnsafeRemove(i *MetaTx) { p.best.Pop() return } - fmt.Printf("remove: %d,%d\n", p.Len(), i.bestIndex) // manually call funcs instead of heap.Pop p.worst.Swap(i.worstIndex, p.worst.Len()-1) p.worst.Pop() From 030b518cebbc6720c33ec800e5d6de2b249cb35f Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Fri, 6 Aug 2021 11:44:40 +0700 Subject: [PATCH 12/12] more fixes --- txpool/pool.go | 1 - txpool/pool_fuzz_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 301d46ae0..727e6317c 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -673,7 +673,6 @@ func (p *SubPool) UnsafeRemove(i *MetaTx) { p.worst.Pop() p.best.Swap(i.bestIndex, p.best.Len()-1) p.best.Pop() - return } func (p *SubPool) UnsafeAdd(i *MetaTx, subPoolType SubPoolType) { i.currentSubPool = subPoolType diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index cd114f2ff..a28adaaa1 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -264,7 +264,7 @@ func FuzzOnNewBlocks10(f *testing.F) { pool.senderIDs = senderIDs check := func(unwindTxs, minedTxs TxSlots, msg string) { pending, baseFee, queued := pool.pending, pool.baseFee, pool.queued - //if pending.Len() > 0 || baseFee.Len() > 0 || queued.Len() > 0 { + //if pending.Len() > 10 && baseFee.Len() > 10 && queued.Len() > 10 { // fmt.Printf("len: %d,%d,%d\n", pending.Len(), baseFee.Len(), queued.Len()) //}