persistence

This commit is contained in:
alex.sharov 2021-08-25 19:44:21 +07:00
parent a1c7795034
commit 334df676b2
3 changed files with 42 additions and 22 deletions

View File

@ -342,16 +342,18 @@ var ChaindataTables = []string{
} }
const ( const (
RecentLocalTransaction = "RecentLocalTransaction" // sequence_u64 -> tx_hash RecentLocalTransaction = "RecentLocalTransaction" // sequence_u64 -> tx_hash
PooledSenderID = "PooledSenderID" // sender_20bytes -> sender_id_u64 PooledSenderID = "PooledSenderID" // sender_20bytes -> sender_id_u64
PooledSender = "PooledSender" // sender_id_u64 -> nonce, balance PooledSenderIDToAdress = "PooledSenderIDToAddress" // sender_id_u64 -> sender_20bytes
PooledTransaction = "PooledTransaction" // txHash -> sender_id_u64+blockNum_u64+tx_rlp PooledSender = "PooledSender" // sender_id_u64 -> nonce, balance
PoolInfo = "PoolInfo" // option_key -> option_value PooledTransaction = "PooledTransaction" // txHash -> sender_id_u64+blockNum_u64+tx_rlp
PoolInfo = "PoolInfo" // option_key -> option_value
) )
var TxPoolTables = []string{ var TxPoolTables = []string{
RecentLocalTransaction, RecentLocalTransaction,
PooledSenderID, PooledSenderID,
PooledSenderIDToAdress,
PooledSender, PooledSender,
PooledTransaction, PooledTransaction,
PoolInfo, PoolInfo,

View File

@ -166,10 +166,7 @@ func (sc *SendersCache) info(id uint64, tx kv.Tx) (*senderInfo, error) {
return nil, err return nil, err
} }
if len(v) == 0 { if len(v) == 0 {
panic("what to do in this case?") return nil, nil // don't fallback to core db, it will be manually done in right place
info = newSenderInfo(0, *uint256.NewInt(0))
sc.senderInfo[id] = info
return info, nil
} }
balance := uint256.NewInt(0) balance := uint256.NewInt(0)
balance.SetBytes(v[8:]) balance.SetBytes(v[8:])
@ -367,8 +364,11 @@ func (sc *SendersCache) setTxSenderID(tx kv.Tx, txs TxSlots) (map[uint64]string,
txs.txs[i].senderID = id txs.txs[i].senderID = id
// load data from db if need // load data from db if need
_, ok = sc.senderInfo[txs.txs[i].senderID] info, err := sc.info(txs.txs[i].senderID, tx)
if ok { if err != nil {
return nil, err
}
if info != nil {
continue continue
} }
_, ok = toLoad[txs.txs[i].senderID] _, ok = toLoad[txs.txs[i].senderID]
@ -500,10 +500,16 @@ func (sc *SendersCache) flush(tx kv.RwTx) error {
if currentV != nil && bytes.Equal(currentV, encID) { if currentV != nil && bytes.Equal(currentV, encID) {
continue continue
} }
fmt.Printf("flushed ids:%x,%x\n", []byte(addr), encID)
if err := tx.Put(kv.PooledSenderID, []byte(addr), encID); err != nil { if err := tx.Put(kv.PooledSenderID, []byte(addr), encID); err != nil {
return err return err
} }
if err := tx.Put(kv.PooledSenderIDToAdress, encID, []byte(addr)); err != nil {
return err
}
} }
v := make([]byte, 8, 8+32) v := make([]byte, 8, 8+32)
for id, info := range sc.senderInfo { for id, info := range sc.senderInfo {
binary.BigEndian.PutUint64(encID, id) binary.BigEndian.PutUint64(encID, id)
@ -525,6 +531,9 @@ func (sc *SendersCache) flush(tx kv.RwTx) error {
if err := tx.Put(kv.PoolInfo, SenderCacheHeightKey, encID); err != nil { if err := tx.Put(kv.PoolInfo, SenderCacheHeightKey, encID); err != nil {
return err return err
} }
if err := tx.Put(kv.PoolInfo, SenderCacheHashKey, []byte(sc.blockHash.Load())); err != nil {
return err
}
binary.BigEndian.PutUint64(encID, sc.senderID) binary.BigEndian.PutUint64(encID, sc.senderID)
if err := tx.Put(kv.PoolInfo, SenderCacheIDKey, encID); err != nil { if err := tx.Put(kv.PoolInfo, SenderCacheIDKey, encID); err != nil {
return err return err
@ -958,6 +967,15 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx, senders *
return err return err
} }
txs.txs[i].senderID = binary.BigEndian.Uint64(v) txs.txs[i].senderID = binary.BigEndian.Uint64(v)
senderAddr, err := tx.GetOne(kv.PooledSenderIDToAdress, v[:8])
if err != nil {
return err
}
fmt.Printf("load ids:%x,%x\n", senderAddr, v[:8])
if len(senderAddr) == 0 {
panic("must not happen")
}
copy(txs.senders.At(i), senderAddr)
//block num = binary.BigEndian.Uint64(v[8:]) //block num = binary.BigEndian.Uint64(v[8:])
copy(hashID[:], k) copy(hashID[:], k)
_, isLocalTx := p.localsHistory.Get(hashID) _, isLocalTx := p.localsHistory.Get(hashID)
@ -989,7 +1007,6 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.RwTx, coreTx kv.Tx, senders *
return err return err
} }
if len(cacheMisses) > 0 { if len(cacheMisses) > 0 {
//fmt.Printf("%d\n",len(cacheMisses))
if err := senders.loadFromCore(coreTx, cacheMisses); err != nil { if err := senders.loadFromCore(coreTx, cacheMisses); err != nil {
return err return err
} }

View File

@ -7,7 +7,6 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/binary" "encoding/binary"
"fmt"
"testing" "testing"
"github.com/google/btree" "github.com/google/btree"
@ -521,6 +520,8 @@ func FuzzOnNewBlocks11(f *testing.F) {
assert.NoError(err) assert.NoError(err)
err = p2.fromDB(context.Background(), tx, nil, s2) err = p2.fromDB(context.Background(), tx, nil, s2)
require.NoError(t, err) require.NoError(t, err)
//todo: check that after load from db tx linked to same sender
check(txs2, TxSlots{}, "fromDB") check(txs2, TxSlots{}, "fromDB")
//checkNotify(txs2, TxSlots{}, "fromDB") //checkNotify(txs2, TxSlots{}, "fromDB")
assert.Equal(sendersCache.senderID, s2.senderID) assert.Equal(sendersCache.senderID, s2.senderID)
@ -529,15 +530,15 @@ func FuzzOnNewBlocks11(f *testing.F) {
require.Equal(t, 0, len(s2.senderIDs)) require.Equal(t, 0, len(s2.senderIDs))
require.Equal(t, len(sendersCache.senderInfo), len(s2.senderInfo)) require.Equal(t, len(sendersCache.senderInfo), len(s2.senderInfo))
require.Equal(t, len(pool.byHash), len(p2.byHash)) require.Equal(t, len(pool.byHash), len(p2.byHash))
if pool.pending.Len() != p2.pending.Len() { //if pool.pending.Len() != p2.pending.Len() {
pool.printDebug("p1") // pool.printDebug("p1")
p2.printDebug("p2") // p2.printDebug("p2")
sendersCache.printDebug("s1") // sendersCache.printDebug("s1")
s2.printDebug("s2") // s2.printDebug("s2")
//
fmt.Printf("bef: %d, %d, %d, %d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len(), len(pool.byHash)) // fmt.Printf("bef: %d, %d, %d, %d\n", pool.pending.Len(), pool.baseFee.Len(), pool.queued.Len(), len(pool.byHash))
fmt.Printf("bef2: %d, %d, %d, %d\n", p2.pending.Len(), p2.baseFee.Len(), p2.queued.Len(), len(p2.byHash)) // fmt.Printf("bef2: %d, %d, %d, %d\n", p2.pending.Len(), p2.baseFee.Len(), p2.queued.Len(), len(p2.byHash))
} //}
assert.Equal(pool.pending.Len(), p2.pending.Len()) assert.Equal(pool.pending.Len(), p2.pending.Len())
assert.Equal(pool.baseFee.Len(), p2.baseFee.Len()) assert.Equal(pool.baseFee.Len(), p2.baseFee.Len())