Fix non-starting download (#4031)

* save

* save

* save

* save
Alex Sharov 2022-04-30 11:42:36 +07:00 committed by GitHub
parent af5bdceb14
commit a13c5ba904
7 changed files with 119 additions and 115 deletions


@@ -482,7 +482,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context) error {
return fmt.Errorf("re-read Bodies progress: %w", err)
}
{ // hard-unwind stage_body also
if err := rawdb.TruncateBlocks(tx, progress+1); err != nil {
if err := rawdb.TruncateBlocks(ctx, tx, progress+1); err != nil {
return err
}
progressBodies, err := stages.GetStageProgress(tx, stages.Bodies)
@@ -506,7 +506,7 @@ func stageHeaders(db kv.RwDB, ctx context.Context) error {
if err != nil {
return err
}
if err = tx.Put(kv.HeadHeaderKey, []byte(kv.HeadHeaderKey), hash[:]); err != nil {
if err = rawdb.WriteHeadHeaderHash(tx, hash); err != nil {
return err
}
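The new call replaces a hand-rolled tx.Put of the head-header marker with the rawdb.WriteHeadHeaderHash helper. A minimal sketch of such a helper, assuming it simply wraps the same single-key Put shown on the removed line (kv.HeadHeaderKey serves as both the table name and the key):

func WriteHeadHeaderHash(db kv.Putter, hash common.Hash) error {
	// single-entry marker table: the key is the table name itself
	if err := db.Put(kv.HeadHeaderKey, []byte(kv.HeadHeaderKey), hash.Bytes()); err != nil {
		return fmt.Errorf("failed to store last header's hash: %w", err)
	}
	return nil
}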


@@ -1077,8 +1077,8 @@ func min(a, b uint64) uint64 {
// DeleteAncientBlocks - delete [1, to) old blocks after moving them to snapshots.
// keeps genesis in db: [1, to)
// doesn't delete Receipts
// doesn't delete Canonical markers
// doesn't change sequences of kv.EthTx and kv.NonCanonicalTxs
// doesn't delete Receipts, Senders, Canonical markers, TotalDifficulty
func DeleteAncientBlocks(db kv.RwTx, blockTo uint64, blocksDeleteLimit int) error {
c, err := db.Cursor(kv.Headers)
if err != nil {
@@ -1106,7 +1106,35 @@ func DeleteAncientBlocks(db kv.RwTx, blockTo uint64, blocksDeleteLimit int) erro
if n >= stopAtBlock { // [from, to)
break
}
if err := delBlock(db, k, true); err != nil {
canonicalHash, err := ReadCanonicalHash(db, n)
if err != nil {
return err
}
isCanonical := bytes.Equal(k[8:], canonicalHash[:])
b, err := ReadBodyForStorageByKey(db, k)
if err != nil {
return err
}
if b == nil {
return fmt.Errorf("DeleteAncientBlocks: block body not found for block %d", n)
}
txIDBytes := make([]byte, 8)
for txID := b.BaseTxId; txID < b.BaseTxId+uint64(b.TxAmount); txID++ {
binary.BigEndian.PutUint64(txIDBytes, txID)
bucket := kv.EthTx
if !isCanonical {
bucket = kv.NonCanonicalTxs
}
if err := db.Delete(bucket, txIDBytes, nil); err != nil {
return err
}
}
if err := db.Delete(kv.Headers, k, nil); err != nil {
return err
}
if err := db.Delete(kv.BlockBody, k, nil); err != nil {
return err
}
}
@@ -1114,7 +1142,7 @@ func DeleteAncientBlocks(db kv.RwTx, blockTo uint64, blocksDeleteLimit int) erro
return nil
}
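The inlined loop above re-derives each block's transaction-id range from its stored body and deletes the ids one by one, encoding every id as an 8-byte big-endian key. Big-endian encoding is what makes byte-wise key order match numeric id order, which cursor range scans over kv.EthTx rely on. A standalone illustration with made-up values:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	a := make([]byte, 8)
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(a, 255) // 0x00000000000000ff
	binary.BigEndian.PutUint64(b, 256) // 0x0000000000000100
	// big-endian keys compare byte-wise in the same order as the uint64s they encode
	fmt.Println(bytes.Compare(a, b) < 0) // true, matching 255 < 256
}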
func lastKey(tx kv.Tx, table string) ([]byte, error) {
func LastKey(tx kv.Tx, table string) ([]byte, error) {
c, err := tx.Cursor(table)
if err != nil {
return nil, err
@@ -1127,93 +1155,26 @@ func lastKey(tx kv.Tx, table string) ([]byte, error) {
return k, nil
}
// delBlock - low-level method to delete 1 block by key
// keeps genesis in db: [1, to)
// doesn't delete Receipts
// doesn't delete Canonical markers
// doesn't delete TotalDifficulty
// keepSequence - if true, keep (don't decrement) the sequences of Canonical and Non-Canonical txs
func delBlock(tx kv.RwTx, k []byte, keepSequence bool) error {
n := binary.BigEndian.Uint64(k)
canonicalHash, err := ReadCanonicalHash(tx, n)
func FirstKey(tx kv.Tx, table string) ([]byte, error) {
c, err := tx.Cursor(table)
if err != nil {
return err
return nil, err
}
isCanonical := bytes.Equal(k[8:], canonicalHash[:])
b, err := ReadBodyForStorageByKey(tx, k)
defer c.Close()
k, _, err := c.First()
if err != nil {
return err
return nil, err
}
if b == nil {
return fmt.Errorf("DeleteAncientBlocks: block body not found for block %d", n)
}
txIDBytes := make([]byte, 8)
for txID := b.BaseTxId; txID < b.BaseTxId+uint64(b.TxAmount); txID++ {
binary.BigEndian.PutUint64(txIDBytes, txID)
bucket := kv.EthTx
if !isCanonical {
bucket = kv.NonCanonicalTxs
}
if !keepSequence {
lastK, err := lastKey(tx, bucket)
if err != nil {
return err
}
seq, err := tx.ReadSequence(bucket)
if err != nil {
return err
}
if lastK != nil && binary.BigEndian.Uint64(lastK) > seq {
return fmt.Errorf("please delete blocks from newest to older. txnId: %d > %d", binary.BigEndian.Uint64(lastK), seq)
}
if seq > b.BaseTxId+uint64(b.TxAmount) {
return fmt.Errorf("please delete blocks from newest to older. seq: %d > %d+%d", seq, b.BaseTxId, uint64(b.TxAmount))
}
if err := ResetSequence(tx, bucket, b.BaseTxId); err != nil {
return err
}
}
if err := tx.Delete(bucket, txIDBytes, nil); err != nil {
return err
}
}
if err := tx.Delete(kv.Headers, k, nil); err != nil {
return err
}
if err := tx.Delete(kv.BlockBody, k, nil); err != nil {
return err
}
if err := tx.Delete(kv.Senders, k, nil); err != nil {
return err
}
if !keepSequence {
bucket := kv.EthTx
if !isCanonical {
bucket = kv.NonCanonicalTxs
}
lastK, err := lastKey(tx, bucket)
if err != nil {
return err
}
seq, err := tx.ReadSequence(bucket)
if err != nil {
return err
}
if lastK != nil && binary.BigEndian.Uint64(lastK) > seq {
return fmt.Errorf("end: please delete blocks from newest to older. txnId: %d > %d", binary.BigEndian.Uint64(lastK), seq)
}
if seq > b.BaseTxId+uint64(b.TxAmount) {
return fmt.Errorf("end: please delete blocks from newest to older. seq: %d > %d+%d", seq, b.BaseTxId, uint64(b.TxAmount))
}
}
return nil
return k, nil
}
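lastKey is promoted to the exported LastKey and gains a FirstKey counterpart; both are thin wrappers over a cursor's Last/First. A hypothetical helper (not part of this commit) showing how the pair could report the live id range of a table such as kv.EthTx:

// txIDRange is illustrative only; it assumes the table's keys are 8-byte big-endian ids.
func txIDRange(tx kv.Tx, table string) (first, last uint64, err error) {
	fk, err := FirstKey(tx, table)
	if err != nil {
		return 0, 0, err
	}
	lk, err := LastKey(tx, table)
	if err != nil {
		return 0, 0, err
	}
	if fk != nil {
		first = binary.BigEndian.Uint64(fk)
	}
	if lk != nil {
		last = binary.BigEndian.Uint64(lk)
	}
	return first, last, nil
}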
// TruncateBlocks - delete blocks >= blockFrom
func TruncateBlocks(tx kv.RwTx, blockFrom uint64) error {
// does decrement sequences of kv.EthTx and kv.NonCanonicalTxs
// doesn't delete Receipts, Senders, Canonical markers, TotalDifficulty
func TruncateBlocks(ctx context.Context, tx kv.RwTx, blockFrom uint64) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
c, err := tx.Cursor(kv.Headers)
if err != nil {
return err
@@ -1230,10 +1191,49 @@ func TruncateBlocks(tx kv.RwTx, blockFrom uint64) error {
if n < blockFrom { // [from, to)
break
}
if err := delBlock(tx, k, false); err != nil {
canonicalHash, err := ReadCanonicalHash(tx, n)
if err != nil {
return err
}
isCanonical := bytes.Equal(k[8:], canonicalHash[:])
b, err := ReadBodyForStorageByKey(tx, k)
if err != nil {
return err
}
if b != nil {
bucket := kv.EthTx
if !isCanonical {
bucket = kv.NonCanonicalTxs
}
if err := tx.ForEach(bucket, dbutils.EncodeBlockNumber(b.BaseTxId), func(k, _ []byte) error {
if err := tx.Delete(bucket, k, nil); err != nil {
return err
}
return nil
}); err != nil {
return err
}
if err := ResetSequence(tx, bucket, b.BaseTxId); err != nil {
return err
}
}
if err := tx.Delete(kv.Headers, k, nil); err != nil {
return err
}
if err := tx.Delete(kv.BlockBody, k, nil); err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-logEvery.C:
log.Info("TruncateBlocks", "block", n)
default:
}
}
return nil
}
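TruncateBlocks now threads a context.Context through the loop and finishes each iteration with a non-blocking select: ctx.Done() makes a long truncation abortable, the ticker throttles progress logging to one line per 20 seconds, and the empty default keeps the loop spinning at full speed otherwise. A generic, self-contained sketch of that pattern (names are illustrative, not erigon APIs):

package main

import (
	"context"
	"log"
	"time"
)

func longLoop(ctx context.Context, step func() (done bool, err error)) error {
	logEvery := time.NewTicker(20 * time.Second)
	defer logEvery.Stop()
	for {
		done, err := step()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // abort promptly on shutdown
		case <-logEvery.C:
			log.Println("longLoop: still working") // throttled progress report
		default: // non-blocking: fall through and keep iterating
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	n := 0
	_ = longLoop(ctx, func() (bool, error) { n++; return n >= 3, nil })
}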


@@ -18,6 +18,7 @@ package rawdb
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"math/big"
@@ -158,7 +159,7 @@ func TestBlockStorage(t *testing.T) {
} else if entry.Hash() != block.Hash() {
t.Fatalf("Retrieved header mismatch: have %v, want %v", entry, block.Header())
}
if err := TruncateBlocks(tx, 2); err != nil {
if err := TruncateBlocks(context.Background(), tx, 2); err != nil {
t.Fatal(err)
}
if entry := ReadCanonicalBodyWithTransactions(tx, block.Hash(), block.NumberU64()); entry == nil {
@@ -167,7 +168,7 @@ func TestBlockStorage(t *testing.T) {
t.Fatalf("Retrieved body mismatch: have %v, want %v", entry, block.Body())
}
// Delete the block and verify the execution
if err := TruncateBlocks(tx, block.NumberU64()); err != nil {
if err := TruncateBlocks(context.Background(), tx, block.NumberU64()); err != nil {
t.Fatal(err)
}
//if err := DeleteBlock(tx, block.Hash(), block.NumberU64()); err != nil {


@@ -110,7 +110,7 @@ func (s *PruneState) DoneAt(db kv.Putter, blockNum uint64) error {
}
// PruneTable has a `limit` parameter to avoid overly large deletes in one sync cycle - deleting in small portions keeps db.FreeList small
func PruneTable(tx kv.RwTx, table string, logPrefix string, pruneTo uint64, logEvery *time.Ticker, ctx context.Context, limit int) error {
func PruneTable(tx kv.RwTx, table string, pruneTo uint64, ctx context.Context, limit int) error {
c, err := tx.RwCursor(table)
if err != nil {
@@ -133,8 +133,6 @@ func PruneTable(tx kv.RwTx, table string, logPrefix string, pruneTo uint64, logE
break
}
select {
case <-logEvery.C:
log.Info(fmt.Sprintf("[%s]", logPrefix), "table", table, "block", blockNum)
case <-ctx.Done():
return libcommon.ErrStopped
default:
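With logPrefix and logEvery gone, PruneTable is reduced to its essential job: walk the table from the start and delete entries whose block number (the first 8 bytes of the key) is below pruneTo, stopping after limit deletions so one sync cycle never frees too many pages at once. A sketch of that loop shape, assuming block-number-prefixed keys:

func pruneTableSketch(tx kv.RwTx, table string, pruneTo uint64, limit int) error {
	c, err := tx.RwCursor(table)
	if err != nil {
		return err
	}
	defer c.Close()
	deleted := 0
	for k, _, err := c.First(); k != nil; k, _, err = c.Next() {
		if err != nil {
			return err
		}
		if binary.BigEndian.Uint64(k) >= pruneTo { // keys sort by block number
			break
		}
		if deleted >= limit { // bound work per cycle to keep db.FreeList small
			break
		}
		if err = c.DeleteCurrent(); err != nil {
			return err
		}
		deleted++
	}
	return nil
}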


@@ -587,11 +587,11 @@ func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx con
}
if cfg.prune.Receipts.Enabled() {
if err = PruneTable(tx, kv.Receipts, logPrefix, cfg.prune.Receipts.PruneTo(s.ForwardProgress), logEvery, ctx, math.MaxInt32); err != nil {
if err = PruneTable(tx, kv.Receipts, cfg.prune.Receipts.PruneTo(s.ForwardProgress), ctx, math.MaxInt32); err != nil {
return err
}
// LogIndex.Prune will read everything that was not pruned here
if err = PruneTable(tx, kv.Log, logPrefix, cfg.prune.Receipts.PruneTo(s.ForwardProgress), logEvery, ctx, math.MaxInt32); err != nil {
if err = PruneTable(tx, kv.Log, cfg.prune.Receipts.PruneTo(s.ForwardProgress), ctx, math.MaxInt32); err != nil {
return err
}
}
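Both call sites prune to cfg.prune.Receipts.PruneTo(s.ForwardProgress). PruneTo conventionally means "keep the newest N blocks"; a sketch of that arithmetic under distance-based pruning semantics (a simplifying assumption, not the exact erigon implementation):

func pruneToSketch(forwardProgress, keepRecent uint64) uint64 {
	// everything strictly below the returned block number may be deleted
	if forwardProgress <= keepRecent {
		return 0
	}
	return forwardProgress - keepRecent
}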


@@ -1116,14 +1116,10 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
}
}
if s.BlockNumber < 2 { // allow genesis
if s.BlockNumber < cfg.snapshots.BlocksAvailable() { // allow genesis
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
//tx.ClearBucket(kv.HeaderCanonical)
//tx.ClearBucket(kv.HeaderTD)
//tx.ClearBucket(kv.HeaderNumber)
// fill some small tables from snapshots; in the future we may store this data in snapshots too,
// but for now it is easier to just store it in the db
td := big.NewInt(0)
@@ -1165,10 +1161,16 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
if !ok {
return fmt.Errorf("snapshot not found for block: %d", cfg.snapshots.BlocksAvailable())
}
}
// Add last headers from snapshots to HeaderDownloader (as persistent links)
if s.BlockNumber < cfg.snapshots.BlocksAvailable() {
if err := s.Update(tx, cfg.snapshots.BlocksAvailable()); err != nil {
return err
}
canonicalHash, err := cfg.blockReader.CanonicalHash(ctx, tx, cfg.snapshots.BlocksAvailable())
if err != nil {
return err
}
if err = rawdb.WriteHeadHeaderHash(tx, canonicalHash); err != nil {
return err
}
if err := cfg.hd.AddHeaderFromSnapshot(tx, cfg.snapshots.BlocksAvailable(), cfg.blockReader); err != nil {
return err
}
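This hunk is the heart of the fix: the old guard s.BlockNumber < 2 ran the snapshot bootstrap only on a pristine db, so a node whose stage progress lagged the downloaded snapshots never advanced and the download appeared to not start. The new guard compares against cfg.snapshots.BlocksAvailable() and then pushes stage progress and the head-header marker up to the snapshot tip before seeding the header downloader. A condensed sketch of the new control flow (bodies elided, names taken from the diff above):

avail := cfg.snapshots.BlocksAvailable()
if s.BlockNumber < avail {
	// re-fill the small tables (canonical hashes, total difficulty, header numbers)
	// from snapshot data, then advance the stage:
	if err := s.Update(tx, avail); err != nil {
		return err
	}
	// write the canonical hash of the tip into HeadHeaderKey and hand the last
	// snapshot headers to the HeaderDownloader as persistent links
}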


@@ -373,12 +373,23 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co
defer tx.Rollback()
}
// With snap sync, old data can be pruned only after the snapshot covering it has been created: CanDeleteTo()
if cfg.blockRetire.Snapshots() != nil && cfg.blockRetire.Snapshots().Cfg().Enabled {
if err := retireBlocks(s, tx, cfg, ctx); err != nil {
return fmt.Errorf("retireBlocks: %w", err)
if !cfg.blockRetire.Snapshots().Cfg().KeepBlocks {
canDeleteTo := snapshotsync.CanDeleteTo(s.ForwardProgress, cfg.blockRetire.Snapshots())
if err := rawdb.DeleteAncientBlocks(tx, canDeleteTo, 1_000); err != nil {
return err
}
if err = PruneTable(tx, kv.Senders, canDeleteTo, ctx, 1_000); err != nil {
return err
}
}
if err := retireBlocksInSingleBackgroundThread(s, cfg, ctx); err != nil {
return fmt.Errorf("retireBlocksInSingleBackgroundThread: %w", err)
}
} else if cfg.prune.TxIndex.Enabled() {
if err = PruneTable(tx, kv.Senders, s.LogPrefix(), to, logEvery, ctx, 1_000); err != nil {
if err = PruneTable(tx, kv.Senders, to, ctx, 1_000); err != nil {
return err
}
}
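The prune path now deletes ancient blocks inline, bounded by snapshotsync.CanDeleteTo, and then prunes kv.Senders up to the same mark before kicking block retirement off in the background. A hypothetical sketch of what such a bound computes (the real CanDeleteTo may round to snapshot chunk boundaries):

func canDeleteToSketch(forwardProgress, snapshotTip uint64) uint64 {
	covered := snapshotTip + 1 // blocks [1, snapshotTip] are already backed by snapshot files
	if forwardProgress < covered {
		return forwardProgress // never delete past current sync progress
	}
	return covered
}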
@@ -391,15 +402,7 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co
return nil
}
func retireBlocks(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Context) (err error) {
// delete portion of old blocks in any case
if !cfg.blockRetire.Snapshots().Cfg().KeepBlocks {
canDeleteTo := snapshotsync.CanDeleteTo(s.ForwardProgress, cfg.blockRetire.Snapshots())
if err := rawdb.DeleteAncientBlocks(tx, canDeleteTo, 1_000); err != nil {
return nil
}
}
func retireBlocksInSingleBackgroundThread(s *PruneState, cfg SendersCfg, ctx context.Context) (err error) {
// if something is already happening in the background - noop
if cfg.blockRetire.Working() {
return nil