From a13c5ba904ef1d14fde51dd34fa8c3385e476c10 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sat, 30 Apr 2022 11:42:36 +0700 Subject: [PATCH] Fix non-starting download (#4031) * save * save * save * save --- cmd/integration/commands/stages.go | 4 +- core/rawdb/accessors_chain.go | 170 ++++++++++++++--------------- core/rawdb/accessors_chain_test.go | 5 +- eth/stagedsync/stage.go | 4 +- eth/stagedsync/stage_execute.go | 4 +- eth/stagedsync/stage_headers.go | 20 ++-- eth/stagedsync/stage_senders.go | 27 +++-- 7 files changed, 119 insertions(+), 115 deletions(-) diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 92f59b34c..d764c1985 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -482,7 +482,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context) error { return fmt.Errorf("re-read Bodies progress: %w", err) } { // hard-unwind stage_body also - if err := rawdb.TruncateBlocks(tx, progress+1); err != nil { + if err := rawdb.TruncateBlocks(ctx, tx, progress+1); err != nil { return err } progressBodies, err := stages.GetStageProgress(tx, stages.Bodies) @@ -506,7 +506,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context) error { if err != nil { return err } - if err = tx.Put(kv.HeadHeaderKey, []byte(kv.HeadHeaderKey), hash[:]); err != nil { + if err = rawdb.WriteHeadHeaderHash(tx, hash); err != nil { return err } diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index a4995f497..d0933fdff 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -1077,8 +1077,8 @@ func min(a, b uint64) uint64 { // DeleteAncientBlocks - delete [1, to) old blocks after moving it to snapshots. // keeps genesis in db: [1, to) -// doesn't delete Reciepts -// doesn't delete Canonical markers +// doesn't change sequnces of kv.EthTx and kv.NonCanonicalTxs +// doesn't delete Reciepts, Senders, Canonical markers, TotalDifficulty func DeleteAncientBlocks(db kv.RwTx, blockTo uint64, blocksDeleteLimit int) error { c, err := db.Cursor(kv.Headers) if err != nil { @@ -1106,7 +1106,35 @@ func DeleteAncientBlocks(db kv.RwTx, blockTo uint64, blocksDeleteLimit int) erro if n >= stopAtBlock { // [from, to) break } - if err := delBlock(db, k, true); err != nil { + + canonicalHash, err := ReadCanonicalHash(db, n) + if err != nil { + return err + } + isCanonical := bytes.Equal(k[8:], canonicalHash[:]) + + b, err := ReadBodyForStorageByKey(db, k) + if err != nil { + return err + } + if b == nil { + return fmt.Errorf("DeleteAncientBlocks: block body not found for block %d", n) + } + txIDBytes := make([]byte, 8) + for txID := b.BaseTxId; txID < b.BaseTxId+uint64(b.TxAmount); txID++ { + binary.BigEndian.PutUint64(txIDBytes, txID) + bucket := kv.EthTx + if !isCanonical { + bucket = kv.NonCanonicalTxs + } + if err := db.Delete(bucket, txIDBytes, nil); err != nil { + return err + } + } + if err := db.Delete(kv.Headers, k, nil); err != nil { + return err + } + if err := db.Delete(kv.BlockBody, k, nil); err != nil { return err } } @@ -1114,7 +1142,7 @@ func DeleteAncientBlocks(db kv.RwTx, blockTo uint64, blocksDeleteLimit int) erro return nil } -func lastKey(tx kv.Tx, table string) ([]byte, error) { +func LastKey(tx kv.Tx, table string) ([]byte, error) { c, err := tx.Cursor(table) if err != nil { return nil, err @@ -1127,93 +1155,26 @@ func lastKey(tx kv.Tx, table string) ([]byte, error) { return k, nil } -// delBlock - low-level method to delete 1 block by key -// keeps genesis in db: [1, to) -// doesn't delete Reciepts -// doesn't delete Canonical markers -// doesn't delete TotalDifficulty -// keepSequence - can track decrement sequnces of Canonical and Non-Canonical txs -func delBlock(tx kv.RwTx, k []byte, keepSequence bool) error { - n := binary.BigEndian.Uint64(k) - canonicalHash, err := ReadCanonicalHash(tx, n) +func FirstKey(tx kv.Tx, table string) ([]byte, error) { + c, err := tx.Cursor(table) if err != nil { - return err + return nil, err } - isCanonical := bytes.Equal(k[8:], canonicalHash[:]) - b, err := ReadBodyForStorageByKey(tx, k) + defer c.Close() + k, _, err := c.First() if err != nil { - return err + return nil, err } - if b == nil { - return fmt.Errorf("DeleteAncientBlocks: block body not found for block %d", n) - } - txIDBytes := make([]byte, 8) - for txID := b.BaseTxId; txID < b.BaseTxId+uint64(b.TxAmount); txID++ { - binary.BigEndian.PutUint64(txIDBytes, txID) - bucket := kv.EthTx - if !isCanonical { - bucket = kv.NonCanonicalTxs - } - - if !keepSequence { - lastK, err := lastKey(tx, bucket) - if err != nil { - return err - } - seq, err := tx.ReadSequence(bucket) - if err != nil { - return err - } - if lastK != nil && binary.BigEndian.Uint64(lastK) > seq { - return fmt.Errorf("please delete blocks from newest to older. txnId: %d > %d", binary.BigEndian.Uint64(lastK), seq) - } - if seq > b.BaseTxId+uint64(b.TxAmount) { - return fmt.Errorf("please delete blocks from newest to older. seq: %d > %d+%d", seq, b.BaseTxId, uint64(b.TxAmount)) - } - if err := ResetSequence(tx, bucket, b.BaseTxId); err != nil { - return err - } - } - - if err := tx.Delete(bucket, txIDBytes, nil); err != nil { - return err - } - } - if err := tx.Delete(kv.Headers, k, nil); err != nil { - return err - } - if err := tx.Delete(kv.BlockBody, k, nil); err != nil { - return err - } - if err := tx.Delete(kv.Senders, k, nil); err != nil { - return err - } - - if !keepSequence { - bucket := kv.EthTx - if !isCanonical { - bucket = kv.NonCanonicalTxs - } - lastK, err := lastKey(tx, bucket) - if err != nil { - return err - } - seq, err := tx.ReadSequence(bucket) - if err != nil { - return err - } - if lastK != nil && binary.BigEndian.Uint64(lastK) > seq { - return fmt.Errorf("end: please delete blocks from newest to older. txnId: %d > %d", binary.BigEndian.Uint64(lastK), seq) - } - if seq > b.BaseTxId+uint64(b.TxAmount) { - return fmt.Errorf("end: please delete blocks from newest to older. seq: %d > %d+%d", seq, b.BaseTxId, uint64(b.TxAmount)) - } - } - return nil + return k, nil } // TruncateBlocks - delete block >= blockFrom -func TruncateBlocks(tx kv.RwTx, blockFrom uint64) error { +// does decrement sequnces of kv.EthTx and kv.NonCanonicalTxs +// doesn't delete Reciepts, Senders, Canonical markers, TotalDifficulty +func TruncateBlocks(ctx context.Context, tx kv.RwTx, blockFrom uint64) error { + logEvery := time.NewTicker(20 * time.Second) + defer logEvery.Stop() + c, err := tx.Cursor(kv.Headers) if err != nil { return err @@ -1230,10 +1191,49 @@ func TruncateBlocks(tx kv.RwTx, blockFrom uint64) error { if n < blockFrom { // [from, to) break } - if err := delBlock(tx, k, false); err != nil { + canonicalHash, err := ReadCanonicalHash(tx, n) + if err != nil { return err } + isCanonical := bytes.Equal(k[8:], canonicalHash[:]) + + b, err := ReadBodyForStorageByKey(tx, k) + if err != nil { + return err + } + if b != nil { + bucket := kv.EthTx + if !isCanonical { + bucket = kv.NonCanonicalTxs + } + if err := tx.ForEach(bucket, dbutils.EncodeBlockNumber(b.BaseTxId), func(k, _ []byte) error { + if err := tx.Delete(bucket, k, nil); err != nil { + return err + } + return nil + }); err != nil { + return err + } + if err := ResetSequence(tx, bucket, b.BaseTxId); err != nil { + return err + } + } + if err := tx.Delete(kv.Headers, k, nil); err != nil { + return err + } + if err := tx.Delete(kv.BlockBody, k, nil); err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-logEvery.C: + log.Info("TruncateBlocks", "block", n) + default: + } } + return nil } diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index f1d96e81d..d91c4db2a 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -18,6 +18,7 @@ package rawdb import ( "bytes" + "context" "encoding/hex" "fmt" "math/big" @@ -158,7 +159,7 @@ func TestBlockStorage(t *testing.T) { } else if entry.Hash() != block.Hash() { t.Fatalf("Retrieved header mismatch: have %v, want %v", entry, block.Header()) } - if err := TruncateBlocks(tx, 2); err != nil { + if err := TruncateBlocks(context.Background(), tx, 2); err != nil { t.Fatal(err) } if entry := ReadCanonicalBodyWithTransactions(tx, block.Hash(), block.NumberU64()); entry == nil { @@ -167,7 +168,7 @@ func TestBlockStorage(t *testing.T) { t.Fatalf("Retrieved body mismatch: have %v, want %v", entry, block.Body()) } // Delete the block and verify the execution - if err := TruncateBlocks(tx, block.NumberU64()); err != nil { + if err := TruncateBlocks(context.Background(), tx, block.NumberU64()); err != nil { t.Fatal(err) } //if err := DeleteBlock(tx, block.Hash(), block.NumberU64()); err != nil { diff --git a/eth/stagedsync/stage.go b/eth/stagedsync/stage.go index bb7c94e34..5224e3c80 100644 --- a/eth/stagedsync/stage.go +++ b/eth/stagedsync/stage.go @@ -110,7 +110,7 @@ func (s *PruneState) DoneAt(db kv.Putter, blockNum uint64) error { } // PruneTable has `limit` parameter to avoid too large data deletes per one sync cycle - better delete by small portions to reduce db.FreeList size -func PruneTable(tx kv.RwTx, table string, logPrefix string, pruneTo uint64, logEvery *time.Ticker, ctx context.Context, limit int) error { +func PruneTable(tx kv.RwTx, table string, pruneTo uint64, ctx context.Context, limit int) error { c, err := tx.RwCursor(table) if err != nil { @@ -133,8 +133,6 @@ func PruneTable(tx kv.RwTx, table string, logPrefix string, pruneTo uint64, logE break } select { - case <-logEvery.C: - log.Info(fmt.Sprintf("[%s]", logPrefix), "table", table, "block", blockNum) case <-ctx.Done(): return libcommon.ErrStopped default: diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 002f066ff..08f8aaf3d 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -587,11 +587,11 @@ func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx con } if cfg.prune.Receipts.Enabled() { - if err = PruneTable(tx, kv.Receipts, logPrefix, cfg.prune.Receipts.PruneTo(s.ForwardProgress), logEvery, ctx, math.MaxInt32); err != nil { + if err = PruneTable(tx, kv.Receipts, cfg.prune.Receipts.PruneTo(s.ForwardProgress), ctx, math.MaxInt32); err != nil { return err } // LogIndex.Prune will read everything what not pruned here - if err = PruneTable(tx, kv.Log, logPrefix, cfg.prune.Receipts.PruneTo(s.ForwardProgress), logEvery, ctx, math.MaxInt32); err != nil { + if err = PruneTable(tx, kv.Log, cfg.prune.Receipts.PruneTo(s.ForwardProgress), ctx, math.MaxInt32); err != nil { return err } } diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index b29a0db40..7ac006dba 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -1116,14 +1116,10 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R } } - if s.BlockNumber < 2 { // allow genesis + if s.BlockNumber < cfg.snapshots.BlocksAvailable() { // allow genesis logEvery := time.NewTicker(logInterval) defer logEvery.Stop() - //tx.ClearBucket(kv.HeaderCanonical) - //tx.ClearBucket(kv.HeaderTD) - //tx.ClearBucket(kv.HeaderNumber) - // fill some small tables from snapshots, in future we may store this data in snapshots also, but // for now easier just store them in db td := big.NewInt(0) @@ -1165,10 +1161,16 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R if !ok { return fmt.Errorf("snapshot not found for block: %d", cfg.snapshots.BlocksAvailable()) } - } - - // Add last headers from snapshots to HeaderDownloader (as persistent links) - if s.BlockNumber < cfg.snapshots.BlocksAvailable() { + if err := s.Update(tx, cfg.snapshots.BlocksAvailable()); err != nil { + return err + } + canonicalHash, err := cfg.blockReader.CanonicalHash(ctx, tx, cfg.snapshots.BlocksAvailable()) + if err != nil { + return err + } + if err = rawdb.WriteHeadHeaderHash(tx, canonicalHash); err != nil { + return err + } if err := cfg.hd.AddHeaderFromSnapshot(tx, cfg.snapshots.BlocksAvailable(), cfg.blockReader); err != nil { return err } diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index a9b3f021e..a0fe3604b 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -373,12 +373,23 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co defer tx.Rollback() } + // With snapsync - can prune old data only after snapshot for this data created: CanDeleteTo() if cfg.blockRetire.Snapshots() != nil && cfg.blockRetire.Snapshots().Cfg().Enabled { - if err := retireBlocks(s, tx, cfg, ctx); err != nil { - return fmt.Errorf("retireBlocks: %w", err) + if !cfg.blockRetire.Snapshots().Cfg().KeepBlocks { + canDeleteTo := snapshotsync.CanDeleteTo(s.ForwardProgress, cfg.blockRetire.Snapshots()) + if err := rawdb.DeleteAncientBlocks(tx, canDeleteTo, 1_000); err != nil { + return nil + } + if err = PruneTable(tx, kv.Senders, canDeleteTo, ctx, 1_000); err != nil { + return err + } + } + + if err := retireBlocksInSingleBackgroundThread(s, cfg, ctx); err != nil { + return fmt.Errorf("retireBlocksInSingleBackgroundThread: %w", err) } } else if cfg.prune.TxIndex.Enabled() { - if err = PruneTable(tx, kv.Senders, s.LogPrefix(), to, logEvery, ctx, 1_000); err != nil { + if err = PruneTable(tx, kv.Senders, to, ctx, 1_000); err != nil { return err } } @@ -391,15 +402,7 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co return nil } -func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context) (err error) { - // delete portion of old blocks in any case - if !cfg.blockRetire.Snapshots().Cfg().KeepBlocks { - canDeleteTo := snapshotsync.CanDeleteTo(s.ForwardProgress, cfg.blockRetire.Snapshots()) - if err := rawdb.DeleteAncientBlocks(tx, canDeleteTo, 1_000); err != nil { - return nil - } - } - +func retireBlocksInSingleBackgroundThread(s *PruneState, cfg SendersCfg, ctx context.Context) (err error) { // if something already happens in background - noop if cfg.blockRetire.Working() { return nil