mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-24 12:37:16 +00:00
Fix snapshot saving and add an interval setting. (#181)
This commit is contained in:
parent
21c981eb84
commit
f4cda8ba4c
@ -55,6 +55,8 @@ var start = flag.Int("start", 0, "number of data points to skip when making a ch
|
||||
var window = flag.Int("window", 1024, "size of the window for moving average")
|
||||
var triesize = flag.Int("triesize", 1024*1024, "maximum number of nodes in the state trie")
|
||||
var preroot = flag.Bool("preroot", false, "Attempt to compute hash of the trie without modifying it")
|
||||
var snapshotInterval = flag.Uint64("snapshotInterval", 0, "how often to take snapshots (0 - never, 1 - every block, 1000 - every 1000th block, etc)")
|
||||
var snapshotFrom = flag.Uint64("snapshotFrom", 0, "from which block to start snapshots")
|
||||
|
||||
func check(e error) {
|
||||
if e != nil {
|
||||
@ -1674,7 +1676,7 @@ func main() {
|
||||
//nakedAccountChart()
|
||||
//specExecChart1()
|
||||
if *action == "stateless" {
|
||||
stateless(*chaindata, *statefile, *triesize, *preroot)
|
||||
stateless(*chaindata, *statefile, *triesize, *preroot, *snapshotInterval, *snapshotFrom)
|
||||
}
|
||||
if *action == "stateless_chart" {
|
||||
stateless_chart_key_values("/Users/alexeyakhunov/mygit/go-ethereum/st_1/stateless.csv", []int{21, 20, 19, 18}, "breakdown.png", 2800000, 1)
|
||||
|
@ -110,121 +110,103 @@ func constructSnapshot(ethDb ethdb.Database, blockNum uint64) {
|
||||
check(err)
|
||||
}
|
||||
|
||||
func save_snapshot(db *bolt.DB, filename string) {
|
||||
func copyBucket(bucketName []byte, fromTx *bolt.Tx, toDB *bolt.DB) error {
|
||||
toTx, err := toDB.Begin(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
toBucket, err := toTx.CreateBucket(bucketName, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
count := 0
|
||||
|
||||
printStats := func() {
|
||||
fmt.Printf("\r -- commited %d records for bucket: '%s'...", count, string(bucketName))
|
||||
}
|
||||
|
||||
if fromBucket := fromTx.Bucket(bucketName); fromBucket != nil {
|
||||
defer printStats()
|
||||
|
||||
fromCursor := fromBucket.Cursor()
|
||||
|
||||
for k, v := fromCursor.First(); k != nil; k, v = fromCursor.Next() {
|
||||
err = toBucket.Put(common.CopyBytes(k), common.CopyBytes(v))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
count++
|
||||
|
||||
if count%100000 == 0 {
|
||||
err = toTx.Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
printStats()
|
||||
|
||||
toTx, err = toDB.Begin(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
toBucket = toTx.Bucket(bucketName)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
fmt.Printf(" -- nothing to copy for the bucket name: '%s'...", string(bucketName))
|
||||
}
|
||||
|
||||
return toTx.Commit()
|
||||
}
|
||||
|
||||
func copyDatabase(fromDB *bolt.DB, toDB *bolt.DB) error {
|
||||
return fromDB.View(func(tx *bolt.Tx) error {
|
||||
fmt.Printf(" - copying AccountsBucket...\n")
|
||||
if err := copyBucket(dbutils.AccountsBucket, tx, toDB); err != nil {
|
||||
fmt.Println("FAIL")
|
||||
return err
|
||||
}
|
||||
fmt.Println("OK")
|
||||
|
||||
fmt.Printf(" - copying StorageBucket...\n")
|
||||
if err := copyBucket(dbutils.StorageBucket, tx, toDB); err != nil {
|
||||
fmt.Println("FAIL")
|
||||
return err
|
||||
}
|
||||
fmt.Println("OK")
|
||||
|
||||
fmt.Printf(" - copying CodeBucket...\n")
|
||||
if err := copyBucket(dbutils.CodeBucket, tx, toDB); err != nil {
|
||||
fmt.Println("FAIL")
|
||||
return err
|
||||
}
|
||||
fmt.Println("OK")
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func saveSnapshot(db *bolt.DB, filename string) {
|
||||
fmt.Printf("Saving snapshot to %s\n", filename)
|
||||
|
||||
diskDb, err := bolt.Open(filename, 0600, &bolt.Options{})
|
||||
check(err)
|
||||
defer diskDb.Close()
|
||||
diskTx, err := diskDb.Begin(true)
|
||||
check(err)
|
||||
bDisk, err := diskTx.CreateBucket(dbutils.AccountsBucket, true)
|
||||
check(err)
|
||||
sbDisk, err := diskTx.CreateBucket(dbutils.StorageBucket, true)
|
||||
check(err)
|
||||
count := 0
|
||||
err = db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(dbutils.AccountsBucket)
|
||||
c := b.Cursor()
|
||||
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||
if err := bDisk.Put(common.CopyBytes(k), common.CopyBytes(v)); err != nil {
|
||||
return err
|
||||
}
|
||||
count++
|
||||
if count%100000 == 0 {
|
||||
if err := diskTx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("Commited %d records\n", count)
|
||||
diskTx, err = diskDb.Begin(true)
|
||||
bDisk = diskTx.Bucket(dbutils.AccountsBucket)
|
||||
sbDisk = diskTx.Bucket(dbutils.StorageBucket)
|
||||
}
|
||||
}
|
||||
b = tx.Bucket(dbutils.StorageBucket)
|
||||
c = b.Cursor()
|
||||
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||
if err := sbDisk.Put(common.CopyBytes(k), common.CopyBytes(v)); err != nil {
|
||||
return err
|
||||
}
|
||||
count++
|
||||
if count%100000 == 0 {
|
||||
if err := diskTx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("Commited %d records\n", count)
|
||||
diskTx, err = diskDb.Begin(true)
|
||||
bDisk = diskTx.Bucket(dbutils.AccountsBucket)
|
||||
sbDisk = diskTx.Bucket(dbutils.StorageBucket)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
check(err)
|
||||
err = diskTx.Commit()
|
||||
|
||||
err = copyDatabase(db, diskDb)
|
||||
check(err)
|
||||
}
|
||||
|
||||
func load_snapshot(db *bolt.DB, filename string) {
|
||||
func loadSnapshot(db *bolt.DB, filename string) {
|
||||
fmt.Printf("Loading snapshot from %s\n", filename)
|
||||
diskDb, err := bolt.Open(filename, 0600, &bolt.Options{})
|
||||
check(err)
|
||||
tx, err := db.Begin(true)
|
||||
defer diskDb.Close()
|
||||
|
||||
err = copyDatabase(diskDb, db)
|
||||
check(err)
|
||||
b, err := tx.CreateBucket(dbutils.AccountsBucket, true)
|
||||
check(err)
|
||||
sb, err := tx.CreateBucket(dbutils.StorageBucket, true)
|
||||
check(err)
|
||||
count := 0
|
||||
err = diskDb.View(func(txDisk *bolt.Tx) error {
|
||||
bDisk := txDisk.Bucket(dbutils.AccountsBucket)
|
||||
cDisk := bDisk.Cursor()
|
||||
for k, v := cDisk.First(); k != nil; k, v = cDisk.Next() {
|
||||
if err := b.Put(common.CopyBytes(k), common.CopyBytes(v)); err != nil {
|
||||
return err
|
||||
}
|
||||
count++
|
||||
if count%100000 == 0 {
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("Committed %d records\n", count)
|
||||
var err error
|
||||
tx, err = db.Begin(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b = tx.Bucket(dbutils.AccountsBucket)
|
||||
sb = tx.Bucket(dbutils.StorageBucket)
|
||||
}
|
||||
}
|
||||
sbDisk := txDisk.Bucket(dbutils.StorageBucket)
|
||||
count = 0
|
||||
cDisk = sbDisk.Cursor()
|
||||
for k, v := cDisk.First(); k != nil; k, v = cDisk.Next() {
|
||||
if err := sb.Put(common.CopyBytes(k), common.CopyBytes(v)); err != nil {
|
||||
return err
|
||||
}
|
||||
count++
|
||||
if count%100000 == 0 {
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Printf("Committed %d records\n", count)
|
||||
var err error
|
||||
tx, err = db.Begin(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b = tx.Bucket(dbutils.AccountsBucket)
|
||||
sb = tx.Bucket(dbutils.StorageBucket)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
check(err)
|
||||
err = tx.Commit()
|
||||
check(err)
|
||||
diskDb.Close()
|
||||
}
|
||||
|
||||
func loadCodes(db *bolt.DB, codeDb ethdb.Database) error {
|
||||
@ -332,6 +314,7 @@ func checkRoots(stateDb ethdb.Database, db *bolt.DB, rootHash common.Hash, block
|
||||
r := trie.NewResolver(0, true, blockNum)
|
||||
key := []byte{}
|
||||
req := t.NewResolveRequest(nil, key, 0, rootHash[:])
|
||||
fmt.Printf("new resolve request for root block with hash %x\n", rootHash)
|
||||
r.AddRequest(req)
|
||||
var err error
|
||||
if err = r.ResolveWithDb(stateDb, blockNum); err != nil {
|
||||
@ -401,7 +384,7 @@ func stateSnapshot() error {
|
||||
stateDb, db := ethdb.NewMemDatabase2()
|
||||
defer stateDb.Close()
|
||||
if _, err := os.Stat("statedb0"); err == nil {
|
||||
load_snapshot(db, "statedb0")
|
||||
loadSnapshot(db, "statedb0")
|
||||
if err := loadCodes(db, ethDb); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -106,7 +106,7 @@ func writeStats(w io.Writer, blockNum uint64, blockProof trie.BlockProof) {
|
||||
}
|
||||
*/
|
||||
|
||||
func stateless(chaindata string, statefile string, triesize int, tryPreRoot bool) {
|
||||
func stateless(chaindata string, statefile string, triesize int, tryPreRoot bool, interval uint64, ignoreOlderThan uint64) {
|
||||
state.MaxTrieCacheGen = uint32(triesize)
|
||||
startTime := time.Now()
|
||||
sigs := make(chan os.Signal, 1)
|
||||
@ -288,21 +288,32 @@ func stateless(chaindata string, statefile string, triesize int, tryPreRoot bool
|
||||
fmt.Printf("Commiting block %d failed: %v", blockNum, err)
|
||||
return
|
||||
}
|
||||
if batch.BatchSize() >= 100000 {
|
||||
|
||||
willSnapshot := interval > 0 && blockNum > 0 && blockNum >= ignoreOlderThan && blockNum%interval == 0
|
||||
|
||||
if batch.BatchSize() >= 100000 || willSnapshot {
|
||||
if _, err := batch.Commit(); err != nil {
|
||||
fmt.Printf("Failed to commit batch: %v\n", err)
|
||||
return
|
||||
}
|
||||
tds.PruneTries(false)
|
||||
}
|
||||
if (blockNum > 2000000 && blockNum%500000 == 0) || (blockNum > 4000000 && blockNum%100000 == 0) {
|
||||
|
||||
if willSnapshot {
|
||||
// Snapshots of the state will be written to the same directory as the state file
|
||||
save_snapshot(db, fmt.Sprintf("%s_%d", statefile, blockNum))
|
||||
fmt.Printf("\nSaving snapshot at block %d, hash %x\n", blockNum, block.Root())
|
||||
saveSnapshot(db, fmt.Sprintf("%s_%d", statefile, blockNum))
|
||||
}
|
||||
|
||||
preRoot = header.Root
|
||||
blockNum++
|
||||
|
||||
if blockNum%1000 == 0 {
|
||||
fmt.Printf("Processed %d blocks\n", blockNum)
|
||||
// overwrite terminal line, if no snapshot was made and not the first line
|
||||
if blockNum > 0 && !willSnapshot {
|
||||
fmt.Printf("\r")
|
||||
}
|
||||
fmt.Printf("Processed %d blocks", blockNum)
|
||||
}
|
||||
// Check for interrupts
|
||||
select {
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"io"
|
||||
"strconv"
|
||||
|
||||
"github.com/ledgerwatch/turbo-geth/common"
|
||||
"github.com/ledgerwatch/turbo-geth/common/pool"
|
||||
)
|
||||
|
||||
@ -181,7 +182,7 @@ func loadValue(br *bufio.Reader) (valueNode, error) {
|
||||
|
||||
func Load(r io.Reader) (*Trie, error) {
|
||||
br := bufio.NewReader(r)
|
||||
t := new(Trie)
|
||||
t := NewTestRLPTrie(common.Hash{})
|
||||
var err error
|
||||
t.root, err = loadNode(br)
|
||||
return t, err
|
||||
|
Loading…
Reference in New Issue
Block a user