Utilities to fix DB corruption (#2214)

* Detect broken ancestry

* Print last canonical headers

* Add backExec

* Cleanup

* Add unwind util

* Add trimTxs utility

* Actually delete tx records

* Avoid infinite loop

* Change strategy

* Close cursor after commmit

* Fix RwBegin issue

* 10m records per transaction

Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro.local>
This commit is contained in:
ledgerwatch 2021-06-21 21:52:51 +01:00 committed by GitHub
parent 7d1e17712f
commit a79ed2fe0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1804,6 +1804,212 @@ func advanceExec(chaindata string) error {
return nil
}
func backExec(chaindata string) error {
db := kv2.MustOpenKV(chaindata)
defer db.Close()
tx, err := db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
stageExec, err := stages.GetStageProgress(tx, stages.Execution)
if err != nil {
return err
}
log.Info("Stage exec", "progress", stageExec)
if err = stages.SaveStageProgress(tx, stages.Execution, stageExec-1); err != nil {
return err
}
stageExec, err = stages.GetStageProgress(tx, stages.Execution)
if err != nil {
return err
}
log.Info("Stage exec", "changed to", stageExec)
if err = tx.Commit(); err != nil {
return err
}
return nil
}
func unwind(chaindata string, block uint64) error {
db := kv2.MustOpenKV(chaindata)
defer db.Close()
tx, err := db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
log.Info("Unwinding to", "block", block)
if err = stages.SaveStageUnwind(tx, stages.Headers, block); err != nil {
return err
}
if err = stages.SaveStageUnwind(tx, stages.BlockHashes, block); err != nil {
return err
}
if err = stages.SaveStageUnwind(tx, stages.Bodies, block); err != nil {
return err
}
if err = stages.SaveStageUnwind(tx, stages.Senders, block); err != nil {
return err
}
if err = stages.SaveStageUnwind(tx, stages.Execution, block); err != nil {
return err
}
if err = stages.SaveStageUnwind(tx, stages.HashState, block); err != nil {
return err
}
if err = stages.SaveStageUnwind(tx, stages.IntermediateHashes, block); err != nil {
return err
}
if err = stages.SaveStageUnwind(tx, stages.CallTraces, block); err != nil {
return err
}
if err = stages.SaveStageUnwind(tx, stages.AccountHistoryIndex, block); err != nil {
return err
}
if err = stages.SaveStageUnwind(tx, stages.StorageHistoryIndex, block); err != nil {
return err
}
if err = stages.SaveStageUnwind(tx, stages.LogIndex, block); err != nil {
return err
}
if err = stages.SaveStageUnwind(tx, stages.TxLookup, block); err != nil {
return err
}
if err = stages.SaveStageUnwind(tx, stages.Finish, block); err != nil {
return err
}
if err = stages.SaveStageUnwind(tx, stages.TxPool, block); err != nil {
return err
}
if err = tx.Commit(); err != nil {
return err
}
return nil
}
func fixState(chaindata string) error {
kv := kv2.MustOpenKV(chaindata)
defer kv.Close()
tx, err := kv.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
c, err1 := tx.RwCursor(dbutils.HeaderCanonicalBucket)
if err1 != nil {
return err1
}
defer c.Close()
var prevHeaderKey [40]byte
var k, v []byte
for k, v, err = c.First(); err == nil && k != nil; k, v, err = c.Next() {
var headerKey [40]byte
copy(headerKey[:], k)
copy(headerKey[8:], v)
hv, herr := tx.GetOne(dbutils.HeadersBucket, headerKey[:])
if herr != nil {
return herr
}
if hv == nil {
return fmt.Errorf("missing header record for %x", headerKey)
}
var header types.Header
if err = rlp.DecodeBytes(hv, &header); err != nil {
return fmt.Errorf("decoding header from %x: %v", v, err)
}
if header.Number.Uint64() > 1 {
var parentK [40]byte
binary.BigEndian.PutUint64(parentK[:], header.Number.Uint64()-1)
copy(parentK[8:], header.ParentHash[:])
if !bytes.Equal(parentK[:], prevHeaderKey[:]) {
fmt.Printf("broken ancestry from %d %x (parent hash %x): prevKey %x\n", header.Number.Uint64(), v, header.ParentHash, prevHeaderKey)
}
}
copy(prevHeaderKey[:], headerKey[:])
}
if err != nil {
return err
}
return tx.Commit()
}
func trimTxs(chaindata string) error {
db := kv2.MustOpen(chaindata).RwKV()
defer db.Close()
tx, err := db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
lastTxId, err := tx.ReadSequence(dbutils.EthTx)
if err != nil {
return err
}
txs, err1 := tx.RwCursor(dbutils.EthTx)
if err1 != nil {
return err1
}
defer txs.Close()
bodies, err2 := tx.Cursor(dbutils.BlockBodyPrefix)
if err2 != nil {
return err
}
defer bodies.Close()
toDelete := roaring64.New()
toDelete.AddRange(0, lastTxId)
// Exclude transaction that are used, from the range
for k, v, err := bodies.First(); k != nil; k, v, err = bodies.Next() {
if err != nil {
return err
}
var body types.BodyForStorage
if err = rlp.DecodeBytes(v, &body); err != nil {
return err
}
// Remove from the map
toDelete.RemoveRange(body.BaseTxId, body.BaseTxId+uint64(body.TxAmount))
}
fmt.Printf("Number of tx records to delete: %d\n", toDelete.GetCardinality())
iter := toDelete.Iterator()
for {
var deleted int
for iter.HasNext() {
txId := iter.Next()
var key [8]byte
binary.BigEndian.PutUint64(key[:], txId)
if err = txs.Delete(key[:], nil); err != nil {
return err
}
deleted++
if deleted >= 10_000_000 {
break
}
}
if deleted == 0 {
fmt.Printf("Nothing more to delete\n")
break
}
fmt.Printf("Committing after deleting %d records\n", deleted)
if err = tx.Commit(); err != nil {
return err
}
txs.Close()
tx, err = db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()
txs, err = tx.RwCursor(dbutils.EthTx)
if err != nil {
return err
}
defer txs.Close()
}
return nil
}
func main() {
flag.Parse()
@ -1945,6 +2151,18 @@ func main() {
case "advanceExec":
err = advanceExec(*chaindata)
case "backExec":
err = backExec(*chaindata)
case "fixState":
err = fixState(*chaindata)
case "unwind":
err = unwind(*chaindata, uint64(*block))
case "trimTxs":
err = trimTxs(*chaindata)
}
if err != nil {