e2: ReadAhead of blocks, senders accounts, code (#7501)

It improves performance of initial sync (stage exec) by 5-20% when
blocks snapshots are mounted to high-latency drive and when chaindata is
on high-latency drive. And improving cold-start performance.

Current implementation using 2 goroutines for ReadAhead. It also
producing more garbage, can improve it later (here are dashboard with
impact).
```
mainnet2-1: with ReadAhead
mainnet2-3: no ReadAhead
```

<img width="949" alt="Screenshot 2023-05-12 at 09 24 31"
src="https://github.com/ledgerwatch/erigon/assets/46885206/b90b1fa8-9099-48ff-95b3-86e864a36d46">

<img width="845" alt="Screenshot 2023-05-12 at 09 24 13"
src="https://github.com/ledgerwatch/erigon/assets/46885206/39d90c0c-a9d5-4735-8c03-da1455b147aa">
This commit is contained in:
Alex Sharov 2023-05-15 15:08:35 +07:00 committed by GitHub
parent 9ab48c067b
commit b3aca15ff8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 104 additions and 1 deletions

View File

@ -23,6 +23,7 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/temporal/historyv2"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/log/v3"
"golang.org/x/sync/errgroup"
"github.com/ledgerwatch/erigon/common/changeset"
"github.com/ledgerwatch/erigon/common/dbutils"
@ -419,11 +420,27 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, tx kv.RwTx, toBlock uint
batch.Rollback()
}()
var readAhead chan uint64
if initialCycle {
// snapshots are often stored on chaper drives. don't expect low-read-latency and manually read-ahead.
// can't use OS-level ReadAhead - because Data >> RAM
// it also warmsup state a bit - by touching senders/coninbase accounts and code
var clean func()
readAhead, clean = blocksReadAhead(ctx, &cfg, 4)
defer clean()
}
Loop:
for blockNum := stageProgress + 1; blockNum <= to; blockNum++ {
if stoppedErr = common.Stopped(quit); stoppedErr != nil {
break
}
if initialCycle {
select {
case readAhead <- blockNum:
default:
}
}
blockHash, err := rawdb.ReadCanonicalHash(tx, blockNum)
if err != nil {
@ -517,6 +534,93 @@ Loop:
return stoppedErr
}
func blocksReadAhead(ctx context.Context, cfg *ExecuteBlockCfg, workers int) (chan uint64, context.CancelFunc) {
const readAheadBlocks = 100
readAhead := make(chan uint64, readAheadBlocks)
g, gCtx := errgroup.WithContext(ctx)
for workerNum := 0; workerNum < workers; workerNum++ {
g.Go(func() (err error) {
var bn uint64
var ok bool
var tx kv.Tx
defer func() {
if tx != nil {
tx.Rollback()
}
}()
for i := 0; ; i++ {
select {
case bn, ok = <-readAhead:
if !ok {
return
}
case <-gCtx.Done():
return gCtx.Err()
}
if i%100 == 0 {
if tx != nil {
tx.Rollback()
}
tx, err = cfg.db.BeginRo(ctx)
if err != nil {
return err
}
}
if err := blocksReadAheadFunc(gCtx, tx, cfg, bn+readAheadBlocks); err != nil {
return err
}
}
})
}
return readAhead, func() {
close(readAhead)
_ = g.Wait()
}
}
func blocksReadAheadFunc(ctx context.Context, tx kv.Tx, cfg *ExecuteBlockCfg, blockNum uint64) error {
blockHash, err := rawdb.ReadCanonicalHash(tx, blockNum)
if err != nil {
return err
}
block, senders, err := cfg.blockReader.BlockWithSenders(ctx, tx, blockHash, blockNum)
if err != nil {
return err
}
if block == nil {
return nil
}
stateReader := state.NewPlainStateReader(tx) //TODO: can do on batch! if make batch thread-safe
for _, sender := range senders {
a, _ := stateReader.ReadAccountData(sender)
if a == nil || a.Incarnation == 0 {
continue
}
if code, _ := stateReader.ReadAccountCode(sender, a.Incarnation, a.CodeHash); len(code) > 0 {
_, _ = code[0], code[len(code)-1]
}
}
for _, txn := range block.Transactions() {
to := txn.GetTo()
if to == nil {
continue
}
a, _ := stateReader.ReadAccountData(*to)
if a == nil || a.Incarnation == 0 {
continue
}
if code, _ := stateReader.ReadAccountCode(*to, a.Incarnation, a.CodeHash); len(code) > 0 {
_, _ = code[0], code[len(code)-1]
}
}
_, _ = stateReader.ReadAccountData(block.Coinbase())
_, _ = block, senders
return nil
}
func logProgress(logPrefix string, prevBlock uint64, prevTime time.Time, currentBlock uint64, prevTx, currentTx uint64, gas uint64,
gasState float64, batch ethdb.DbWithPendingMutations, logger log.Logger) (uint64, uint64, time.Time) {
currentTime := time.Now()

View File

@ -364,7 +364,6 @@ func (back *BlockReaderWithSnapshots) BlockWithSenders(ctx context.Context, tx k
return block, senders, nil
}
var txs []types.Transaction
var senders []libcommon.Address
ok, err = back.sn.ViewTxs(blockHeight, func(seg *TxnSegment) error {
txs, senders, err = back.txsFromSnapshot(baseTxnId, txsAmount, seg, buf)
if err != nil {