From 334df676b2b5ca513f6c24db8355d2e5be09ecdb Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Wed, 25 Aug 2021 19:44:21 +0700 Subject: [PATCH] persistence --- kv/tables.go | 12 +++++++----- txpool/pool.go | 31 ++++++++++++++++++++++++------- txpool/pool_fuzz_test.go | 21 +++++++++++---------- 3 files changed, 42 insertions(+), 22 deletions(-) diff --git a/kv/tables.go b/kv/tables.go index 2a93c4219..f88710dc7 100644 --- a/kv/tables.go +++ b/kv/tables.go @@ -342,16 +342,18 @@ var ChaindataTables = []string{ } const ( - RecentLocalTransaction = "RecentLocalTransaction" // sequence_u64 -> tx_hash - PooledSenderID = "PooledSenderID" // sender_20bytes -> sender_id_u64 - PooledSender = "PooledSender" // sender_id_u64 -> nonce, balance - PooledTransaction = "PooledTransaction" // txHash -> sender_id_u64+blockNum_u64+tx_rlp - PoolInfo = "PoolInfo" // option_key -> option_value + RecentLocalTransaction = "RecentLocalTransaction" // sequence_u64 -> tx_hash + PooledSenderID = "PooledSenderID" // sender_20bytes -> sender_id_u64 + PooledSenderIDToAdress = "PooledSenderIDToAddress" // sender_id_u64 -> sender_20bytes + PooledSender = "PooledSender" // sender_id_u64 -> nonce, balance + PooledTransaction = "PooledTransaction" // txHash -> sender_id_u64+blockNum_u64+tx_rlp + PoolInfo = "PoolInfo" // option_key -> option_value ) var TxPoolTables = []string{ RecentLocalTransaction, PooledSenderID, + PooledSenderIDToAdress, PooledSender, PooledTransaction, PoolInfo, diff --git a/txpool/pool.go b/txpool/pool.go index 7e5aadd6f..f7d338bfd 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -166,10 +166,7 @@ func (sc *SendersCache) info(id uint64, tx kv.Tx) (*senderInfo, error) { return nil, err } if len(v) == 0 { - panic("what to do in this case?") - info = newSenderInfo(0, *uint256.NewInt(0)) - sc.senderInfo[id] = info - return info, nil + return nil, nil // don't fallback to core db, it will be manually done in right place } balance := uint256.NewInt(0) balance.SetBytes(v[8:]) @@ -367,8 +364,11 @@ func (sc *SendersCache) setTxSenderID(tx kv.Tx, txs TxSlots) (map[uint64]string, txs.txs[i].senderID = id // load data from db if need - _, ok = sc.senderInfo[txs.txs[i].senderID] - if ok { + info, err := sc.info(txs.txs[i].senderID, tx) + if err != nil { + return nil, err + } + if info != nil { continue } _, ok = toLoad[txs.txs[i].senderID] @@ -500,10 +500,16 @@ func (sc *SendersCache) flush(tx kv.RwTx) error { if currentV != nil && bytes.Equal(currentV, encID) { continue } + + fmt.Printf("flushed ids:%x,%x\n", []byte(addr), encID) if err := tx.Put(kv.PooledSenderID, []byte(addr), encID); err != nil { return err } + if err := tx.Put(kv.PooledSenderIDToAdress, encID, []byte(addr)); err != nil { + return err + } } + v := make([]byte, 8, 8+32) for id, info := range sc.senderInfo { binary.BigEndian.PutUint64(encID, id) @@ -525,6 +531,9 @@ func (sc *SendersCache) flush(tx kv.RwTx) error { if err := tx.Put(kv.PoolInfo, SenderCacheHeightKey, encID); err != nil { return err } + if err := tx.Put(kv.PoolInfo, SenderCacheHashKey, []byte(sc.blockHash.Load())); err != nil { + return err + } binary.BigEndian.PutUint64(encID, sc.senderID) if err := tx.Put(kv.PoolInfo, SenderCacheIDKey, encID); err != nil { return err @@ -958,6 +967,15 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx, senders * return err } txs.txs[i].senderID = binary.BigEndian.Uint64(v) + senderAddr, err := tx.GetOne(kv.PooledSenderIDToAdress, v[:8]) + if err != nil { + return err + } + fmt.Printf("load ids:%x,%x\n", senderAddr, v[:8]) + if len(senderAddr) == 0 { + panic("must not happen") + } + copy(txs.senders.At(i), senderAddr) //bkock num = binary.BigEndian.Uint64(v[8:]) copy(hashID[:], k) _, isLocalTx := p.localsHistory.Get(hashID) @@ -989,7 +1007,6 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx, senders * return err } if len(cacheMisses) > 0 { - //fmt.Printf("%d\n",len(cacheMisses)) if err := senders.loadFromCore(coreTx, cacheMisses); err != nil { return err } diff --git a/txpool/pool_fuzz_test.go b/txpool/pool_fuzz_test.go index 15bccfa37..8aeeb832a 100644 --- a/txpool/pool_fuzz_test.go +++ b/txpool/pool_fuzz_test.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "encoding/binary" - "fmt" "testing" "github.com/google/btree" @@ -521,6 +520,8 @@ func FuzzOnNewBlocks11(f *testing.F) { assert.NoError(err) err = p2.fromDB(context.Background(), tx, nil, s2) require.NoError(t, err) + //todo: check that after load from db tx linked to same sender + check(txs2, TxSlots{}, "fromDB") //checkNotify(txs2, TxSlots{}, "fromDB") assert.Equal(sendersCache.senderID, s2.senderID) @@ -529,15 +530,15 @@ func FuzzOnNewBlocks11(f *testing.F) { require.Equal(t, 0, len(s2.senderIDs)) require.Equal(t, len(sendersCache.senderInfo), len(s2.senderInfo)) require.Equal(t, len(pool.byHash), len(p2.byHash)) - if pool.pending.Len() != p2.pending.Len() { - pool.printDebug("p1") - p2.printDebug("p2") - sendersCache.printDebug("s1") - s2.printDebug("s2") - - fmt.Printf("bef: %d, %d, %d, %d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len(), len(pool.byHash)) - fmt.Printf("bef2: %d, %d, %d, %d\n", p2.pending.Len(), p2.baseFee.Len(), p2.queued.Len(), len(p2.byHash)) - } + //if pool.pending.Len() != p2.pending.Len() { + // pool.printDebug("p1") + // p2.printDebug("p2") + // sendersCache.printDebug("s1") + // s2.printDebug("s2") + // + // fmt.Printf("bef: %d, %d, %d, %d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len(), len(pool.byHash)) + // fmt.Printf("bef2: %d, %d, %d, %d\n", p2.pending.Len(), p2.baseFee.Len(), p2.queued.Len(), len(p2.byHash)) + //} assert.Equal(pool.pending.Len(), p2.pending.Len()) assert.Equal(pool.baseFee.Len(), p2.baseFee.Len())