diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index d48052f26..670c4373b 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -23,6 +23,7 @@ import ( "github.com/ledgerwatch/erigon-lib/kv/temporal/historyv2" libstate "github.com/ledgerwatch/erigon-lib/state" "github.com/ledgerwatch/log/v3" + "golang.org/x/sync/errgroup" "github.com/ledgerwatch/erigon/common/changeset" "github.com/ledgerwatch/erigon/common/dbutils" @@ -419,11 +420,27 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint batch.Rollback() }() + var readAhead chan uint64 + if initialCycle { + // snapshots are often stored on chaper drives. don't expect low-read-latency and manually read-ahead. + // can't use OS-level ReadAhead - because Data >> RAM + // it also warmsup state a bit - by touching senders/coninbase accounts and code + var clean func() + readAhead, clean = blocksReadAhead(ctx, &cfg, 4) + defer clean() + } + Loop: for blockNum := stageProgress + 1; blockNum <= to; blockNum++ { if stoppedErr = common.Stopped(quit); stoppedErr != nil { break } + if initialCycle { + select { + case readAhead <- blockNum: + default: + } + } blockHash, err := rawdb.ReadCanonicalHash(tx, blockNum) if err != nil { @@ -517,6 +534,93 @@ Loop: return stoppedErr } +func blocksReadAhead(ctx context.Context, cfg *ExecuteBlockCfg, workers int) (chan uint64, context.CancelFunc) { + const readAheadBlocks = 100 + readAhead := make(chan uint64, readAheadBlocks) + g, gCtx := errgroup.WithContext(ctx) + for workerNum := 0; workerNum < workers; workerNum++ { + g.Go(func() (err error) { + var bn uint64 + var ok bool + var tx kv.Tx + defer func() { + if tx != nil { + tx.Rollback() + } + }() + + for i := 0; ; i++ { + select { + case bn, ok = <-readAhead: + if !ok { + return + } + case <-gCtx.Done(): + return gCtx.Err() + } + + if i%100 == 0 { + if tx != nil { + tx.Rollback() + } + tx, err = cfg.db.BeginRo(ctx) + if err != nil { + return err + } + } + + if err := blocksReadAheadFunc(gCtx, tx, cfg, bn+readAheadBlocks); err != nil { + return err + } + } + }) + } + return readAhead, func() { + close(readAhead) + _ = g.Wait() + } +} +func blocksReadAheadFunc(ctx context.Context, tx kv.Tx, cfg *ExecuteBlockCfg, blockNum uint64) error { + blockHash, err := rawdb.ReadCanonicalHash(tx, blockNum) + if err != nil { + return err + } + block, senders, err := cfg.blockReader.BlockWithSenders(ctx, tx, blockHash, blockNum) + if err != nil { + return err + } + if block == nil { + return nil + } + stateReader := state.NewPlainStateReader(tx) //TODO: can do on batch! if make batch thread-safe + for _, sender := range senders { + a, _ := stateReader.ReadAccountData(sender) + if a == nil || a.Incarnation == 0 { + continue + } + if code, _ := stateReader.ReadAccountCode(sender, a.Incarnation, a.CodeHash); len(code) > 0 { + _, _ = code[0], code[len(code)-1] + } + } + + for _, txn := range block.Transactions() { + to := txn.GetTo() + if to == nil { + continue + } + a, _ := stateReader.ReadAccountData(*to) + if a == nil || a.Incarnation == 0 { + continue + } + if code, _ := stateReader.ReadAccountCode(*to, a.Incarnation, a.CodeHash); len(code) > 0 { + _, _ = code[0], code[len(code)-1] + } + } + _, _ = stateReader.ReadAccountData(block.Coinbase()) + _, _ = block, senders + return nil +} + func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, currentBlock uint64, prevTx, currentTx uint64, gas uint64, gasState float64, batch ethdb.DbWithPendingMutations, logger log.Logger) (uint64, uint64, time.Time) { currentTime := time.Now() diff --git a/turbo/snapshotsync/block_reader.go b/turbo/snapshotsync/block_reader.go index 0a5ac5bb3..c51936e32 100644 --- a/turbo/snapshotsync/block_reader.go +++ b/turbo/snapshotsync/block_reader.go @@ -364,7 +364,6 @@ func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx k return block, senders, nil } var txs []types.Transaction - var senders []libcommon.Address ok, err = back.sn.ViewTxs(blockHeight, func(seg *TxnSegment) error { txs, senders, err = back.txsFromSnapshot(baseTxnId, txsAmount, seg, buf) if err != nil {