From 917bdc029c445d1bfdf01cc231b908a61044ca85 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 30 Aug 2021 11:44:53 +0700 Subject: [PATCH] clean --- txpool/pool.go | 1232 ++++++++++++++++++++++++------------------------ 1 file changed, 615 insertions(+), 617 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 1b75025d0..a97a838b8 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -392,52 +392,6 @@ func (sc *SendersCache) setTxSenderID(tx kv.Tx, txs TxSlots) (map[uint64]string, } return toLoad, nil } -func (sc *SendersCache) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error { - sc.lock.Lock() - defer sc.lock.Unlock() - - { - v, err := tx.GetOne(kv.PoolInfo, SenderCacheHeightKey) - if err != nil { - return err - } - if len(v) > 0 { - sc.blockHeight.Store(binary.BigEndian.Uint64(v)) - } - } - { - v, err := tx.GetOne(kv.PoolInfo, SenderCacheHashKey) - if err != nil { - return err - } - if len(v) > 0 { - sc.blockHash.Store(string(v)) - } - } - { - v, err := tx.GetOne(kv.PoolInfo, SenderCacheIDKey) - if err != nil { - return err - } - if len(v) > 0 { - sc.senderID = binary.BigEndian.Uint64(v) - } - } - { - v, err := tx.GetOne(kv.PoolInfo, SenderCommitIDKey) - if err != nil { - return err - } - if len(v) > 0 { - sc.commitID = binary.BigEndian.Uint64(v) - } - } - - if err := sc.syncMissedStateDiff(ctx, tx, coreTx, 0); err != nil { - return err - } - return nil -} func (sc *SendersCache) syncMissedStateDiff(ctx context.Context, tx kv.RwTx, coreTx kv.Tx, missedTo uint64) error { dropLocalSendersCache := false if missedTo > 0 && missedTo-sc.blockHeight.Load() > 1024 { @@ -492,270 +446,6 @@ func (sc *SendersCache) syncMissedStateDiff(ctx context.Context, tx kv.RwTx, cor } return nil } -func isCanonical(coreTx kv.Tx, num uint64, hash []byte) (bool, error) { - encNum := make([]byte, 8) - binary.BigEndian.PutUint64(encNum, num) - canonical, err := coreTx.GetOne(kv.HeaderCanonical, encNum) - if err != nil { - return false, err - } - - return bytes.Equal(hash, canonical), nil -} - -func changesets(ctx context.Context, from uint64, coreTx kv.Tx) (map[string]senderInfo, error) { - encNum := make([]byte, 8) - diff := map[string]senderInfo{} - binary.BigEndian.PutUint64(encNum, from) - logEvery := time.NewTicker(30 * time.Second) - defer logEvery.Stop() - //TODO: tx.ForEach must be implemented as buffered server-side stream - if err := coreTx.ForEach(kv.AccountChangeSet, encNum, func(k, v []byte) error { - info, err := loadSender(coreTx, v[:20]) - if err != nil { - return err - } - diff[string(v[:20])] = *info - select { - case <-logEvery.C: - log.Info("loading changesets", "block", binary.BigEndian.Uint64(k)) - case <-ctx.Done(): - return nil - default: - } - return nil - }); err != nil { - return nil, err - } - return diff, nil -} - -var SenderCommitTimeKey = []byte("sender_commit_time") -var SenderCacheIDKey = []byte("sender_cache_id") -var SenderCommitIDKey = []byte("sender_commit_id") -var SenderCacheHeightKey = []byte("sender_cache_block_height") -var SenderCacheHashKey = []byte("sender_cache_block_hash") -var PoolPendingBaseFeeKey = []byte("pending_base_fee") -var PoolProtocolBaseFeeKey = []byte("protocol_base_fee") - -func (sc *SendersCache) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransactions *roaring64.Bitmap, evictAfterRounds uint64) (evicted uint64, err error) { - sc.lock.Lock() - defer sc.lock.Unlock() - sc.commitID++ - //var justDeleted, justInserted []uint64 - encID := make([]byte, 8) - for addr, id := range sc.senderIDs { - binary.BigEndian.PutUint64(encID, id) - currentV, err := tx.GetOne(kv.PoolSenderID, []byte(addr)) - if err != nil { - return evicted, err - } - if currentV != nil && bytes.Equal(currentV, encID) { - continue - } - //fmt.Printf("Put: %d\n", id) - if err := tx.Put(kv.PoolSenderID, []byte(addr), encID); err != nil { - return evicted, err - } - if err := tx.Put(kv.PoolSenderIDToAdress, encID, []byte(addr)); err != nil { - return evicted, err - } - //if ASSERT { - // justInserted = append(justInserted, id) - //} - if byNonce.count(id) == 0 { - sendersWithoutTransactions.Add(id) - } - } - //if ASSERT { - // sort.Slice(justInserted, func(i, j int) bool { return justInserted[i] < justInserted[j] }) - //} - - v := make([]byte, 8, 8+32) - for id, info := range sc.senderInfo { - if info.nonce == 0 && info.balance.IsZero() { - continue - } - binary.BigEndian.PutUint64(encID, id) - binary.BigEndian.PutUint64(v, info.nonce) - v = append(v[:8], info.balance.Bytes()...) - //TODO: check that nothing changed - if err := tx.Put(kv.PoolSender, encID, v); err != nil { - return evicted, err - } - } - //fmt.Printf("justDeleted:%d, justInserted:%d\n", justDeleted, justInserted) - if ASSERT { - { - duplicates := map[string]uint64{} - _ = tx.ForPrefix(kv.PoolSenderIDToAdress, nil, func(k, v []byte) error { - id, ok := duplicates[string(v)] - if ok { - fmt.Printf("duplicate: %d,%d,%x\n", id, binary.BigEndian.Uint64(k), string(v)) - panic(1) - } - return nil - }) - } - { - duplicates := map[uint64]string{} - _ = tx.ForPrefix(kv.PoolSenderIDToAdress, nil, func(k, v []byte) error { - id := binary.BigEndian.Uint64(v) - addr, ok := duplicates[id] - if ok { - fmt.Printf("duplicate: %x,%x,%d\n", addr, k, binary.BigEndian.Uint64(v)) - panic(1) - } - return nil - }) - } - } - - binary.BigEndian.PutUint64(encID, sc.commitID) - // Eviction logic. Store into db list of senders: - // - which have discarded transactions at this commit - // - but have no active transactions left - // after some time read old records from DB and if such senders still have no transactions - evict them - if sendersWithoutTransactions.GetCardinality() > 0 { - sendersWithoutTransactions.RunOptimize() - b, err := sendersWithoutTransactions.MarshalBinary() - if err != nil { - return 0, err - } - if err := tx.Append(kv.PoolStateEviction, encID, b); err != nil { - return evicted, err - } - } - - c, err := tx.RwCursor(kv.PoolStateEviction) - if err != nil { - return evicted, err - } - defer c.Close() - for k, v, err := c.First(); k != nil; k, v, err = c.Next() { - if err != nil { - return evicted, err - } - if sc.commitID-binary.BigEndian.Uint64(k) < evictAfterRounds { - break - } - ids := roaring64.New() - if err := ids.UnmarshalBinary(v); err != nil { - return 0, err - } - for _, senderID := range ids.ToArray() { - if _, ok := sc.senderInfo[senderID]; ok { - continue - } - if byNonce.count(senderID) > 0 { - continue - } - addr, err := tx.GetOne(kv.PoolSenderID, encID) - if err != nil { - return evicted, err - } - if _, ok := sc.senderIDs[string(addr)]; ok { - continue - } - if err := tx.Delete(kv.PoolSenderID, addr, nil); err != nil { - return evicted, err - } - if err := tx.Delete(kv.PoolSenderIDToAdress, encID, nil); err != nil { - return evicted, err - } - if err := tx.Delete(kv.PoolSender, encID, nil); err != nil { - return evicted, err - } - evicted++ - //if ASSERT { - // justDeleted = append(justDeleted, senderID) - //} - } - if err := c.DeleteCurrent(); err != nil { - return evicted, err - } - } - - if ASSERT { - _ = tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { - //id := binary.BigEndian.Uint64(v[:8]) - //for _, senderID := range justDeleted { - // if senderID == id { - // fmt.Printf("delted id still has tx in db: %d,%x\n", id, k) - // panic(1) - // } - //} - vv, err := tx.GetOne(kv.PoolSenderIDToAdress, v[:8]) - if err != nil { - return err - } - if len(vv) == 0 { - cc, _ := tx.Cursor(kv.PoolSenderIDToAdress) - last, lastAddr, _ := cc.Last() - slots := TxSlots{} - slots.Growth(1) - slots.txs[0] = &TxSlot{} - parseCtx := NewTxParseContext() - _, err := parseCtx.ParseTransaction(v[8+8:], 0, slots.txs[0], slots.senders.At(0)) - if err != nil { - log.Error("er", "er", err) - } - fmt.Printf("sender:%x, txHash: %x\n", slots.senders.At(0), slots.txs[0].idHash) - - fmt.Printf("last: %d,%x\n", binary.BigEndian.Uint64(last), lastAddr) - fmt.Printf("now: %d\n", sc.senderID) - fmt.Printf("not foundd: %d,%x,%x,%x\n", binary.BigEndian.Uint64(v[:8]), k, v, vv) - fmt.Printf("aa: %x,%x,%x\n", k, v, vv) - //fmt.Printf("justDeleted:%d, justInserted:%d\n", justDeleted, justInserted) - panic("no-no") - } - return nil - }) - } - - binary.BigEndian.PutUint64(encID, sc.blockHeight.Load()) - if err := tx.Put(kv.PoolInfo, SenderCacheHeightKey, encID); err != nil { - return evicted, err - } - if err := tx.Put(kv.PoolInfo, SenderCacheHashKey, []byte(sc.blockHash.Load())); err != nil { - return evicted, err - } - binary.BigEndian.PutUint64(encID, sc.senderID) - if err := tx.Put(kv.PoolInfo, SenderCacheIDKey, encID); err != nil { - return evicted, err - } - binary.BigEndian.PutUint64(encID, sc.commitID) - if err := tx.Put(kv.PoolInfo, SenderCommitIDKey, encID); err != nil { - return evicted, err - } - lastCommitTime, err := time.Now().MarshalBinary() - if err != nil { - return evicted, err - } - if err := tx.Put(kv.PoolInfo, SenderCommitTimeKey, lastCommitTime); err != nil { - return evicted, err - } - - sc.senderIDs = map[string]uint64{} - sc.senderInfo = map[uint64]*senderInfo{} - return evicted, nil -} - -func loadSender(coreTx kv.Tx, addr []byte) (*senderInfo, error) { - encoded, err := coreTx.GetOne(kv.PlainState, addr) - if err != nil { - return nil, err - } - if len(encoded) == 0 { - //return nil, nil - return newSenderInfo(0, *uint256.NewInt(0)), nil - } - nonce, balance, err := DecodeSender(encoded) - if err != nil { - return nil, err - } - return newSenderInfo(nonce, balance), nil -} // TxPool - holds all pool-related data structures and lock-based tiny methods // most of logic implemented by pure tests-friendly functions @@ -1085,203 +775,6 @@ func (p *TxPool) OnNewBlock(stateChanges map[string]senderInfo, unwindTxs, mined log.Info("new block", "number", blockHeight, "in", time.Since(t)) return nil } -func (p *TxPool) flush(db kv.RwDB) (evicted, written uint64, err error) { - p.lock.Lock() - defer p.lock.Unlock() - //it's important that write db tx is done inside lock, to make last writes visible for all read operations - if err := db.Update(context.Background(), func(tx kv.RwTx) error { - evicted, err = p.flushLocked(tx) - if err != nil { - return err - } - written, _, err = tx.(*mdbx.MdbxTx).SpaceDirty() - if err != nil { - return err - } - return nil - }); err != nil { - return 0, 0, err - } - return evicted, written, nil -} -func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) { - sendersWithoutTransactions := roaring64.New() - for i := 0; i < len(p.deletedTxs); i++ { - if p.txNonce2Tx.count(p.deletedTxs[i].Tx.senderID) == 0 { - sendersWithoutTransactions.Add(p.deletedTxs[i].Tx.senderID) - } - if err := tx.Delete(kv.PoolTransaction, p.deletedTxs[i].Tx.idHash[:], nil); err != nil { - return evicted, err - } - p.deletedTxs[i] = nil // for gc - } - - txHashes := p.localsHistory.Keys() - encID := make([]byte, 8) - if err := tx.ClearBucket(kv.RecentLocalTransaction); err != nil { - return evicted, err - } - for i := range txHashes { - binary.BigEndian.PutUint64(encID, uint64(i)) - if err := tx.Append(kv.RecentLocalTransaction, encID, txHashes[i].([]byte)); err != nil { - return evicted, err - } - } - - if ASSERT { - _ = tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { - vv, err := tx.GetOne(kv.PoolSenderIDToAdress, v[:8]) - if err != nil { - return err - } - if len(vv) == 0 { - cc, _ := tx.Cursor(kv.PoolSenderIDToAdress) - last, lastAddr, _ := cc.Last() - if len(last) > 0 { - fmt.Printf("last: %d,%x\n", binary.BigEndian.Uint64(last), lastAddr) - } - fmt.Printf("not foundd: %d,%x,%x,%x\n", binary.BigEndian.Uint64(v[:8]), k, v, vv) - fmt.Printf("aa: %x,%x,%x\n", k, v, vv) - panic("no-no") - } - return nil - }) - } - - v := make([]byte, 0, 1024) - for txHash, metaTx := range p.byHash { - if metaTx.Tx.rlp == nil { - continue - } - if ASSERT { - if p.txNonce2Tx.count(metaTx.Tx.senderID) == 0 { - panic("here i am") - } - } - v = ensureEnoughSize(v, 8+8+len(metaTx.Tx.rlp)) - binary.BigEndian.PutUint64(v, metaTx.Tx.senderID) - binary.BigEndian.PutUint64(v[8:], 0) // block num - timestamp - copy(v[8+8:], metaTx.Tx.rlp) - if ASSERT { - if _, ok := p.senders.senderInfo[metaTx.Tx.senderID]; !ok { - info, err := p.senders.info(metaTx.Tx.senderID, tx, false) - if err != nil { - panic(err) - } - if info == nil { - panic("lost") - } - } - } - - if err := tx.Put(kv.PoolTransaction, []byte(txHash), v); err != nil { - return evicted, err - } - metaTx.Tx.rlp = nil - } - - if ASSERT { - txs := TxSlots{} - parseCtx := NewTxParseContext() - parseCtx.WithSender(false) - i := 0 - hashID := [32]byte{} - if err := tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { - txs.Growth(i + 1) - txs.txs[i] = &TxSlot{} - - _, err := parseCtx.ParseTransaction(v[8+8:], 0, txs.txs[i], nil) - if err != nil { - return fmt.Errorf("err: %w, rlp: %x\n", err, v[8+8:]) - } - txs.txs[i].rlp = nil // means that we don't need store it in db anymore - txs.txs[i].senderID = binary.BigEndian.Uint64(v) - //bkock num = binary.BigEndian.Uint64(v[8:]) - copy(hashID[:], k) - _, isLocalTx := p.localsHistory.Get(hashID) - txs.isLocal[i] = isLocalTx - - if !p.txNonce2Tx.has(newMetaTx(txs.txs[i], txs.isLocal[i])) { - panic("aaaaaa") - } - i++ - return nil - }); err != nil { - panic(err) - } - } - - binary.BigEndian.PutUint64(encID, p.protocolBaseFee.Load()) - if err := tx.Put(kv.PoolInfo, PoolProtocolBaseFeeKey, encID); err != nil { - return evicted, err - } - binary.BigEndian.PutUint64(encID, p.pendingBaseFee.Load()) - if err := tx.Put(kv.PoolInfo, PoolPendingBaseFeeKey, encID); err != nil { - return evicted, err - } - - evicted, err = p.senders.flush(tx, p.txNonce2Tx, sendersWithoutTransactions, p.cfg.evictSendersAfterRounds) - if err != nil { - return evicted, err - } - if ASSERT { - _ = tx.ForEach(kv.PoolSenderIDToAdress, nil, func(idBytes, addr []byte) error { - found := false - _ = tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { - if bytes.Equal(v[:8], idBytes) { - found = true - return fmt.Errorf("stop") - } - return nil - }) - if !found { - found = false - _ = tx.ForEach(kv.PoolStateEviction, nil, func(k, v []byte) error { - ids := roaring64.New() - if err := ids.UnmarshalBinary(v); err != nil { - return err - } - for _, id := range ids.ToArray() { - if binary.BigEndian.Uint64(idBytes) == id { - found = true - return fmt.Errorf("stop") - } - } - return nil - }) - } - if !found { - if p.txNonce2Tx.count(binary.BigEndian.Uint64(idBytes)) > 0 { - panic("?") - } - _ = tx.ForEach(kv.PoolStateEviction, nil, func(k, v []byte) error { - fmt.Printf("ev: %x\n", v) - return nil - }) - _ = tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { - fmt.Printf("tr: %x\n", v) - return nil - }) - _ = tx.ForEach(kv.PoolSenderIDToAdress, nil, func(idBytes, addr []byte) error { - fmt.Printf("id2addr: %x\n", idBytes) - return nil - }) - p.senders.printDebug("gb") - fmt.Printf("sz:%d,%d\n", p.txNonce2Tx.tree.Len(), sendersWithoutTransactions.ToArray()) - fmt.Printf("garbage found: %x\n", idBytes) - panic(1) - } - return nil - }) - } - - // clean - in-memory data structure as later as possible - because if during this Tx will happen error, - // DB will stay consitant but some in-memory structures may be alread cleaned, and retry will not work - // failed write transaction must not create side-effects - p.deletedTxs = p.deletedTxs[:0] - - return evicted, nil -} func (p *TxPool) discardLocked(mt *metaTx) { delete(p.byHash, string(mt.Tx.idHash[:])) p.deletedTxs = append(p.deletedTxs, mt) @@ -1289,117 +782,7 @@ func (p *TxPool) discardLocked(mt *metaTx) { if mt.subPool&IsLocal != 0 { p.localsHistory.Add(mt.Tx.idHash, struct{}{}) } - - if ASSERT && p.txNonce2Tx.tree.Len() != len(p.byHash) { - panic(1) - } } - -func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error { - p.lock.Lock() - defer p.lock.Unlock() - if ASSERT { - _ = tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { - vv, err := tx.GetOne(kv.PoolSenderIDToAdress, v[:8]) - if err != nil { - return err - } - if len(vv) == 0 { - cc, _ := tx.Cursor(kv.PoolSenderIDToAdress) - last, lastAddr, _ := cc.Last() - fmt.Printf("last: %d,%x\n", binary.BigEndian.Uint64(last), lastAddr) - fmt.Printf("now: %d\n", p.senders.senderID) - fmt.Printf("not foundd: %d,%x,%x,%x\n", binary.BigEndian.Uint64(v[:8]), k, v, vv) - panic("no-no") - } - return nil - }) - } - - if err := p.senders.fromDB(ctx, tx, coreTx); err != nil { - return err - } - - if err := tx.ForEach(kv.RecentLocalTransaction, nil, func(k, v []byte) error { - hashID := [32]byte{} - copy(hashID[:], v) - p.localsHistory.Add(hashID, struct{}{}) - return nil - }); err != nil { - return err - } - - txs := TxSlots{} - parseCtx := NewTxParseContext() - parseCtx.WithSender(false) - i := 0 - hashID := [32]byte{} - if err := tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { - txs.Growth(i + 1) - txs.txs[i] = &TxSlot{} - - _, err := parseCtx.ParseTransaction(v[8+8:], 0, txs.txs[i], nil) - if err != nil { - return fmt.Errorf("err: %w, rlp: %x\n", err, v[8+8:]) - } - txs.txs[i].rlp = nil // means that we don't need store it in db anymore - txs.txs[i].senderID = binary.BigEndian.Uint64(v) - - senderAddr, err := tx.GetOne(kv.PoolSenderIDToAdress, v[:8]) - if err != nil { - return err - } - if len(senderAddr) == 0 { - panic("must not happen") - } - copy(txs.senders.At(i), senderAddr) - //bkock num = binary.BigEndian.Uint64(v[8:]) - copy(hashID[:], k) - _, isLocalTx := p.localsHistory.Get(hashID) - txs.isLocal[i] = isLocalTx - i++ - return nil - }); err != nil { - return err - } - - var protocolBaseFee, pendingBaseFee uint64 - { - v, err := tx.GetOne(kv.PoolInfo, PoolProtocolBaseFeeKey) - if err != nil { - return err - } - if len(v) > 0 { - protocolBaseFee = binary.BigEndian.Uint64(v) - } - } - { - v, err := tx.GetOne(kv.PoolInfo, PoolPendingBaseFeeKey) - if err != nil { - return err - } - if len(v) > 0 { - pendingBaseFee = binary.BigEndian.Uint64(v) - } - } - cacheMisses, err := p.senders.onNewTxs(tx, txs) - if err != nil { - return err - } - if len(cacheMisses) > 0 { - if err := p.senders.loadFromCore(coreTx, cacheMisses); err != nil { - return err - } - } - if err := onNewTxs(tx, p.senders, txs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil { - return err - } - p.pendingBaseFee.Store(pendingBaseFee) - p.protocolBaseFee.Store(protocolBaseFee) - - return nil -} - func onNewBlock(tx kv.Tx, senders *SendersCache, unwindTxs TxSlots, minedTxs []*TxSlot, protocolBaseFee, pendingBaseFee uint64, pending, baseFee, queued *SubPool, byNonce *ByNonce, byHash map[string]*metaTx, discard func(*metaTx)) error { for i := range unwindTxs.txs { if unwindTxs.txs[i].senderID == 0 { @@ -1963,6 +1346,621 @@ func copyBytes(b []byte) (copiedBytes []byte) { return } +func (p *TxPool) flush(db kv.RwDB) (evicted, written uint64, err error) { + p.lock.Lock() + defer p.lock.Unlock() + //it's important that write db tx is done inside lock, to make last writes visible for all read operations + if err := db.Update(context.Background(), func(tx kv.RwTx) error { + evicted, err = p.flushLocked(tx) + if err != nil { + return err + } + written, _, err = tx.(*mdbx.MdbxTx).SpaceDirty() + if err != nil { + return err + } + return nil + }); err != nil { + return 0, 0, err + } + return evicted, written, nil +} +func (p *TxPool) flushLocked(tx kv.RwTx) (evicted uint64, err error) { + sendersWithoutTransactions := roaring64.New() + for i := 0; i < len(p.deletedTxs); i++ { + if p.txNonce2Tx.count(p.deletedTxs[i].Tx.senderID) == 0 { + sendersWithoutTransactions.Add(p.deletedTxs[i].Tx.senderID) + } + if err := tx.Delete(kv.PoolTransaction, p.deletedTxs[i].Tx.idHash[:], nil); err != nil { + return evicted, err + } + p.deletedTxs[i] = nil // for gc + } + + txHashes := p.localsHistory.Keys() + encID := make([]byte, 8) + if err := tx.ClearBucket(kv.RecentLocalTransaction); err != nil { + return evicted, err + } + for i := range txHashes { + binary.BigEndian.PutUint64(encID, uint64(i)) + if err := tx.Append(kv.RecentLocalTransaction, encID, txHashes[i].([]byte)); err != nil { + return evicted, err + } + } + + if ASSERT { + _ = tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { + vv, err := tx.GetOne(kv.PoolSenderIDToAdress, v[:8]) + if err != nil { + return err + } + if len(vv) == 0 { + cc, _ := tx.Cursor(kv.PoolSenderIDToAdress) + last, lastAddr, _ := cc.Last() + if len(last) > 0 { + fmt.Printf("last: %d,%x\n", binary.BigEndian.Uint64(last), lastAddr) + } + fmt.Printf("not foundd: %d,%x,%x,%x\n", binary.BigEndian.Uint64(v[:8]), k, v, vv) + fmt.Printf("aa: %x,%x,%x\n", k, v, vv) + panic("no-no") + } + return nil + }) + } + + v := make([]byte, 0, 1024) + for txHash, metaTx := range p.byHash { + if metaTx.Tx.rlp == nil { + continue + } + if ASSERT { + if p.txNonce2Tx.count(metaTx.Tx.senderID) == 0 { + panic("here i am") + } + } + v = ensureEnoughSize(v, 8+8+len(metaTx.Tx.rlp)) + binary.BigEndian.PutUint64(v, metaTx.Tx.senderID) + binary.BigEndian.PutUint64(v[8:], 0) // block num - timestamp + copy(v[8+8:], metaTx.Tx.rlp) + if ASSERT { + if _, ok := p.senders.senderInfo[metaTx.Tx.senderID]; !ok { + info, err := p.senders.info(metaTx.Tx.senderID, tx, false) + if err != nil { + panic(err) + } + if info == nil { + panic("lost") + } + } + } + + if err := tx.Put(kv.PoolTransaction, []byte(txHash), v); err != nil { + return evicted, err + } + metaTx.Tx.rlp = nil + } + + if ASSERT { + txs := TxSlots{} + parseCtx := NewTxParseContext() + parseCtx.WithSender(false) + i := 0 + hashID := [32]byte{} + if err := tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { + txs.Growth(i + 1) + txs.txs[i] = &TxSlot{} + + _, err := parseCtx.ParseTransaction(v[8+8:], 0, txs.txs[i], nil) + if err != nil { + return fmt.Errorf("err: %w, rlp: %x\n", err, v[8+8:]) + } + txs.txs[i].rlp = nil // means that we don't need store it in db anymore + txs.txs[i].senderID = binary.BigEndian.Uint64(v) + //bkock num = binary.BigEndian.Uint64(v[8:]) + copy(hashID[:], k) + _, isLocalTx := p.localsHistory.Get(hashID) + txs.isLocal[i] = isLocalTx + + if !p.txNonce2Tx.has(newMetaTx(txs.txs[i], txs.isLocal[i])) { + panic("aaaaaa") + } + i++ + return nil + }); err != nil { + panic(err) + } + } + + binary.BigEndian.PutUint64(encID, p.protocolBaseFee.Load()) + if err := tx.Put(kv.PoolInfo, PoolProtocolBaseFeeKey, encID); err != nil { + return evicted, err + } + binary.BigEndian.PutUint64(encID, p.pendingBaseFee.Load()) + if err := tx.Put(kv.PoolInfo, PoolPendingBaseFeeKey, encID); err != nil { + return evicted, err + } + + evicted, err = p.senders.flush(tx, p.txNonce2Tx, sendersWithoutTransactions, p.cfg.evictSendersAfterRounds) + if err != nil { + return evicted, err + } + if ASSERT { + _ = tx.ForEach(kv.PoolSenderIDToAdress, nil, func(idBytes, addr []byte) error { + found := false + _ = tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { + if bytes.Equal(v[:8], idBytes) { + found = true + return fmt.Errorf("stop") + } + return nil + }) + if !found { + found = false + _ = tx.ForEach(kv.PoolStateEviction, nil, func(k, v []byte) error { + ids := roaring64.New() + if err := ids.UnmarshalBinary(v); err != nil { + return err + } + for _, id := range ids.ToArray() { + if binary.BigEndian.Uint64(idBytes) == id { + found = true + return fmt.Errorf("stop") + } + } + return nil + }) + } + if !found { + if p.txNonce2Tx.count(binary.BigEndian.Uint64(idBytes)) > 0 { + panic("?") + } + _ = tx.ForEach(kv.PoolStateEviction, nil, func(k, v []byte) error { + fmt.Printf("ev: %x\n", v) + return nil + }) + _ = tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { + fmt.Printf("tr: %x\n", v) + return nil + }) + _ = tx.ForEach(kv.PoolSenderIDToAdress, nil, func(idBytes, addr []byte) error { + fmt.Printf("id2addr: %x\n", idBytes) + return nil + }) + p.senders.printDebug("gb") + fmt.Printf("sz:%d,%d\n", p.txNonce2Tx.tree.Len(), sendersWithoutTransactions.ToArray()) + fmt.Printf("garbage found: %x\n", idBytes) + panic(1) + } + return nil + }) + } + + // clean - in-memory data structure as later as possible - because if during this Tx will happen error, + // DB will stay consitant but some in-memory structures may be alread cleaned, and retry will not work + // failed write transaction must not create side-effects + p.deletedTxs = p.deletedTxs[:0] + + return evicted, nil +} + +func (sc *SendersCache) flush(tx kv.RwTx, byNonce *ByNonce, sendersWithoutTransactions *roaring64.Bitmap, evictAfterRounds uint64) (evicted uint64, err error) { + sc.lock.Lock() + defer sc.lock.Unlock() + sc.commitID++ + //var justDeleted, justInserted []uint64 + encID := make([]byte, 8) + for addr, id := range sc.senderIDs { + binary.BigEndian.PutUint64(encID, id) + currentV, err := tx.GetOne(kv.PoolSenderID, []byte(addr)) + if err != nil { + return evicted, err + } + if currentV != nil && bytes.Equal(currentV, encID) { + continue + } + //fmt.Printf("Put: %d\n", id) + if err := tx.Put(kv.PoolSenderID, []byte(addr), encID); err != nil { + return evicted, err + } + if err := tx.Put(kv.PoolSenderIDToAdress, encID, []byte(addr)); err != nil { + return evicted, err + } + //if ASSERT { + // justInserted = append(justInserted, id) + //} + if byNonce.count(id) == 0 { + sendersWithoutTransactions.Add(id) + } + } + //if ASSERT { + // sort.Slice(justInserted, func(i, j int) bool { return justInserted[i] < justInserted[j] }) + //} + + v := make([]byte, 8, 8+32) + for id, info := range sc.senderInfo { + if info.nonce == 0 && info.balance.IsZero() { + continue + } + binary.BigEndian.PutUint64(encID, id) + binary.BigEndian.PutUint64(v, info.nonce) + v = append(v[:8], info.balance.Bytes()...) + //TODO: check that nothing changed + if err := tx.Put(kv.PoolSender, encID, v); err != nil { + return evicted, err + } + } + //fmt.Printf("justDeleted:%d, justInserted:%d\n", justDeleted, justInserted) + if ASSERT { + { + duplicates := map[string]uint64{} + _ = tx.ForPrefix(kv.PoolSenderIDToAdress, nil, func(k, v []byte) error { + id, ok := duplicates[string(v)] + if ok { + fmt.Printf("duplicate: %d,%d,%x\n", id, binary.BigEndian.Uint64(k), string(v)) + panic(1) + } + return nil + }) + } + { + duplicates := map[uint64]string{} + _ = tx.ForPrefix(kv.PoolSenderIDToAdress, nil, func(k, v []byte) error { + id := binary.BigEndian.Uint64(v) + addr, ok := duplicates[id] + if ok { + fmt.Printf("duplicate: %x,%x,%d\n", addr, k, binary.BigEndian.Uint64(v)) + panic(1) + } + return nil + }) + } + } + + binary.BigEndian.PutUint64(encID, sc.commitID) + // Eviction logic. Store into db list of senders: + // - which have discarded transactions at this commit + // - but have no active transactions left + // after some time read old records from DB and if such senders still have no transactions - evict them + if sendersWithoutTransactions.GetCardinality() > 0 { + sendersWithoutTransactions.RunOptimize() + b, err := sendersWithoutTransactions.MarshalBinary() + if err != nil { + return 0, err + } + if err := tx.Append(kv.PoolStateEviction, encID, b); err != nil { + return evicted, err + } + } + + c, err := tx.RwCursor(kv.PoolStateEviction) + if err != nil { + return evicted, err + } + defer c.Close() + for k, v, err := c.First(); k != nil; k, v, err = c.Next() { + if err != nil { + return evicted, err + } + if sc.commitID-binary.BigEndian.Uint64(k) < evictAfterRounds { + break + } + ids := roaring64.New() + if err := ids.UnmarshalBinary(v); err != nil { + return 0, err + } + for _, senderID := range ids.ToArray() { + if _, ok := sc.senderInfo[senderID]; ok { + continue + } + if byNonce.count(senderID) > 0 { + continue + } + addr, err := tx.GetOne(kv.PoolSenderID, encID) + if err != nil { + return evicted, err + } + if _, ok := sc.senderIDs[string(addr)]; ok { + continue + } + if err := tx.Delete(kv.PoolSenderID, addr, nil); err != nil { + return evicted, err + } + if err := tx.Delete(kv.PoolSenderIDToAdress, encID, nil); err != nil { + return evicted, err + } + if err := tx.Delete(kv.PoolSender, encID, nil); err != nil { + return evicted, err + } + evicted++ + //if ASSERT { + // justDeleted = append(justDeleted, senderID) + //} + } + if err := c.DeleteCurrent(); err != nil { + return evicted, err + } + } + + if ASSERT { + _ = tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { + //id := binary.BigEndian.Uint64(v[:8]) + //for _, senderID := range justDeleted { + // if senderID == id { + // fmt.Printf("delted id still has tx in db: %d,%x\n", id, k) + // panic(1) + // } + //} + vv, err := tx.GetOne(kv.PoolSenderIDToAdress, v[:8]) + if err != nil { + return err + } + if len(vv) == 0 { + cc, _ := tx.Cursor(kv.PoolSenderIDToAdress) + last, lastAddr, _ := cc.Last() + slots := TxSlots{} + slots.Growth(1) + slots.txs[0] = &TxSlot{} + parseCtx := NewTxParseContext() + _, err := parseCtx.ParseTransaction(v[8+8:], 0, slots.txs[0], slots.senders.At(0)) + if err != nil { + log.Error("er", "er", err) + } + fmt.Printf("sender:%x, txHash: %x\n", slots.senders.At(0), slots.txs[0].idHash) + + fmt.Printf("last: %d,%x\n", binary.BigEndian.Uint64(last), lastAddr) + fmt.Printf("now: %d\n", sc.senderID) + fmt.Printf("not foundd: %d,%x,%x,%x\n", binary.BigEndian.Uint64(v[:8]), k, v, vv) + fmt.Printf("aa: %x,%x,%x\n", k, v, vv) + //fmt.Printf("justDeleted:%d, justInserted:%d\n", justDeleted, justInserted) + panic("no-no") + } + return nil + }) + } + + binary.BigEndian.PutUint64(encID, sc.blockHeight.Load()) + if err := tx.Put(kv.PoolInfo, SenderCacheHeightKey, encID); err != nil { + return evicted, err + } + if err := tx.Put(kv.PoolInfo, SenderCacheHashKey, []byte(sc.blockHash.Load())); err != nil { + return evicted, err + } + binary.BigEndian.PutUint64(encID, sc.senderID) + if err := tx.Put(kv.PoolInfo, SenderCacheIDKey, encID); err != nil { + return evicted, err + } + binary.BigEndian.PutUint64(encID, sc.commitID) + if err := tx.Put(kv.PoolInfo, SenderCommitIDKey, encID); err != nil { + return evicted, err + } + lastCommitTime, err := time.Now().MarshalBinary() + if err != nil { + return evicted, err + } + if err := tx.Put(kv.PoolInfo, SenderCommitTimeKey, lastCommitTime); err != nil { + return evicted, err + } + + sc.senderIDs = map[string]uint64{} + sc.senderInfo = map[uint64]*senderInfo{} + return evicted, nil +} + +func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error { + p.lock.Lock() + defer p.lock.Unlock() + if ASSERT { + _ = tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { + vv, err := tx.GetOne(kv.PoolSenderIDToAdress, v[:8]) + if err != nil { + return err + } + if len(vv) == 0 { + cc, _ := tx.Cursor(kv.PoolSenderIDToAdress) + last, lastAddr, _ := cc.Last() + fmt.Printf("last: %d,%x\n", binary.BigEndian.Uint64(last), lastAddr) + fmt.Printf("now: %d\n", p.senders.senderID) + fmt.Printf("not foundd: %d,%x,%x,%x\n", binary.BigEndian.Uint64(v[:8]), k, v, vv) + panic("no-no") + } + return nil + }) + } + + if err := p.senders.fromDB(ctx, tx, coreTx); err != nil { + return err + } + + if err := tx.ForEach(kv.RecentLocalTransaction, nil, func(k, v []byte) error { + hashID := [32]byte{} + copy(hashID[:], v) + p.localsHistory.Add(hashID, struct{}{}) + return nil + }); err != nil { + return err + } + + txs := TxSlots{} + parseCtx := NewTxParseContext() + parseCtx.WithSender(false) + i := 0 + hashID := [32]byte{} + if err := tx.ForEach(kv.PoolTransaction, nil, func(k, v []byte) error { + txs.Growth(i + 1) + txs.txs[i] = &TxSlot{} + + _, err := parseCtx.ParseTransaction(v[8+8:], 0, txs.txs[i], nil) + if err != nil { + return fmt.Errorf("err: %w, rlp: %x\n", err, v[8+8:]) + } + txs.txs[i].rlp = nil // means that we don't need store it in db anymore + txs.txs[i].senderID = binary.BigEndian.Uint64(v) + + senderAddr, err := tx.GetOne(kv.PoolSenderIDToAdress, v[:8]) + if err != nil { + return err + } + if len(senderAddr) == 0 { + panic("must not happen") + } + copy(txs.senders.At(i), senderAddr) + //bkock num = binary.BigEndian.Uint64(v[8:]) + copy(hashID[:], k) + _, isLocalTx := p.localsHistory.Get(hashID) + txs.isLocal[i] = isLocalTx + i++ + return nil + }); err != nil { + return err + } + + var protocolBaseFee, pendingBaseFee uint64 + { + v, err := tx.GetOne(kv.PoolInfo, PoolProtocolBaseFeeKey) + if err != nil { + return err + } + if len(v) > 0 { + protocolBaseFee = binary.BigEndian.Uint64(v) + } + } + { + v, err := tx.GetOne(kv.PoolInfo, PoolPendingBaseFeeKey) + if err != nil { + return err + } + if len(v) > 0 { + pendingBaseFee = binary.BigEndian.Uint64(v) + } + } + cacheMisses, err := p.senders.onNewTxs(tx, txs) + if err != nil { + return err + } + if len(cacheMisses) > 0 { + if err := p.senders.loadFromCore(coreTx, cacheMisses); err != nil { + return err + } + } + if err := onNewTxs(tx, p.senders, txs, protocolBaseFee, pendingBaseFee, p.pending, p.baseFee, p.queued, p.txNonce2Tx, p.byHash, p.discardLocked); err != nil { + return err + } + p.pendingBaseFee.Store(pendingBaseFee) + p.protocolBaseFee.Store(protocolBaseFee) + + return nil +} + +func (sc *SendersCache) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx) error { + sc.lock.Lock() + defer sc.lock.Unlock() + + { + v, err := tx.GetOne(kv.PoolInfo, SenderCacheHeightKey) + if err != nil { + return err + } + if len(v) > 0 { + sc.blockHeight.Store(binary.BigEndian.Uint64(v)) + } + } + { + v, err := tx.GetOne(kv.PoolInfo, SenderCacheHashKey) + if err != nil { + return err + } + if len(v) > 0 { + sc.blockHash.Store(string(v)) + } + } + { + v, err := tx.GetOne(kv.PoolInfo, SenderCacheIDKey) + if err != nil { + return err + } + if len(v) > 0 { + sc.senderID = binary.BigEndian.Uint64(v) + } + } + { + v, err := tx.GetOne(kv.PoolInfo, SenderCommitIDKey) + if err != nil { + return err + } + if len(v) > 0 { + sc.commitID = binary.BigEndian.Uint64(v) + } + } + + if err := sc.syncMissedStateDiff(ctx, tx, coreTx, 0); err != nil { + return err + } + return nil +} + +func isCanonical(coreTx kv.Tx, num uint64, hash []byte) (bool, error) { + encNum := make([]byte, 8) + binary.BigEndian.PutUint64(encNum, num) + canonical, err := coreTx.GetOne(kv.HeaderCanonical, encNum) + if err != nil { + return false, err + } + + return bytes.Equal(hash, canonical), nil +} + +func changesets(ctx context.Context, from uint64, coreTx kv.Tx) (map[string]senderInfo, error) { + encNum := make([]byte, 8) + diff := map[string]senderInfo{} + binary.BigEndian.PutUint64(encNum, from) + logEvery := time.NewTicker(30 * time.Second) + defer logEvery.Stop() + //TODO: tx.ForEach must be implemented as buffered server-side stream + if err := coreTx.ForEach(kv.AccountChangeSet, encNum, func(k, v []byte) error { + info, err := loadSender(coreTx, v[:20]) + if err != nil { + return err + } + diff[string(v[:20])] = *info + select { + case <-logEvery.C: + log.Info("loading changesets", "block", binary.BigEndian.Uint64(k)) + case <-ctx.Done(): + return nil + default: + } + return nil + }); err != nil { + return nil, err + } + return diff, nil +} + +var SenderCommitTimeKey = []byte("sender_commit_time") +var SenderCacheIDKey = []byte("sender_cache_id") +var SenderCommitIDKey = []byte("sender_commit_id") +var SenderCacheHeightKey = []byte("sender_cache_block_height") +var SenderCacheHashKey = []byte("sender_cache_block_hash") +var PoolPendingBaseFeeKey = []byte("pending_base_fee") +var PoolProtocolBaseFeeKey = []byte("protocol_base_fee") + +func loadSender(coreTx kv.Tx, addr []byte) (*senderInfo, error) { + encoded, err := coreTx.GetOne(kv.PlainState, addr) + if err != nil { + return nil, err + } + if len(encoded) == 0 { + //return nil, nil + return newSenderInfo(0, *uint256.NewInt(0)), nil + } + nonce, balance, err := DecodeSender(encoded) + if err != nil { + return nil, err + } + return newSenderInfo(nonce, balance), nil +} + // recentlyConnectedPeers does buffer IDs of recently connected good peers // then sync of pooled Transaction can happen to all of then at once // DoS protection and performance saving