Split erigon2.2 and erigon2.3 prototypes (#4811)

* Split erigon2.2 and erigon2.3 prototypes

* Renaming

* Interruptible and resumable erigon22

* Always regenerate trie

* Introduce aggregator

* Fixes

* cleanup

* Fix lint

* Update to erigon-lib main

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
ledgerwatch 2022-07-28 12:16:37 +01:00 committed by GitHub
parent aee4b53788
commit 09c104803a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1393 additions and 1107 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,564 @@
package commands
import (
"context"
"errors"
"fmt"
"math/bits"
"os"
"os/signal"
"path"
"path/filepath"
"runtime"
"syscall"
"time"
"github.com/holiman/uint256"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/misc"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/types/accounts"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
)
const (
AggregationStep = 3_125_000 /* number of transactions in smallest static file */
)
func init() {
withBlock(erigon23Cmd)
withDataDir(erigon23Cmd)
withChain(erigon23Cmd)
rootCmd.AddCommand(erigon23Cmd)
}
var erigon23Cmd = &cobra.Command{
Use: "erigon23",
Short: "Exerimental command to re-execute blocks from beginning using erigon2 state representation and histoty (ugrade 3)",
RunE: func(cmd *cobra.Command, args []string) error {
logger := log.New()
return Erigon23(genesis, chainConfig, logger)
},
}
func Erigon23(genesis *core.Genesis, chainConfig *params.ChainConfig, logger log.Logger) error {
sigs := make(chan os.Signal, 1)
interruptCh := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
interruptCh <- true
}()
historyDb, err := kv2.NewMDBX(logger).Path(path.Join(datadir, "chaindata")).Open()
if err != nil {
return fmt.Errorf("opening chaindata as read only: %v", err)
}
defer historyDb.Close()
ctx := context.Background()
historyTx, err1 := historyDb.BeginRo(ctx)
if err1 != nil {
return err1
}
defer historyTx.Rollback()
stateDbPath := path.Join(datadir, "db23")
if _, err = os.Stat(stateDbPath); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
} else if err = os.RemoveAll(stateDbPath); err != nil {
return err
}
db, err2 := kv2.NewMDBX(logger).Path(stateDbPath).WriteMap().Open()
if err2 != nil {
return err2
}
defer db.Close()
aggPath := filepath.Join(datadir, "erigon23")
var rwTx kv.RwTx
defer func() {
if rwTx != nil {
rwTx.Rollback()
}
}()
if rwTx, err = db.BeginRw(ctx); err != nil {
return err
}
agg, err3 := libstate.NewAggregator(aggPath, AggregationStep)
if err3 != nil {
return fmt.Errorf("create aggregator: %w", err3)
}
defer agg.Close()
startTxNum := agg.EndTxNumMinimax()
fmt.Printf("Max txNum in files: %d\n", startTxNum)
interrupt := false
if startTxNum == 0 {
_, genesisIbs, err := genesis.ToBlock()
if err != nil {
return err
}
agg.SetTx(rwTx)
agg.SetTxNum(0)
if err = genesisIbs.CommitBlock(&params.Rules{}, &WriterWrapper23{w: agg}); err != nil {
return fmt.Errorf("cannot write state: %w", err)
}
if err = agg.FinishTx(); err != nil {
return err
}
}
logger.Info("Initialised chain configuration", "config", chainConfig)
var (
blockNum uint64
trace bool
vmConfig vm.Config
txNum uint64 = 2 // Consider that each block contains at least first system tx and enclosing transactions, except for Clique consensus engine
)
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
statx := &stat23{
prevBlock: blockNum,
prevTime: time.Now(),
}
go func() {
for range logEvery.C {
aStats := agg.Stats()
statx.delta(aStats, blockNum).print(aStats, logger)
}
}()
var blockReader services.FullBlockReader
var allSnapshots *snapshotsync.RoSnapshots
allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots"))
defer allSnapshots.Close()
if err := allSnapshots.ReopenWithDB(db); err != nil {
return fmt.Errorf("reopen snapshot segments: %w", err)
}
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
engine := initConsensusEngine(chainConfig, logger, allSnapshots)
getHeader := func(hash common.Hash, number uint64) *types.Header {
h, err := blockReader.Header(ctx, historyTx, hash, number)
if err != nil {
panic(err)
}
return h
}
readWrapper := &ReaderWrapper23{ac: agg.MakeContext(), roTx: rwTx}
writeWrapper := &WriterWrapper23{w: agg}
for !interrupt {
blockNum++
trace = traceBlock > 0 && blockNum == uint64(traceBlock)
blockHash, err := blockReader.CanonicalHash(ctx, historyTx, blockNum)
if err != nil {
return err
}
b, _, err := blockReader.BlockWithSenders(ctx, historyTx, blockHash, blockNum)
if err != nil {
return err
}
if b == nil {
log.Info("history: block is nil", "block", blockNum)
break
}
agg.SetTx(rwTx)
agg.SetTxNum(txNum)
if txNum, _, err = processBlock23(startTxNum, trace, txNum, readWrapper, writeWrapper, chainConfig, engine, getHeader, b, vmConfig); err != nil {
return fmt.Errorf("processing block %d: %w", blockNum, err)
}
// Check for interrupts
select {
case interrupt = <-interruptCh:
log.Info(fmt.Sprintf("interrupted, please wait for cleanup, next time start with --block %d", blockNum))
default:
}
// Commit transaction only when interrupted or just before computing commitment (so it can be re-done)
commit := interrupt
if !commit && (blockNum+1)%uint64(commitmentFrequency) == 0 {
var spaceDirty uint64
if spaceDirty, _, err = rwTx.(*mdbx.MdbxTx).SpaceDirty(); err != nil {
return fmt.Errorf("retrieving spaceDirty: %w", err)
}
if spaceDirty >= dirtySpaceThreshold {
log.Info("Initiated tx commit", "block", blockNum, "space dirty", libcommon.ByteCount(spaceDirty))
commit = true
}
}
if commit {
if err = rwTx.Commit(); err != nil {
return err
}
if !interrupt {
if rwTx, err = db.BeginRw(ctx); err != nil {
return err
}
}
agg.SetTx(rwTx)
readWrapper.roTx = rwTx
}
}
return nil
}
type stat23 struct {
blockNum uint64
hits uint64
misses uint64
prevBlock uint64
prevMisses uint64
prevHits uint64
hitMissRatio float64
speed float64
prevTime time.Time
mem runtime.MemStats
}
func (s *stat23) print(aStats libstate.FilesStats, logger log.Logger) {
totalFiles := 0
totalDatSize := 0
totalIdxSize := 0
logger.Info("Progress", "block", s.blockNum, "blk/s", s.speed, "state files", totalFiles,
"total dat", libcommon.ByteCount(uint64(totalDatSize)), "total idx", libcommon.ByteCount(uint64(totalIdxSize)),
"hit ratio", s.hitMissRatio, "hits+misses", s.hits+s.misses,
"alloc", libcommon.ByteCount(s.mem.Alloc), "sys", libcommon.ByteCount(s.mem.Sys),
)
}
func (s *stat23) delta(aStats libstate.FilesStats, blockNum uint64) *stat23 {
currentTime := time.Now()
libcommon.ReadMemStats(&s.mem)
interval := currentTime.Sub(s.prevTime).Seconds()
s.blockNum = blockNum
s.speed = float64(s.blockNum-s.prevBlock) / interval
s.prevBlock = blockNum
s.prevTime = currentTime
total := s.hits + s.misses
if total > 0 {
s.hitMissRatio = float64(s.hits) / float64(total)
}
return s
}
func processBlock23(startTxNum uint64, trace bool, txNumStart uint64, rw *ReaderWrapper23, ww *WriterWrapper23, chainConfig *params.ChainConfig,
engine consensus.Engine, getHeader func(hash common.Hash, number uint64) *types.Header, block *types.Block, vmConfig vm.Config,
) (uint64, types.Receipts, error) {
defer blockExecutionTimer.UpdateDuration(time.Now())
header := block.Header()
vmConfig.Debug = true
gp := new(core.GasPool).AddGas(block.GasLimit())
usedGas := new(uint64)
var receipts types.Receipts
rules := chainConfig.Rules(block.NumberU64())
txNum := txNumStart
ww.w.SetTxNum(txNum)
rw.blockNum = block.NumberU64()
ww.blockNum = block.NumberU64()
daoFork := txNum >= startTxNum && chainConfig.DAOForkSupport && chainConfig.DAOForkBlock != nil && chainConfig.DAOForkBlock.Cmp(block.Number()) == 0
if daoFork {
ibs := state.New(rw)
// TODO Actually add tracing to the DAO related accounts
misc.ApplyDAOHardFork(ibs)
if err := ibs.FinalizeTx(rules, ww); err != nil {
return 0, nil, err
}
if err := ww.w.FinishTx(); err != nil {
return 0, nil, fmt.Errorf("finish daoFork failed: %w", err)
}
}
txNum++ // Pre-block transaction
ww.w.SetTxNum(txNum)
getHashFn := core.GetHashFn(header, getHeader)
for i, tx := range block.Transactions() {
if txNum >= startTxNum {
ibs := state.New(rw)
ibs.Prepare(tx.Hash(), block.Hash(), i)
ct := NewCallTracer()
vmConfig.Tracer = ct
receipt, _, err := core.ApplyTransaction(chainConfig, getHashFn, engine, nil, gp, ibs, ww, header, tx, usedGas, vmConfig, nil)
if err != nil {
return 0, nil, fmt.Errorf("could not apply tx %d [%x] failed: %w", i, tx.Hash(), err)
}
for from := range ct.froms {
if err := ww.w.AddTraceFrom(from[:]); err != nil {
return 0, nil, err
}
}
for to := range ct.tos {
if err := ww.w.AddTraceTo(to[:]); err != nil {
return 0, nil, err
}
}
receipts = append(receipts, receipt)
for _, log := range receipt.Logs {
if err = ww.w.AddLogAddr(log.Address[:]); err != nil {
return 0, nil, fmt.Errorf("adding event log for addr %x: %w", log.Address, err)
}
for _, topic := range log.Topics {
if err = ww.w.AddLogTopic(topic[:]); err != nil {
return 0, nil, fmt.Errorf("adding event log for topic %x: %w", topic, err)
}
}
}
if err = ww.w.FinishTx(); err != nil {
return 0, nil, fmt.Errorf("finish tx %d [%x] failed: %w", i, tx.Hash(), err)
}
if trace {
fmt.Printf("FinishTx called for blockNum=%d, txIndex=%d, txNum=%d txHash=[%x]\n", block.NumberU64(), i, txNum, tx.Hash())
}
}
txNum++
ww.w.SetTxNum(txNum)
}
if txNum >= startTxNum && chainConfig.IsByzantium(block.NumberU64()) {
receiptSha := types.DeriveSha(receipts)
if receiptSha != block.ReceiptHash() {
fmt.Printf("mismatched receipt headers for block %d\n", block.NumberU64())
for j, receipt := range receipts {
fmt.Printf("tx %d, used gas: %d\n", j, receipt.GasUsed)
}
}
}
if txNum >= startTxNum {
ibs := state.New(rw)
if err := ww.w.AddTraceTo(block.Coinbase().Bytes()); err != nil {
return 0, nil, fmt.Errorf("adding coinbase trace: %w", err)
}
for _, uncle := range block.Uncles() {
if err := ww.w.AddTraceTo(uncle.Coinbase.Bytes()); err != nil {
return 0, nil, fmt.Errorf("adding uncle trace: %w", err)
}
}
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
if _, _, err := engine.Finalize(chainConfig, header, ibs, block.Transactions(), block.Uncles(), receipts, nil, nil, nil); err != nil {
return 0, nil, fmt.Errorf("finalize of block %d failed: %w", block.NumberU64(), err)
}
if err := ibs.CommitBlock(rules, ww); err != nil {
return 0, nil, fmt.Errorf("committing block %d failed: %w", block.NumberU64(), err)
}
if err := ww.w.FinishTx(); err != nil {
return 0, nil, fmt.Errorf("failed to finish tx: %w", err)
}
if trace {
fmt.Printf("FinishTx called for %d block %d\n", txNum, block.NumberU64())
}
}
txNum++ // Post-block transaction
ww.w.SetTxNum(txNum)
return txNum, receipts, nil
}
// Implements StateReader and StateWriter
type ReaderWrapper23 struct {
roTx kv.Tx
ac *libstate.AggregatorContext
blockNum uint64
}
type WriterWrapper23 struct {
blockNum uint64
w *libstate.Aggregator
}
func (rw *ReaderWrapper23) ReadAccountData(address common.Address) (*accounts.Account, error) {
enc, err := rw.ac.ReadAccountData(address.Bytes(), rw.roTx)
if err != nil {
return nil, err
}
if len(enc) == 0 {
return nil, nil
}
var a accounts.Account
a.Reset()
pos := 0
nonceBytes := int(enc[pos])
pos++
if nonceBytes > 0 {
a.Nonce = bytesToUint64(enc[pos : pos+nonceBytes])
pos += nonceBytes
}
balanceBytes := int(enc[pos])
pos++
if balanceBytes > 0 {
a.Balance.SetBytes(enc[pos : pos+balanceBytes])
pos += balanceBytes
}
codeHashBytes := int(enc[pos])
pos++
if codeHashBytes > 0 {
copy(a.CodeHash[:], enc[pos:pos+codeHashBytes])
pos += codeHashBytes
}
incBytes := int(enc[pos])
pos++
if incBytes > 0 {
a.Incarnation = bytesToUint64(enc[pos : pos+incBytes])
}
return &a, nil
}
func (rw *ReaderWrapper23) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
enc, err := rw.ac.ReadAccountStorage(address.Bytes(), key.Bytes(), rw.roTx)
if err != nil {
return nil, err
}
if enc == nil {
return nil, nil
}
if len(enc) == 1 && enc[0] == 0 {
return nil, nil
}
return enc, nil
}
func (rw *ReaderWrapper23) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
return rw.ac.ReadAccountCode(address.Bytes(), rw.roTx)
}
func (rw *ReaderWrapper23) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
return rw.ac.ReadAccountCodeSize(address.Bytes(), rw.roTx)
}
func (rw *ReaderWrapper23) ReadAccountIncarnation(address common.Address) (uint64, error) {
return 0, nil
}
func (ww *WriterWrapper23) UpdateAccountData(address common.Address, original, account *accounts.Account) error {
var l int
l++
if account.Nonce > 0 {
l += (bits.Len64(account.Nonce) + 7) / 8
}
l++
if !account.Balance.IsZero() {
l += account.Balance.ByteLen()
}
l++
if !account.IsEmptyCodeHash() {
l += 32
}
l++
if account.Incarnation > 0 {
l += (bits.Len64(account.Incarnation) + 7) / 8
}
value := make([]byte, l)
pos := 0
if account.Nonce == 0 {
value[pos] = 0
pos++
} else {
nonceBytes := (bits.Len64(account.Nonce) + 7) / 8
value[pos] = byte(nonceBytes)
var nonce = account.Nonce
for i := nonceBytes; i > 0; i-- {
value[pos+i] = byte(nonce)
nonce >>= 8
}
pos += nonceBytes + 1
}
if account.Balance.IsZero() {
value[pos] = 0
pos++
} else {
balanceBytes := account.Balance.ByteLen()
value[pos] = byte(balanceBytes)
pos++
account.Balance.WriteToSlice(value[pos : pos+balanceBytes])
pos += balanceBytes
}
if account.IsEmptyCodeHash() {
value[pos] = 0
pos++
} else {
value[pos] = 32
pos++
copy(value[pos:pos+32], account.CodeHash[:])
pos += 32
}
if account.Incarnation == 0 {
value[pos] = 0
} else {
incBytes := (bits.Len64(account.Incarnation) + 7) / 8
value[pos] = byte(incBytes)
var inc = account.Incarnation
for i := incBytes; i > 0; i-- {
value[pos+i] = byte(inc)
inc >>= 8
}
}
if err := ww.w.UpdateAccountData(address.Bytes(), value); err != nil {
return err
}
return nil
}
func (ww *WriterWrapper23) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error {
if err := ww.w.UpdateAccountCode(address.Bytes(), code); err != nil {
return err
}
return nil
}
func (ww *WriterWrapper23) DeleteAccount(address common.Address, original *accounts.Account) error {
if err := ww.w.DeleteAccount(address.Bytes()); err != nil {
return err
}
return nil
}
func (ww *WriterWrapper23) WriteAccountStorage(address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error {
if err := ww.w.WriteAccountStorage(address.Bytes(), key.Bytes(), value.Bytes()); err != nil {
return err
}
return nil
}
func (ww *WriterWrapper23) CreateContract(address common.Address) error {
return nil
}

View File

@ -103,8 +103,8 @@ func ReplayTx(genesis *core.Genesis) error {
txNum = txnum
}
fmt.Printf("txNum = %d\n", txNum)
aggPath := filepath.Join(datadir, "erigon23")
agg, err := libstate.NewAggregator(aggPath, AggregationStep)
aggPath := filepath.Join(datadir, "agg22")
agg, err := libstate.NewAggregator22(aggPath, AggregationStep)
if err != nil {
return fmt.Errorf("create history: %w", err)
}
@ -119,7 +119,7 @@ func ReplayTx(genesis *core.Genesis) error {
}
func replayTxNum(ctx context.Context, allSnapshots *snapshotsync.RoSnapshots, blockReader services.FullBlockReader,
txNum uint64, txNums []uint64, rs *state.ReconState, ac *libstate.AggregatorContext,
txNum uint64, txNums []uint64, rs *state.ReconState, ac *libstate.Aggregator22Context,
) error {
bn := uint64(sort.Search(len(txNums), func(i int) bool {
return txNums[i] > txNum

View File

@ -72,7 +72,7 @@ type ReconWorker struct {
}
func NewReconWorker(lock sync.Locker, wg *sync.WaitGroup, rs *state.ReconState,
a *libstate.Aggregator, blockReader services.FullBlockReader, allSnapshots *snapshotsync.RoSnapshots,
a *libstate.Aggregator22, blockReader services.FullBlockReader, allSnapshots *snapshotsync.RoSnapshots,
chainConfig *params.ChainConfig, logger log.Logger, genesis *core.Genesis,
) *ReconWorker {
ac := a.MakeContext()
@ -122,7 +122,7 @@ func (rw *ReconWorker) runTxTask(txTask state.TxTask) {
daoForkTx := rw.chainConfig.DAOForkSupport && rw.chainConfig.DAOForkBlock != nil && rw.chainConfig.DAOForkBlock.Uint64() == txTask.BlockNum && txTask.TxIndex == -1
var err error
if txTask.BlockNum == 0 && txTask.TxIndex == -1 {
//fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txNum, blockNum)
//fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txTask.TxNum, txTask.BlockNum)
// Genesis block
_, ibs, err = rw.genesis.ToBlock()
if err != nil {
@ -180,7 +180,7 @@ func (rw *ReconWorker) runTxTask(txTask state.TxTask) {
type FillWorker struct {
txNum uint64
doneCount *uint64
ac *libstate.AggregatorContext
ac *libstate.Aggregator22Context
fromKey, toKey []byte
currentKey []byte
bitmap roaring64.Bitmap
@ -188,7 +188,7 @@ type FillWorker struct {
progress uint64
}
func NewFillWorker(txNum uint64, doneCount *uint64, a *libstate.Aggregator, fromKey, toKey []byte) *FillWorker {
func NewFillWorker(txNum uint64, doneCount *uint64, a *libstate.Aggregator22, fromKey, toKey []byte) *FillWorker {
fw := &FillWorker{
txNum: txNum,
doneCount: doneCount,
@ -355,8 +355,8 @@ func Recon(genesis *core.Genesis, logger log.Logger) error {
interruptCh <- true
}()
ctx := context.Background()
aggPath := filepath.Join(datadir, "erigon23")
agg, err := libstate.NewAggregator(aggPath, AggregationStep)
aggPath := filepath.Join(datadir, "agg22")
agg, err := libstate.NewAggregator22(aggPath, AggregationStep)
if err != nil {
return fmt.Errorf("create history: %w", err)
}

View File

@ -1,546 +0,0 @@
package commands
import (
"container/heap"
"context"
"errors"
"fmt"
"os"
"os/signal"
"path"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"syscall"
"time"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/consensus/misc"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/state"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/core/vm"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"golang.org/x/sync/semaphore"
)
func init() {
withBlock(recon1Cmd)
withDataDir(recon1Cmd)
rootCmd.AddCommand(recon1Cmd)
}
var recon1Cmd = &cobra.Command{
Use: "recon1",
Short: "Exerimental command to reconstitute the state at given block",
RunE: func(cmd *cobra.Command, args []string) error {
logger := log.New()
return Recon1(genesis, logger)
},
}
type ReconWorker1 struct {
lock sync.Locker
wg *sync.WaitGroup
rs *state.ReconState1
blockReader services.FullBlockReader
allSnapshots *snapshotsync.RoSnapshots
stateWriter *state.StateReconWriter1
stateReader *state.StateReconReader1
getHeader func(hash common.Hash, number uint64) *types.Header
ctx context.Context
engine consensus.Engine
txNums []uint64
chainConfig *params.ChainConfig
logger log.Logger
genesis *core.Genesis
resultCh chan state.TxTask
}
func NewReconWorker1(lock sync.Locker, wg *sync.WaitGroup, rs *state.ReconState1,
blockReader services.FullBlockReader, allSnapshots *snapshotsync.RoSnapshots,
txNums []uint64, chainConfig *params.ChainConfig, logger log.Logger, genesis *core.Genesis,
resultCh chan state.TxTask,
) *ReconWorker1 {
return &ReconWorker1{
lock: lock,
wg: wg,
rs: rs,
blockReader: blockReader,
allSnapshots: allSnapshots,
ctx: context.Background(),
stateWriter: state.NewStateReconWriter1(rs),
stateReader: state.NewStateReconReader1(rs),
txNums: txNums,
chainConfig: chainConfig,
logger: logger,
genesis: genesis,
resultCh: resultCh,
}
}
func (rw *ReconWorker1) SetTx(tx kv.Tx) {
rw.stateReader.SetTx(tx)
}
func (rw *ReconWorker1) run() {
defer rw.wg.Done()
rw.getHeader = func(hash common.Hash, number uint64) *types.Header {
h, err := rw.blockReader.Header(rw.ctx, nil, hash, number)
if err != nil {
panic(err)
}
return h
}
rw.engine = initConsensusEngine(rw.chainConfig, rw.logger, rw.allSnapshots)
for txTask, ok := rw.rs.Schedule(); ok; txTask, ok = rw.rs.Schedule() {
rw.runTxTask(&txTask)
rw.resultCh <- txTask // Needs to have outside of the lock
}
}
func (rw *ReconWorker1) runTxTask(txTask *state.TxTask) {
rw.lock.Lock()
defer rw.lock.Unlock()
txTask.Error = nil
rw.stateReader.SetTxNum(txTask.TxNum)
rw.stateWriter.SetTxNum(txTask.TxNum)
rw.stateReader.ResetReadSet()
rw.stateWriter.ResetWriteSet()
ibs := state.New(rw.stateReader)
daoForkTx := rw.chainConfig.DAOForkSupport && rw.chainConfig.DAOForkBlock != nil && rw.chainConfig.DAOForkBlock.Uint64() == txTask.BlockNum && txTask.TxIndex == -1
var err error
if txTask.BlockNum == 0 && txTask.TxIndex == -1 {
//fmt.Printf("txNum=%d, blockNum=%d, Genesis\n", txTask.TxNum, txTask.BlockNum)
// Genesis block
_, ibs, err = rw.genesis.ToBlock()
if err != nil {
panic(err)
}
} else if daoForkTx {
//fmt.Printf("txNum=%d, blockNum=%d, DAO fork\n", txTask.TxNum, txTask.BlockNum)
misc.ApplyDAOHardFork(ibs)
ibs.SoftFinalise()
} else if txTask.TxIndex == -1 {
// Block initialisation
} else if txTask.Final {
if txTask.BlockNum > 0 {
//fmt.Printf("txNum=%d, blockNum=%d, finalisation of the block\n", txTask.TxNum, txTask.BlockNum)
// End of block transaction in a block
if _, _, err := rw.engine.Finalize(rw.chainConfig, txTask.Header, ibs, txTask.Block.Transactions(), txTask.Block.Uncles(), nil /* receipts */, nil, nil, nil); err != nil {
panic(fmt.Errorf("finalize of block %d failed: %w", txTask.BlockNum, err))
}
}
} else {
//fmt.Printf("txNum=%d, blockNum=%d, txIndex=%d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
txHash := txTask.Tx.Hash()
gp := new(core.GasPool).AddGas(txTask.Tx.GetGas())
vmConfig := vm.Config{NoReceipts: true, SkipAnalysis: core.SkipAnalysis(rw.chainConfig, txTask.BlockNum)}
contractHasTEVM := func(contractHash common.Hash) (bool, error) { return false, nil }
ibs.Prepare(txHash, txTask.BlockHash, txTask.TxIndex)
getHashFn := core.GetHashFn(txTask.Header, rw.getHeader)
blockContext := core.NewEVMBlockContext(txTask.Header, getHashFn, rw.engine, nil /* author */, contractHasTEVM)
msg, err := txTask.Tx.AsMessage(*types.MakeSigner(rw.chainConfig, txTask.BlockNum), txTask.Header.BaseFee, txTask.Rules)
if err != nil {
panic(err)
}
txContext := core.NewEVMTxContext(msg)
vmenv := vm.NewEVM(blockContext, txContext, ibs, rw.chainConfig, vmConfig)
if _, err = core.ApplyMessage(vmenv, msg, gp, true /* refunds */, false /* gasBailout */); err != nil {
txTask.Error = err
//fmt.Printf("error=%v\n", err)
}
// Update the state with pending changes
ibs.SoftFinalise()
}
// Prepare read set, write set and balanceIncrease set and send for serialisation
if txTask.Error == nil {
txTask.BalanceIncreaseSet = ibs.BalanceIncreaseSet()
//for addr, bal := range txTask.BalanceIncreaseSet {
// fmt.Printf("[%x]=>[%d]\n", addr, &bal)
//}
if err = ibs.MakeWriteSet(txTask.Rules, rw.stateWriter); err != nil {
panic(err)
}
txTask.ReadLists = rw.stateReader.ReadSet()
txTask.WriteLists = rw.stateWriter.WriteSet()
size := (20 + 32) * len(txTask.BalanceIncreaseSet)
for _, list := range txTask.ReadLists {
for _, b := range list.Keys {
size += len(b)
}
for _, b := range list.Vals {
size += len(b)
}
}
for _, list := range txTask.WriteLists {
for _, b := range list.Keys {
size += len(b)
}
for _, b := range list.Vals {
size += len(b)
}
}
txTask.ResultsSize = int64(size)
}
}
func processResultQueue(rws *state.TxTaskQueue, outputTxNum *uint64, rs *state.ReconState1, applyTx kv.Tx,
triggerCount *uint64, outputBlockNum *uint64, repeatCount *uint64, resultsSize *int64) {
for rws.Len() > 0 && (*rws)[0].TxNum == *outputTxNum {
txTask := heap.Pop(rws).(state.TxTask)
atomic.AddInt64(resultsSize, -txTask.ResultsSize)
if txTask.Error == nil && rs.ReadsValid(txTask.ReadLists) {
if err := rs.Apply(txTask.Rules.IsSpuriousDragon, applyTx, txTask); err != nil {
panic(err)
}
*triggerCount += rs.CommitTxNum(txTask.Sender, txTask.TxNum)
*outputTxNum++
*outputBlockNum = txTask.BlockNum
//fmt.Printf("Applied %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
} else {
rs.AddWork(txTask)
*repeatCount++
//fmt.Printf("Rolled back %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
}
}
}
func Recon1(genesis *core.Genesis, logger log.Logger) error {
sigs := make(chan os.Signal, 1)
interruptCh := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
interruptCh <- true
}()
ctx := context.Background()
reconDbPath := path.Join(datadir, "recon1db")
var err error
if _, err = os.Stat(reconDbPath); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
} else if err = os.RemoveAll(reconDbPath); err != nil {
return err
}
limiter := semaphore.NewWeighted(int64(runtime.NumCPU() + 1))
db, err := kv2.NewMDBX(logger).Path(reconDbPath).RoTxsLimiter(limiter).Open()
if err != nil {
return err
}
startTime := time.Now()
var blockReader services.FullBlockReader
var allSnapshots *snapshotsync.RoSnapshots
allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots"))
defer allSnapshots.Close()
if err := allSnapshots.ReopenWithDB(db); err != nil {
return fmt.Errorf("reopen snapshot segments: %w", err)
}
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
// Compute mapping blockNum -> last TxNum in that block
txNums := make([]uint64, allSnapshots.BlocksAvailable()+1)
if err = allSnapshots.Bodies.View(func(bs []*snapshotsync.BodySegment) error {
for _, b := range bs {
if err = b.Iterate(func(blockNum, baseTxNum, txAmount uint64) {
txNums[blockNum] = baseTxNum + txAmount
}); err != nil {
return err
}
}
return nil
}); err != nil {
return fmt.Errorf("build txNum => blockNum mapping: %w", err)
}
blockNum := block + 1
txNum := txNums[blockNum-1]
fmt.Printf("Corresponding block num = %d, txNum = %d\n", blockNum, txNum)
workerCount := runtime.NumCPU()
workCh := make(chan state.TxTask, 128)
rs := state.NewReconState1()
var lock sync.RWMutex
reconWorkers := make([]*ReconWorker1, workerCount)
var applyTx kv.Tx
defer func() {
if applyTx != nil {
applyTx.Rollback()
}
}()
if applyTx, err = db.BeginRo(ctx); err != nil {
return err
}
roTxs := make([]kv.Tx, workerCount)
defer func() {
for i := 0; i < workerCount; i++ {
if roTxs[i] != nil {
roTxs[i].Rollback()
}
}
}()
for i := 0; i < workerCount; i++ {
roTxs[i], err = db.BeginRo(ctx)
if err != nil {
return err
}
}
var wg sync.WaitGroup
resultCh := make(chan state.TxTask, 128)
for i := 0; i < workerCount; i++ {
reconWorkers[i] = NewReconWorker1(lock.RLocker(), &wg, rs, blockReader, allSnapshots, txNums, chainConfig, logger, genesis, resultCh)
reconWorkers[i].SetTx(roTxs[i])
}
wg.Add(workerCount)
for i := 0; i < workerCount; i++ {
go reconWorkers[i].run()
}
commitThreshold := uint64(1024 * 1024 * 1024)
resultsThreshold := int64(1024 * 1024 * 1024)
count := uint64(0)
repeatCount := uint64(0)
triggerCount := uint64(0)
total := txNum
prevCount := uint64(0)
prevRepeatCount := uint64(0)
//prevTriggerCount := uint64(0)
resultsSize := int64(0)
prevTime := time.Now()
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
var rws state.TxTaskQueue
var rwsLock sync.Mutex
rwsReceiveCond := sync.NewCond(&rwsLock)
heap.Init(&rws)
var outputTxNum uint64
var inputBlockNum, outputBlockNum uint64
var prevOutputBlockNum uint64
// Go-routine gathering results from the workers
go func() {
defer rs.Finish()
for outputTxNum < txNum {
select {
case txTask := <-resultCh:
//fmt.Printf("Saved %d block %d txIndex %d\n", txTask.TxNum, txTask.BlockNum, txTask.TxIndex)
func() {
rwsLock.Lock()
defer rwsLock.Unlock()
atomic.AddInt64(&resultsSize, txTask.ResultsSize)
heap.Push(&rws, txTask)
processResultQueue(&rws, &outputTxNum, rs, applyTx, &triggerCount, &outputBlockNum, &repeatCount, &resultsSize)
rwsReceiveCond.Signal()
}()
case <-logEvery.C:
var m runtime.MemStats
libcommon.ReadMemStats(&m)
sizeEstimate := rs.SizeEstimate()
count = rs.DoneCount()
currentTime := time.Now()
interval := currentTime.Sub(prevTime)
speedTx := float64(count-prevCount) / (float64(interval) / float64(time.Second))
speedBlock := float64(outputBlockNum-prevOutputBlockNum) / (float64(interval) / float64(time.Second))
progress := 100.0 * float64(count) / float64(total)
var repeatRatio float64
if count > prevCount {
repeatRatio = 100.0 * float64(repeatCount-prevRepeatCount) / float64(count-prevCount)
}
log.Info("Transaction replay",
//"workers", workerCount,
"at block", outputBlockNum,
"input block", atomic.LoadUint64(&inputBlockNum),
"progress", fmt.Sprintf("%.2f%%", progress),
"blk/s", fmt.Sprintf("%.1f", speedBlock),
"tx/s", fmt.Sprintf("%.1f", speedTx),
//"repeats", repeatCount-prevRepeatCount,
//"triggered", triggerCount-prevTriggerCount,
"result queue", rws.Len(),
"results size", libcommon.ByteCount(uint64(atomic.LoadInt64(&resultsSize))),
"repeat ratio", fmt.Sprintf("%.2f%%", repeatRatio),
"buffer", libcommon.ByteCount(sizeEstimate),
"alloc", libcommon.ByteCount(m.Alloc), "sys", libcommon.ByteCount(m.Sys),
)
prevTime = currentTime
prevCount = count
prevOutputBlockNum = outputBlockNum
prevRepeatCount = repeatCount
//prevTriggerCount = triggerCount
if sizeEstimate >= commitThreshold {
commitStart := time.Now()
log.Info("Committing...")
err := func() error {
rwsLock.Lock()
defer rwsLock.Unlock()
// Drain results (and process) channel because read sets do not carry over
for {
var drained bool
for !drained {
select {
case txTask := <-resultCh:
atomic.AddInt64(&resultsSize, txTask.ResultsSize)
heap.Push(&rws, txTask)
default:
drained = true
}
}
processResultQueue(&rws, &outputTxNum, rs, applyTx, &triggerCount, &outputBlockNum, &repeatCount, &resultsSize)
if rws.Len() == 0 {
break
}
}
rwsReceiveCond.Signal()
lock.Lock() // This is to prevent workers from starting work on any new txTask
defer lock.Unlock()
// Drain results channel because read sets do not carry over
var drained bool
for !drained {
select {
case txTask := <-resultCh:
rs.AddWork(txTask)
default:
drained = true
}
}
// Drain results queue as well
for rws.Len() > 0 {
txTask := heap.Pop(&rws).(state.TxTask)
atomic.AddInt64(&resultsSize, -txTask.ResultsSize)
rs.AddWork(txTask)
}
applyTx.Rollback()
for i := 0; i < workerCount; i++ {
roTxs[i].Rollback()
}
rwTx, err := db.BeginRw(ctx)
if err != nil {
return err
}
if err = rs.Flush(rwTx); err != nil {
return err
}
if err = rwTx.Commit(); err != nil {
return err
}
if applyTx, err = db.BeginRo(ctx); err != nil {
return err
}
for i := 0; i < workerCount; i++ {
if roTxs[i], err = db.BeginRo(ctx); err != nil {
return err
}
reconWorkers[i].SetTx(roTxs[i])
}
return nil
}()
if err != nil {
panic(err)
}
log.Info("Committed", "time", time.Since(commitStart))
}
}
}
}()
var inputTxNum uint64
var header *types.Header
for blockNum := uint64(0); blockNum <= block; blockNum++ {
atomic.StoreUint64(&inputBlockNum, blockNum)
rules := chainConfig.Rules(blockNum)
if header, err = blockReader.HeaderByNumber(ctx, nil, blockNum); err != nil {
return err
}
blockHash := header.Hash()
b, _, err := blockReader.BlockWithSenders(ctx, nil, blockHash, blockNum)
if err != nil {
return err
}
txs := b.Transactions()
for txIndex := -1; txIndex <= len(txs); txIndex++ {
// Do not oversend, wait for the result heap to go under certain size
func() {
rwsLock.Lock()
defer rwsLock.Unlock()
for rws.Len() > 128 || atomic.LoadInt64(&resultsSize) >= resultsThreshold || rs.SizeEstimate() >= commitThreshold {
rwsReceiveCond.Wait()
}
}()
txTask := state.TxTask{
Header: header,
BlockNum: blockNum,
Rules: rules,
Block: b,
TxNum: inputTxNum,
TxIndex: txIndex,
BlockHash: blockHash,
Final: txIndex == len(txs),
}
if txIndex >= 0 && txIndex < len(txs) {
txTask.Tx = txs[txIndex]
if sender, ok := txs[txIndex].GetSender(); ok {
txTask.Sender = &sender
}
if ok := rs.RegisterSender(txTask); ok {
rs.AddWork(txTask)
}
} else {
rs.AddWork(txTask)
}
inputTxNum++
}
}
close(workCh)
wg.Wait()
applyTx.Rollback()
for i := 0; i < workerCount; i++ {
roTxs[i].Rollback()
}
rwTx, err := db.BeginRw(ctx)
if err != nil {
return err
}
defer func() {
if rwTx != nil {
rwTx.Rollback()
}
}()
if err = rs.Flush(rwTx); err != nil {
return err
}
if err = rwTx.Commit(); err != nil {
return err
}
if rwTx, err = db.BeginRw(ctx); err != nil {
return err
}
log.Info("Transaction replay complete", "duration", time.Since(startTime))
log.Info("Computing hashed state")
tmpDir := filepath.Join(datadir, "tmp")
if err = stagedsync.PromoteHashedStateCleanly("recon", rwTx, stagedsync.StageHashStateCfg(db, tmpDir), ctx); err != nil {
return err
}
if err = rwTx.Commit(); err != nil {
return err
}
if rwTx, err = db.BeginRw(ctx); err != nil {
return err
}
var rootHash common.Hash
if rootHash, err = stagedsync.RegenerateIntermediateHashes("recon", rwTx, stagedsync.StageTrieCfg(db, false /* checkRoot */, false /* saveHashesToDB */, false /* badBlockHalt */, tmpDir, blockReader, nil /* HeaderDownload */), common.Hash{}, make(chan struct{}, 1)); err != nil {
return err
}
if err = rwTx.Commit(); err != nil {
return err
}
if rootHash != header.Root {
log.Error("Incorrect root hash", "expected", fmt.Sprintf("%x", header.Root))
}
return nil
}

View File

@ -19,7 +19,7 @@ func (r *RequiredStateError) Error() string {
}
type HistoryReaderNoState struct {
ac *libstate.AggregatorContext
ac *libstate.Aggregator22Context
tx kv.Tx
txNum uint64
trace bool
@ -29,7 +29,7 @@ type HistoryReaderNoState struct {
composite []byte
}
func NewHistoryReaderNoState(ac *libstate.AggregatorContext, rs *ReconState) *HistoryReaderNoState {
func NewHistoryReaderNoState(ac *libstate.Aggregator22Context, rs *ReconState) *HistoryReaderNoState {
return &HistoryReaderNoState{ac: ac, rs: rs}
}

View File

@ -5,6 +5,7 @@ import (
"container/heap"
"encoding/binary"
"fmt"
"math/bits"
"sort"
"sync"
"unsafe"
@ -13,6 +14,7 @@ import (
"github.com/holiman/uint256"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/types"
@ -37,6 +39,10 @@ type TxTask struct {
BalanceIncreaseSet map[common.Address]uint256.Int
ReadLists map[string]*KvList
WriteLists map[string]*KvList
AccountPrevs map[string][]byte
AccountDels map[string]*accounts.Account
StoragePrevs map[string][]byte
CodePrevs map[string][]byte
ResultsSize int64
Error error
}
@ -67,7 +73,7 @@ func (h *TxTaskQueue) Pop() interface{} {
const CodeSizeTable = "CodeSize"
type ReconState1 struct {
type State22 struct {
lock sync.RWMutex
receiveWork *sync.Cond
triggers map[uint64]TxTask
@ -75,65 +81,65 @@ type ReconState1 struct {
triggerLock sync.RWMutex
queue TxTaskQueue
queueLock sync.Mutex
changes map[string]*btree.BTreeG[ReconStateItem1]
changes map[string]*btree.BTreeG[StateItem]
sizeEstimate uint64
txsDone uint64
finished bool
}
type ReconStateItem1 struct {
type StateItem struct {
key []byte
val []byte
}
func reconStateItem1Less(i, j ReconStateItem1) bool {
func stateItemLess(i, j StateItem) bool {
return bytes.Compare(i.key, j.key) < 0
}
func NewReconState1() *ReconState1 {
rs := &ReconState1{
func NewState22() *State22 {
rs := &State22{
triggers: map[uint64]TxTask{},
senderTxNums: map[common.Address]uint64{},
changes: map[string]*btree.BTreeG[ReconStateItem1]{},
changes: map[string]*btree.BTreeG[StateItem]{},
}
rs.receiveWork = sync.NewCond(&rs.queueLock)
return rs
}
func (rs *ReconState1) put(table string, key, val []byte) {
func (rs *State22) put(table string, key, val []byte) {
t, ok := rs.changes[table]
if !ok {
t = btree.NewG[ReconStateItem1](32, reconStateItem1Less)
t = btree.NewG[StateItem](32, stateItemLess)
rs.changes[table] = t
}
item := ReconStateItem1{key: libcommon.Copy(key), val: libcommon.Copy(val)}
item := StateItem{key: key, val: val}
t.ReplaceOrInsert(item)
rs.sizeEstimate += uint64(unsafe.Sizeof(item)) + uint64(len(key)) + uint64(len(val))
}
func (rs *ReconState1) Get(table string, key []byte) []byte {
func (rs *State22) Get(table string, key []byte) []byte {
rs.lock.RLock()
defer rs.lock.RUnlock()
return rs.get(table, key)
}
func (rs *ReconState1) get(table string, key []byte) []byte {
func (rs *State22) get(table string, key []byte) []byte {
t, ok := rs.changes[table]
if !ok {
return nil
}
if i, ok := t.Get(ReconStateItem1{key: key}); ok {
if i, ok := t.Get(StateItem{key: key}); ok {
return i.val
}
return nil
}
func (rs *ReconState1) Flush(rwTx kv.RwTx) error {
func (rs *State22) Flush(rwTx kv.RwTx) error {
rs.lock.Lock()
defer rs.lock.Unlock()
for table, t := range rs.changes {
var err error
t.Ascend(func(item ReconStateItem1) bool {
t.Ascend(func(item StateItem) bool {
if len(item.val) == 0 {
if err = rwTx.Delete(table, item.key); err != nil {
return false
@ -156,7 +162,7 @@ func (rs *ReconState1) Flush(rwTx kv.RwTx) error {
return nil
}
func (rs *ReconState1) Schedule() (TxTask, bool) {
func (rs *State22) Schedule() (TxTask, bool) {
rs.queueLock.Lock()
defer rs.queueLock.Unlock()
for !rs.finished && rs.queue.Len() == 0 {
@ -168,7 +174,7 @@ func (rs *ReconState1) Schedule() (TxTask, bool) {
return TxTask{}, false
}
func (rs *ReconState1) RegisterSender(txTask TxTask) bool {
func (rs *State22) RegisterSender(txTask TxTask) bool {
rs.triggerLock.Lock()
defer rs.triggerLock.Unlock()
lastTxNum, deferral := rs.senderTxNums[*txTask.Sender]
@ -183,7 +189,7 @@ func (rs *ReconState1) RegisterSender(txTask TxTask) bool {
return !deferral
}
func (rs *ReconState1) CommitTxNum(sender *common.Address, txNum uint64) uint64 {
func (rs *State22) CommitTxNum(sender *common.Address, txNum uint64) uint64 {
rs.queueLock.Lock()
defer rs.queueLock.Unlock()
rs.triggerLock.Lock()
@ -205,7 +211,7 @@ func (rs *ReconState1) CommitTxNum(sender *common.Address, txNum uint64) uint64
return count
}
func (rs *ReconState1) AddWork(txTask TxTask) {
func (rs *State22) AddWork(txTask TxTask) {
txTask.BalanceIncreaseSet = nil
txTask.ReadLists = nil
txTask.WriteLists = nil
@ -216,72 +222,229 @@ func (rs *ReconState1) AddWork(txTask TxTask) {
rs.receiveWork.Signal()
}
func (rs *ReconState1) Finish() {
func (rs *State22) Finish() {
rs.queueLock.Lock()
defer rs.queueLock.Unlock()
rs.finished = true
rs.receiveWork.Broadcast()
}
func (rs *ReconState1) Apply(emptyRemoval bool, roTx kv.Tx, txTask TxTask) error {
rs.lock.Lock()
defer rs.lock.Unlock()
if txTask.WriteLists != nil {
for table, list := range txTask.WriteLists {
for i, key := range list.Keys {
val := list.Vals[i]
rs.put(table, key, val)
}
func serialise2(a *accounts.Account) []byte {
var l int
l++
if a.Nonce > 0 {
l += (bits.Len64(a.Nonce) + 7) / 8
}
l++
if !a.Balance.IsZero() {
l += a.Balance.ByteLen()
}
l++
if !a.IsEmptyCodeHash() {
l += 32
}
l++
if a.Incarnation > 0 {
l += (bits.Len64(a.Incarnation) + 7) / 8
}
value := make([]byte, l)
pos := 0
if a.Nonce == 0 {
value[pos] = 0
pos++
} else {
nonceBytes := (bits.Len64(a.Nonce) + 7) / 8
value[pos] = byte(nonceBytes)
var nonce = a.Nonce
for i := nonceBytes; i > 0; i-- {
value[pos+i] = byte(nonce)
nonce >>= 8
}
pos += nonceBytes + 1
}
if a.Balance.IsZero() {
value[pos] = 0
pos++
} else {
balanceBytes := a.Balance.ByteLen()
value[pos] = byte(balanceBytes)
pos++
a.Balance.WriteToSlice(value[pos : pos+balanceBytes])
pos += balanceBytes
}
if a.IsEmptyCodeHash() {
value[pos] = 0
pos++
} else {
value[pos] = 32
pos++
copy(value[pos:pos+32], a.CodeHash[:])
pos += 32
}
if a.Incarnation == 0 {
value[pos] = 0
} else {
incBytes := (bits.Len64(a.Incarnation) + 7) / 8
value[pos] = byte(incBytes)
var inc = a.Incarnation
for i := incBytes; i > 0; i-- {
value[pos+i] = byte(inc)
inc >>= 8
}
}
return value
}
func (rs *State22) Apply(emptyRemoval bool, roTx kv.Tx, txTask TxTask, agg *libstate.Aggregator22) error {
rs.lock.Lock()
defer rs.lock.Unlock()
agg.SetTxNum(txTask.TxNum)
for addr, increase := range txTask.BalanceIncreaseSet {
//if increase.IsZero() {
// continue
//}
enc := rs.get(kv.PlainState, addr.Bytes())
if enc == nil {
enc0 := rs.get(kv.PlainState, addr.Bytes())
if enc0 == nil {
var err error
enc, err = roTx.GetOne(kv.PlainState, addr.Bytes())
enc0, err = roTx.GetOne(kv.PlainState, addr.Bytes())
if err != nil {
return err
}
}
var a accounts.Account
if err := a.DecodeForStorage(enc); err != nil {
if err := a.DecodeForStorage(enc0); err != nil {
return err
}
if len(enc0) > 0 {
// Need to convert before balance increase
enc0 = serialise2(&a)
}
a.Balance.Add(&a.Balance, &increase)
var enc1 []byte
if emptyRemoval && a.Nonce == 0 && a.Balance.IsZero() && a.IsEmptyCodeHash() {
enc = []byte{}
enc1 = []byte{}
} else {
l := a.EncodingLengthForStorage()
enc = make([]byte, l)
a.EncodeForStorage(enc)
enc1 = make([]byte, l)
a.EncodeForStorage(enc1)
}
rs.put(kv.PlainState, addr.Bytes(), enc1)
if err := agg.AddAccountPrev(addr.Bytes(), enc0); err != nil {
return err
}
}
for addrS, original := range txTask.AccountDels {
addr := []byte(addrS)
addr1 := make([]byte, len(addr)+8)
copy(addr1, addr)
binary.BigEndian.PutUint64(addr1[len(addr):], original.Incarnation)
prev := serialise2(original)
if err := agg.AddAccountPrev(addr, prev); err != nil {
return err
}
codePrev := rs.get(kv.Code, original.CodeHash.Bytes())
if codePrev == nil {
var err error
codePrev, err = roTx.GetOne(kv.Code, original.CodeHash.Bytes())
if err != nil {
return err
}
}
if err := agg.AddCodePrev(addr, codePrev); err != nil {
return err
}
// Iterate over storage
cursor, err := roTx.Cursor(kv.PlainState)
if err != nil {
return err
}
defer cursor.Close()
var k, v []byte
var e error
if k, v, e = cursor.Seek(addr1); err != nil {
return e
}
if !bytes.HasPrefix(k, addr1) {
k = nil
}
rs.changes[kv.PlainState].AscendGreaterOrEqual(StateItem{key: addr1}, func(item StateItem) bool {
if !bytes.HasPrefix(item.key, addr1) {
return false
}
for ; e == nil && k != nil && bytes.Compare(k, item.key) <= 0; k, v, e = cursor.Next() {
if !bytes.HasPrefix(k, addr1) {
k = nil
}
if !bytes.Equal(k, item.key) {
if e = agg.AddStoragePrev(addr, libcommon.Copy(k[28:]), libcommon.Copy(v)); e != nil {
return false
}
}
}
if e != nil {
return false
}
if e = agg.AddStoragePrev(addr, item.key[28:], item.val); e != nil {
return false
}
return true
})
for ; e == nil && k != nil && bytes.HasPrefix(k, addr1); k, v, e = cursor.Next() {
if e = agg.AddStoragePrev(addr, libcommon.Copy(k[28:]), libcommon.Copy(v)); e != nil {
return e
}
}
if e != nil {
return e
}
}
for addrS, enc0 := range txTask.AccountPrevs {
if err := agg.AddAccountPrev([]byte(addrS), enc0); err != nil {
return err
}
}
for compositeS, val := range txTask.StoragePrevs {
composite := []byte(compositeS)
if err := agg.AddStoragePrev(composite[:20], composite[28:], val); err != nil {
return err
}
}
for addrS, val := range txTask.CodePrevs {
if err := agg.AddCodePrev([]byte(addrS), val); err != nil {
return err
}
}
if err := agg.FinishTx(); err != nil {
return err
}
if txTask.WriteLists == nil {
return nil
}
for table, list := range txTask.WriteLists {
for i, key := range list.Keys {
val := list.Vals[i]
rs.put(table, key, val)
}
rs.put(kv.PlainState, addr.Bytes(), enc)
}
return nil
}
func (rs *ReconState1) DoneCount() uint64 {
func (rs *State22) DoneCount() uint64 {
rs.lock.RLock()
defer rs.lock.RUnlock()
return rs.txsDone
}
func (rs *ReconState1) SizeEstimate() uint64 {
func (rs *State22) SizeEstimate() uint64 {
rs.lock.RLock()
defer rs.lock.RUnlock()
return rs.sizeEstimate
}
func (rs *ReconState1) ReadsValid(readLists map[string]*KvList) bool {
func (rs *State22) ReadsValid(readLists map[string]*KvList) bool {
rs.lock.RLock()
defer rs.lock.RUnlock()
//fmt.Printf("ValidReads\n")
for table, list := range readLists {
//fmt.Printf("Table %s\n", table)
var t *btree.BTreeG[ReconStateItem1]
var t *btree.BTreeG[StateItem]
var ok bool
if table == CodeSizeTable {
t, ok = rs.changes[kv.Code]
@ -293,7 +456,7 @@ func (rs *ReconState1) ReadsValid(readLists map[string]*KvList) bool {
}
for i, key := range list.Keys {
val := list.Vals[i]
if item, ok := t.Get(ReconStateItem1{key: key}); ok {
if item, ok := t.Get(StateItem{key: key}); ok {
//fmt.Printf("key [%x] => [%x] vs [%x]\n", key, val, rereadVal)
if table == CodeSizeTable {
if binary.BigEndian.Uint64(val) != uint64(len(item.val)) {
@ -328,14 +491,18 @@ func (l *KvList) Swap(i, j int) {
l.Vals[i], l.Vals[j] = l.Vals[j], l.Vals[i]
}
type StateReconWriter1 struct {
rs *ReconState1
txNum uint64
writeLists map[string]*KvList
type StateWriter22 struct {
rs *State22
txNum uint64
writeLists map[string]*KvList
accountPrevs map[string][]byte
accountDels map[string]*accounts.Account
storagePrevs map[string][]byte
codePrevs map[string][]byte
}
func NewStateReconWriter1(rs *ReconState1) *StateReconWriter1 {
return &StateReconWriter1{
func NewStateWriter22(rs *State22) *StateWriter22 {
return &StateWriter22{
rs: rs,
writeLists: map[string]*KvList{
kv.PlainState: {},
@ -343,39 +510,56 @@ func NewStateReconWriter1(rs *ReconState1) *StateReconWriter1 {
kv.PlainContractCode: {},
kv.IncarnationMap: {},
},
accountPrevs: map[string][]byte{},
accountDels: map[string]*accounts.Account{},
storagePrevs: map[string][]byte{},
codePrevs: map[string][]byte{},
}
}
func (w *StateReconWriter1) SetTxNum(txNum uint64) {
func (w *StateWriter22) SetTxNum(txNum uint64) {
w.txNum = txNum
}
func (w *StateReconWriter1) ResetWriteSet() {
func (w *StateWriter22) ResetWriteSet() {
w.writeLists = map[string]*KvList{
kv.PlainState: {},
kv.Code: {},
kv.PlainContractCode: {},
kv.IncarnationMap: {},
}
w.accountPrevs = map[string][]byte{}
w.accountDels = map[string]*accounts.Account{}
w.storagePrevs = map[string][]byte{}
w.codePrevs = map[string][]byte{}
}
func (w *StateReconWriter1) WriteSet() map[string]*KvList {
func (w *StateWriter22) WriteSet() map[string]*KvList {
for _, list := range w.writeLists {
sort.Sort(list)
}
return w.writeLists
}
func (w *StateReconWriter1) UpdateAccountData(address common.Address, original, account *accounts.Account) error {
func (w *StateWriter22) PrevAndDels() (map[string][]byte, map[string]*accounts.Account, map[string][]byte, map[string][]byte) {
return w.accountPrevs, w.accountDels, w.storagePrevs, w.codePrevs
}
func (w *StateWriter22) UpdateAccountData(address common.Address, original, account *accounts.Account) error {
value := make([]byte, account.EncodingLengthForStorage())
account.EncodeForStorage(value)
//fmt.Printf("account [%x]=>{Balance: %d, Nonce: %d, Root: %x, CodeHash: %x} txNum: %d\n", address, &account.Balance, account.Nonce, account.Root, account.CodeHash, w.txNum)
w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, address.Bytes())
w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, value)
var prev []byte
if original.Initialised {
prev = serialise2(original)
}
w.accountPrevs[string(address.Bytes())] = prev
return nil
}
func (w *StateReconWriter1) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error {
func (w *StateWriter22) UpdateAccountCode(address common.Address, incarnation uint64, codeHash common.Hash, code []byte) error {
w.writeLists[kv.Code].Keys = append(w.writeLists[kv.Code].Keys, codeHash.Bytes())
w.writeLists[kv.Code].Vals = append(w.writeLists[kv.Code].Vals, code)
if len(code) > 0 {
@ -383,10 +567,11 @@ func (w *StateReconWriter1) UpdateAccountCode(address common.Address, incarnatio
w.writeLists[kv.PlainContractCode].Keys = append(w.writeLists[kv.PlainContractCode].Keys, dbutils.PlainGenerateStoragePrefix(address[:], incarnation))
w.writeLists[kv.PlainContractCode].Vals = append(w.writeLists[kv.PlainContractCode].Vals, codeHash.Bytes())
}
w.codePrevs[string(address.Bytes())] = nil
return nil
}
func (w *StateReconWriter1) DeleteAccount(address common.Address, original *accounts.Account) error {
func (w *StateWriter22) DeleteAccount(address common.Address, original *accounts.Account) error {
w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, address.Bytes())
w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, []byte{})
if original.Incarnation > 0 {
@ -395,36 +580,41 @@ func (w *StateReconWriter1) DeleteAccount(address common.Address, original *acco
w.writeLists[kv.IncarnationMap].Keys = append(w.writeLists[kv.IncarnationMap].Keys, address.Bytes())
w.writeLists[kv.IncarnationMap].Vals = append(w.writeLists[kv.IncarnationMap].Vals, b[:])
}
if original.Initialised {
w.accountDels[string(address.Bytes())] = original
}
return nil
}
func (w *StateReconWriter1) WriteAccountStorage(address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error {
func (w *StateWriter22) WriteAccountStorage(address common.Address, incarnation uint64, key *common.Hash, original, value *uint256.Int) error {
if *original == *value {
return nil
}
w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), incarnation, key.Bytes()))
composite := dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), incarnation, key.Bytes())
w.writeLists[kv.PlainState].Keys = append(w.writeLists[kv.PlainState].Keys, composite)
w.writeLists[kv.PlainState].Vals = append(w.writeLists[kv.PlainState].Vals, value.Bytes())
//fmt.Printf("storage [%x] [%x] => [%x], txNum: %d\n", address, *key, v, w.txNum)
w.storagePrevs[string(composite)] = original.Bytes()
return nil
}
func (w *StateReconWriter1) CreateContract(address common.Address) error {
func (w *StateWriter22) CreateContract(address common.Address) error {
return nil
}
type StateReconReader1 struct {
type StateReader22 struct {
tx kv.Tx
txNum uint64
trace bool
rs *ReconState1
rs *State22
readError bool
stateTxNum uint64
composite []byte
readLists map[string]*KvList
}
func NewStateReconReader1(rs *ReconState1) *StateReconReader1 {
return &StateReconReader1{
func NewStateReader22(rs *State22) *StateReader22 {
return &StateReader22{
rs: rs,
readLists: map[string]*KvList{
kv.PlainState: {},
@ -435,15 +625,15 @@ func NewStateReconReader1(rs *ReconState1) *StateReconReader1 {
}
}
func (r *StateReconReader1) SetTxNum(txNum uint64) {
func (r *StateReader22) SetTxNum(txNum uint64) {
r.txNum = txNum
}
func (r *StateReconReader1) SetTx(tx kv.Tx) {
func (r *StateReader22) SetTx(tx kv.Tx) {
r.tx = tx
}
func (r *StateReconReader1) ResetReadSet() {
func (r *StateReader22) ResetReadSet() {
r.readLists = map[string]*KvList{
kv.PlainState: {},
kv.Code: {},
@ -452,18 +642,18 @@ func (r *StateReconReader1) ResetReadSet() {
}
}
func (r *StateReconReader1) ReadSet() map[string]*KvList {
func (r *StateReader22) ReadSet() map[string]*KvList {
for _, list := range r.readLists {
sort.Sort(list)
}
return r.readLists
}
func (r *StateReconReader1) SetTrace(trace bool) {
func (r *StateReader22) SetTrace(trace bool) {
r.trace = trace
}
func (r *StateReconReader1) ReadAccountData(address common.Address) (*accounts.Account, error) {
func (r *StateReader22) ReadAccountData(address common.Address) (*accounts.Account, error) {
enc := r.rs.Get(kv.PlainState, address.Bytes())
if enc == nil {
var err error
@ -487,7 +677,7 @@ func (r *StateReconReader1) ReadAccountData(address common.Address) (*accounts.A
return &a, nil
}
func (r *StateReconReader1) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
func (r *StateReader22) ReadAccountStorage(address common.Address, incarnation uint64, key *common.Hash) ([]byte, error) {
if cap(r.composite) < 20+8+32 {
r.composite = make([]byte, 20+8+32)
} else if len(r.composite) != 20+8+32 {
@ -520,7 +710,7 @@ func (r *StateReconReader1) ReadAccountStorage(address common.Address, incarnati
return enc, nil
}
func (r *StateReconReader1) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
func (r *StateReader22) ReadAccountCode(address common.Address, incarnation uint64, codeHash common.Hash) ([]byte, error) {
enc := r.rs.Get(kv.Code, codeHash.Bytes())
if enc == nil {
var err error
@ -537,7 +727,7 @@ func (r *StateReconReader1) ReadAccountCode(address common.Address, incarnation
return enc, nil
}
func (r *StateReconReader1) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
func (r *StateReader22) ReadAccountCodeSize(address common.Address, incarnation uint64, codeHash common.Hash) (int, error) {
enc := r.rs.Get(kv.Code, codeHash.Bytes())
if enc == nil {
var err error
@ -557,7 +747,7 @@ func (r *StateReconReader1) ReadAccountCodeSize(address common.Address, incarnat
return size, nil
}
func (r *StateReconReader1) ReadAccountIncarnation(address common.Address) (uint64, error) {
func (r *StateReader22) ReadAccountIncarnation(address common.Address) (uint64, error) {
enc := r.rs.Get(kv.IncarnationMap, address.Bytes())
if enc == nil {
var err error

View File

@ -188,12 +188,12 @@ func (rs *ReconState) SizeEstimate() uint64 {
}
type StateReconWriter struct {
ac *libstate.AggregatorContext
ac *libstate.Aggregator22Context
rs *ReconState
txNum uint64
}
func NewStateReconWriter(ac *libstate.AggregatorContext, rs *ReconState) *StateReconWriter {
func NewStateReconWriter(ac *libstate.Aggregator22Context, rs *ReconState) *StateReconWriter {
return &StateReconWriter{
ac: ac,
rs: rs,

2
go.mod
View File

@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
go 1.18
require (
github.com/ledgerwatch/erigon-lib v0.0.0-20220727050920-4a04d9284002
github.com/ledgerwatch/erigon-lib v0.0.0-20220728074713-fadc9b21d1dd
github.com/ledgerwatch/erigon-snapshot v1.0.0
github.com/ledgerwatch/log/v3 v3.4.1
github.com/ledgerwatch/secp256k1 v1.0.0

4
go.sum
View File

@ -390,8 +390,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20220727050920-4a04d9284002 h1:37VrYyM+RBV6CCxnb3i0RW4euw//PO31ld+KqRi8EUs=
github.com/ledgerwatch/erigon-lib v0.0.0-20220727050920-4a04d9284002/go.mod h1:19wwSb5qbagorz9a4QN9FzNqSPjmOJkwa5TezGjloks=
github.com/ledgerwatch/erigon-lib v0.0.0-20220728074713-fadc9b21d1dd h1:N8Erhb3TNWyXBFYZD6tthmDaIhwWN4bVoATuoxJuSpA=
github.com/ledgerwatch/erigon-lib v0.0.0-20220728074713-fadc9b21d1dd/go.mod h1:19wwSb5qbagorz9a4QN9FzNqSPjmOJkwa5TezGjloks=
github.com/ledgerwatch/erigon-snapshot v1.0.0 h1:bp/7xoPdM5lK7LFdqEMH008RZmqxMZV0RUVEQiWs7v4=
github.com/ledgerwatch/erigon-snapshot v1.0.0/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=