From f4cda8ba4c56147af3cafdb237bfbb471d9ced86 Mon Sep 17 00:00:00 2001 From: Igor Mandrigin Date: Thu, 21 Nov 2019 14:36:24 +0100 Subject: [PATCH] Fix snapshot saving and add an interval setting. (#181) --- cmd/state/state.go | 4 +- cmd/state/state_snapshot.go | 193 ++++++++++++++++-------------------- cmd/state/stateless.go | 21 +++- trie/debug.go | 3 +- 4 files changed, 109 insertions(+), 112 deletions(-) diff --git a/cmd/state/state.go b/cmd/state/state.go index 356e3dc07..86eaf8b7e 100644 --- a/cmd/state/state.go +++ b/cmd/state/state.go @@ -55,6 +55,8 @@ var start = flag.Int("start", 0, "number of data points to skip when making a ch var window = flag.Int("window", 1024, "size of the window for moving average") var triesize = flag.Int("triesize", 1024*1024, "maximum number of nodes in the state trie") var preroot = flag.Bool("preroot", false, "Attempt to compute hash of the trie without modifying it") +var snapshotInterval = flag.Uint64("snapshotInterval", 0, "how often to take snapshots (0 - never, 1 - every block, 1000 - every 1000th block, etc)") +var snapshotFrom = flag.Uint64("snapshotFrom", 0, "from which block to start snapshots") func check(e error) { if e != nil { @@ -1674,7 +1676,7 @@ func main() { //nakedAccountChart() //specExecChart1() if *action == "stateless" { - stateless(*chaindata, *statefile, *triesize, *preroot) + stateless(*chaindata, *statefile, *triesize, *preroot, *snapshotInterval, *snapshotFrom) } if *action == "stateless_chart" { stateless_chart_key_values("/Users/alexeyakhunov/mygit/go-ethereum/st_1/stateless.csv", []int{21, 20, 19, 18}, "breakdown.png", 2800000, 1) diff --git a/cmd/state/state_snapshot.go b/cmd/state/state_snapshot.go index 5f2c9d2cb..226cf883d 100644 --- a/cmd/state/state_snapshot.go +++ b/cmd/state/state_snapshot.go @@ -110,121 +110,103 @@ func constructSnapshot(ethDb ethdb.Database, blockNum uint64) { check(err) } -func save_snapshot(db *bolt.DB, filename string) { +func copyBucket(bucketName []byte, fromTx *bolt.Tx, toDB *bolt.DB) error { + toTx, err := toDB.Begin(true) + if err != nil { + return err + } + + toBucket, err := toTx.CreateBucket(bucketName, true) + if err != nil { + return err + } + + count := 0 + + printStats := func() { + fmt.Printf("\r -- commited %d records for bucket: '%s'...", count, string(bucketName)) + } + + if fromBucket := fromTx.Bucket(bucketName); fromBucket != nil { + defer printStats() + + fromCursor := fromBucket.Cursor() + + for k, v := fromCursor.First(); k != nil; k, v = fromCursor.Next() { + err = toBucket.Put(common.CopyBytes(k), common.CopyBytes(v)) + if err != nil { + return err + } + count++ + + if count%100000 == 0 { + err = toTx.Commit() + if err != nil { + return err + } + + printStats() + + toTx, err = toDB.Begin(true) + if err != nil { + return err + } + toBucket = toTx.Bucket(bucketName) + } + } + } else { + fmt.Printf(" -- nothing to copy for the bucket name: '%s'...", string(bucketName)) + } + + return toTx.Commit() +} + +func copyDatabase(fromDB *bolt.DB, toDB *bolt.DB) error { + return fromDB.View(func(tx *bolt.Tx) error { + fmt.Printf(" - copying AccountsBucket...\n") + if err := copyBucket(dbutils.AccountsBucket, tx, toDB); err != nil { + fmt.Println("FAIL") + return err + } + fmt.Println("OK") + + fmt.Printf(" - copying StorageBucket...\n") + if err := copyBucket(dbutils.StorageBucket, tx, toDB); err != nil { + fmt.Println("FAIL") + return err + } + fmt.Println("OK") + + fmt.Printf(" - copying CodeBucket...\n") + if err := copyBucket(dbutils.CodeBucket, tx, toDB); err != nil { + fmt.Println("FAIL") + return err + } + fmt.Println("OK") + + return nil + }) +} + +func saveSnapshot(db *bolt.DB, filename string) { fmt.Printf("Saving snapshot to %s\n", filename) + diskDb, err := bolt.Open(filename, 0600, &bolt.Options{}) check(err) defer diskDb.Close() - diskTx, err := diskDb.Begin(true) - check(err) - bDisk, err := diskTx.CreateBucket(dbutils.AccountsBucket, true) - check(err) - sbDisk, err := diskTx.CreateBucket(dbutils.StorageBucket, true) - check(err) - count := 0 - err = db.View(func(tx *bolt.Tx) error { - b := tx.Bucket(dbutils.AccountsBucket) - c := b.Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - if err := bDisk.Put(common.CopyBytes(k), common.CopyBytes(v)); err != nil { - return err - } - count++ - if count%100000 == 0 { - if err := diskTx.Commit(); err != nil { - return err - } - fmt.Printf("Commited %d records\n", count) - diskTx, err = diskDb.Begin(true) - bDisk = diskTx.Bucket(dbutils.AccountsBucket) - sbDisk = diskTx.Bucket(dbutils.StorageBucket) - } - } - b = tx.Bucket(dbutils.StorageBucket) - c = b.Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - if err := sbDisk.Put(common.CopyBytes(k), common.CopyBytes(v)); err != nil { - return err - } - count++ - if count%100000 == 0 { - if err := diskTx.Commit(); err != nil { - return err - } - fmt.Printf("Commited %d records\n", count) - diskTx, err = diskDb.Begin(true) - bDisk = diskTx.Bucket(dbutils.AccountsBucket) - sbDisk = diskTx.Bucket(dbutils.StorageBucket) - } - } - return nil - }) - check(err) - err = diskTx.Commit() + + err = copyDatabase(db, diskDb) check(err) } -func load_snapshot(db *bolt.DB, filename string) { +func loadSnapshot(db *bolt.DB, filename string) { fmt.Printf("Loading snapshot from %s\n", filename) diskDb, err := bolt.Open(filename, 0600, &bolt.Options{}) check(err) - tx, err := db.Begin(true) + defer diskDb.Close() + + err = copyDatabase(diskDb, db) check(err) - b, err := tx.CreateBucket(dbutils.AccountsBucket, true) - check(err) - sb, err := tx.CreateBucket(dbutils.StorageBucket, true) - check(err) - count := 0 - err = diskDb.View(func(txDisk *bolt.Tx) error { - bDisk := txDisk.Bucket(dbutils.AccountsBucket) - cDisk := bDisk.Cursor() - for k, v := cDisk.First(); k != nil; k, v = cDisk.Next() { - if err := b.Put(common.CopyBytes(k), common.CopyBytes(v)); err != nil { - return err - } - count++ - if count%100000 == 0 { - if err := tx.Commit(); err != nil { - return err - } - fmt.Printf("Committed %d records\n", count) - var err error - tx, err = db.Begin(true) - if err != nil { - return err - } - b = tx.Bucket(dbutils.AccountsBucket) - sb = tx.Bucket(dbutils.StorageBucket) - } - } - sbDisk := txDisk.Bucket(dbutils.StorageBucket) - count = 0 - cDisk = sbDisk.Cursor() - for k, v := cDisk.First(); k != nil; k, v = cDisk.Next() { - if err := sb.Put(common.CopyBytes(k), common.CopyBytes(v)); err != nil { - return err - } - count++ - if count%100000 == 0 { - if err := tx.Commit(); err != nil { - return err - } - fmt.Printf("Committed %d records\n", count) - var err error - tx, err = db.Begin(true) - if err != nil { - return err - } - b = tx.Bucket(dbutils.AccountsBucket) - sb = tx.Bucket(dbutils.StorageBucket) - } - } - return nil - }) - check(err) - err = tx.Commit() - check(err) - diskDb.Close() } func loadCodes(db *bolt.DB, codeDb ethdb.Database) error { @@ -332,6 +314,7 @@ func checkRoots(stateDb ethdb.Database, db *bolt.DB, rootHash common.Hash, block r := trie.NewResolver(0, true, blockNum) key := []byte{} req := t.NewResolveRequest(nil, key, 0, rootHash[:]) + fmt.Printf("new resolve request for root block with hash %x\n", rootHash) r.AddRequest(req) var err error if err = r.ResolveWithDb(stateDb, blockNum); err != nil { @@ -401,7 +384,7 @@ func stateSnapshot() error { stateDb, db := ethdb.NewMemDatabase2() defer stateDb.Close() if _, err := os.Stat("statedb0"); err == nil { - load_snapshot(db, "statedb0") + loadSnapshot(db, "statedb0") if err := loadCodes(db, ethDb); err != nil { return err } diff --git a/cmd/state/stateless.go b/cmd/state/stateless.go index c58181ada..b8487c550 100644 --- a/cmd/state/stateless.go +++ b/cmd/state/stateless.go @@ -106,7 +106,7 @@ func writeStats(w io.Writer, blockNum uint64, blockProof trie.BlockProof) { } */ -func stateless(chaindata string, statefile string, triesize int, tryPreRoot bool) { +func stateless(chaindata string, statefile string, triesize int, tryPreRoot bool, interval uint64, ignoreOlderThan uint64) { state.MaxTrieCacheGen = uint32(triesize) startTime := time.Now() sigs := make(chan os.Signal, 1) @@ -288,21 +288,32 @@ func stateless(chaindata string, statefile string, triesize int, tryPreRoot bool fmt.Printf("Commiting block %d failed: %v", blockNum, err) return } - if batch.BatchSize() >= 100000 { + + willSnapshot := interval > 0 && blockNum > 0 && blockNum >= ignoreOlderThan && blockNum%interval == 0 + + if batch.BatchSize() >= 100000 || willSnapshot { if _, err := batch.Commit(); err != nil { fmt.Printf("Failed to commit batch: %v\n", err) return } tds.PruneTries(false) } - if (blockNum > 2000000 && blockNum%500000 == 0) || (blockNum > 4000000 && blockNum%100000 == 0) { + + if willSnapshot { // Snapshots of the state will be written to the same directory as the state file - save_snapshot(db, fmt.Sprintf("%s_%d", statefile, blockNum)) + fmt.Printf("\nSaving snapshot at block %d, hash %x\n", blockNum, block.Root()) + saveSnapshot(db, fmt.Sprintf("%s_%d", statefile, blockNum)) } + preRoot = header.Root blockNum++ + if blockNum%1000 == 0 { - fmt.Printf("Processed %d blocks\n", blockNum) + // overwrite terminal line, if no snapshot was made and not the first line + if blockNum > 0 && !willSnapshot { + fmt.Printf("\r") + } + fmt.Printf("Processed %d blocks", blockNum) } // Check for interrupts select { diff --git a/trie/debug.go b/trie/debug.go index 704627bb4..d5934c6a7 100644 --- a/trie/debug.go +++ b/trie/debug.go @@ -26,6 +26,7 @@ import ( "io" "strconv" + "github.com/ledgerwatch/turbo-geth/common" "github.com/ledgerwatch/turbo-geth/common/pool" ) @@ -181,7 +182,7 @@ func loadValue(br *bufio.Reader) (valueNode, error) { func Load(r io.Reader) (*Trie, error) { br := bufio.NewReader(r) - t := new(Trie) + t := NewTestRLPTrie(common.Hash{}) var err error t.root, err = loadNode(br) return t, err