Snapshots: better support of p2p (#3749)

This commit is contained in:
Alex Sharov 2022-03-21 20:36:03 +07:00 committed by GitHub
parent 752a52c4af
commit 4cf4c1bac1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 39 additions and 22 deletions

View File

@ -10,7 +10,6 @@ import (
"github.com/anacrolix/torrent/storage"
"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/common/dir"
"golang.org/x/time/rate"
)
// DefaultPieceSize - Erigon serves many big files, bigger pieces will reduce
@ -49,8 +48,8 @@ func New(snapshotsDir *dir.Rw, verbosity lg.Level, downloadRate, uploadRate data
torrentConfig.UpnpID = torrentConfig.UpnpID + "leecher"
// rates are divided by 2 - I don't know why it works, maybe bug inside torrent lib accounting
torrentConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(uploadRate.Bytes()/2), 2*DefaultPieceSize) // default: unlimited
torrentConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(downloadRate.Bytes()/2), 2*DefaultPieceSize) // default: unlimited
//torrentConfig.UploadRateLimiter = rate.NewLimiter(rate.Limit(uploadRate.Bytes()/2), 2*16384) // default: unlimited
//torrentConfig.DownloadRateLimiter = rate.NewLimiter(rate.Limit(downloadRate.Bytes()/2), 2*DefaultPieceSize) // default: unlimited
// debug
if lg.Debug == verbosity {

View File

@ -312,7 +312,6 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
var allSnapshots *snapshotsync.RoSnapshots
if config.Snapshot.Enabled {
allSnapshots = snapshotsync.NewRoSnapshots(config.Snapshot, config.SnapshotDir.Path)
allSnapshots.AsyncOpenAll(ctx)
blockReader = snapshotsync.NewBlockReaderWithSnapshots(allSnapshots)
if len(stack.Config().DownloaderAddr) > 0 {

View File

@ -572,7 +572,7 @@ func HeadersPOW(
test bool, // Set to true in tests, allows the stage to fail rather than wait indefinitely
useExternalTx bool,
) error {
if err := DownloadAndIndexSnapshotsIfNeed(s, ctx, tx, cfg); err != nil {
if err := DownloadAndIndexSnapshotsIfNeed(s, ctx, tx, cfg, initialCycle); err != nil {
return err
}
@ -987,22 +987,25 @@ func HeadersPrune(p *PruneState, tx kv.RwTx, cfg HeadersCfg, ctx context.Context
return nil
}
func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.RwTx, cfg HeadersCfg) error {
func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.RwTx, cfg HeadersCfg, initialCycle bool) error {
if cfg.snapshots == nil {
return nil
}
if !cfg.snapshots.SegmentsReady() || cfg.snapshots.SegmentsAvailable() < cfg.snapshotHashesCfg.ExpectBlocks {
if !initialCycle {
if err := WaitForDownloader(ctx, tx, cfg); err != nil {
return err
}
expect := cfg.snapshotHashesCfg.ExpectBlocks
if err := cfg.snapshots.ReopenSegments(); err != nil {
return fmt.Errorf("ReopenSegments: %w", err)
}
expect := cfg.snapshotHashesCfg.ExpectBlocks
if cfg.snapshots.SegmentsAvailable() < expect {
return fmt.Errorf("not enough snapshots available: %d > %d", expect, cfg.snapshots.SegmentsAvailable())
}
if err := cfg.snapshots.ReopenIndices(); err != nil {
return fmt.Errorf("ReopenIndices: %w", err)
}
}
// Create .idx files
@ -1127,25 +1130,34 @@ func WaitForDownloader(ctx context.Context, tx kv.RwTx, cfg HeadersCfg) error {
}
break
}
var prevBytesCompleted uint64
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
// Print download progress until all segments are available
Loop:
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
case <-logEvery.C:
if reply, err := cfg.snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{}); err != nil {
log.Warn("Error while waiting for snapshots progress", "err", err)
} else if int(reply.Torrents) < len(snapshotsCfg.Preverified) {
log.Warn("Downloader has not enough snapshots (yet)")
} else if reply.Completed {
break Loop
} else {
readBytesPerSec := (reply.BytesCompleted - prevBytesCompleted) / uint64(logInterval.Seconds())
//result.writeBytesPerSec += (result.bytesWritten - prevStats.bytesWritten) / int64(interval.Seconds())
readiness := 100 * (float64(reply.BytesCompleted) / float64(reply.BytesTotal))
log.Info("[Snapshots] download", "progress", fmt.Sprintf("%.2f%%", readiness),
"download", libcommon.ByteCount(readBytesPerSec)+"/s",
)
prevBytesCompleted = reply.BytesCompleted
}
}
if reply, err := cfg.snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{}); err != nil {
log.Warn("Error while waiting for snapshots progress", "err", err)
} else if int(reply.Torrents) < len(snapshotsCfg.Preverified) {
log.Warn("Downloader has not enough snapshots (yet)")
} else if reply.Completed {
break
} else {
readiness := 100 * (float64(reply.BytesCompleted) / float64(reply.BytesTotal))
log.Info("[Snapshots] download", "progress", fmt.Sprintf("%.2f%%", readiness))
}
time.Sleep(10 * time.Second)
}
if err := tx.Put(kv.DatabaseInfo, []byte(readyKey), []byte{1}); err != nil {

View File

@ -397,7 +397,7 @@ func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context
}
if res := cfg.blockRetire.Result(); res != nil {
if res.Err != nil {
return fmt.Errorf("[%s] retire blocks last error: %w", s.LogPrefix(), res.Err)
return fmt.Errorf("[%s] retire blocks last error: %w, fromBlock=%d, toBlock=%d", s.LogPrefix(), res.Err, res.BlockFrom, res.BlockTo)
}
}
if !cfg.blockRetire.Snapshots().Cfg().KeepBlocks {

View File

@ -238,7 +238,7 @@ func writeRawBodyDeprecated(db kv.StatelessRwTx, hash common.Hash, number uint64
return fmt.Errorf("failed to write body: %w", err)
}
if err = rawdb.WriteRawTransactions(db, body.Transactions, baseTxId); err != nil {
return fmt.Errorf("failed to WriteRawTransactions: %w", err)
return fmt.Errorf("failed to WriteRawTransactions: %w, blockNum=%d", err, number)
}
return nil
}

View File

@ -219,6 +219,10 @@ func (back *BlockReaderWithSnapshots) HeaderByHash(ctx context.Context, tx kv.Ge
buf := make([]byte, 128)
if err := back.sn.Headers.View(func(segments []*HeaderSegment) error {
for i := len(segments) - 1; i >= 0; i-- {
if segments[i].idxHeaderHash == nil {
continue
}
h, err = back.headerFromSnapshotByHash(hash, segments[i], buf)
if err != nil {
return err
@ -546,6 +550,9 @@ func (back *BlockReaderWithSnapshots) txsFromSnapshot(baseTxnID uint64, txsAmoun
func (back *BlockReaderWithSnapshots) txnByHash(txnHash common.Hash, segments []*TxnSegment, buf []byte) (txn types.Transaction, blockNum, txnID uint64, err error) {
for i := len(segments) - 1; i >= 0; i-- {
sn := segments[i]
if sn.IdxTxnId == nil || sn.IdxTxnHash == nil || sn.IdxTxnHash2BlockNum == nil {
continue
}
reader := recsplit.NewIndexReader(sn.IdxTxnHash)
offset := reader.Lookup(txnHash[:])