From c45a710ce69b8f886cb3d22f0219c2f65542ad33 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Tue, 8 Sep 2020 14:28:37 +0700 Subject: [PATCH] Warmup logs, less overhead, warmup code bucket (#1054) * warmup logs and less overhead. * warmup logs and less overhead. * move WarmUp to common func --- eth/stagedsync/stage_execute.go | 17 ++++++----------- ethdb/kv_abstract.go | 2 ++ ethdb/kv_bolt.go | 1 + ethdb/kv_lmdb.go | 8 ++++++++ ethdb/kv_remote.go | 1 + ethdb/object_db.go | 26 ++++++++++++++++++++++++++ 6 files changed, 44 insertions(+), 11 deletions(-) diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index c20024b20..5e02e7cc2 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -103,19 +103,14 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig if warmup { log.Info("Running a warmup...") - count := 0 - if err := stateDB.Walk(dbutils.PlainStateBucket, nil, 0, func(_, _ []byte) (bool, error) { - if err := common.Stopped(quit); err != nil { - return false, nil - } - count++ - if count%10000000 == 0 { - log.Info("Warmed up", "keys", count) - } - return true, nil - }); err != nil { + if err := ethdb.WarmUp(tx.(ethdb.HasTx).Tx(), dbutils.PlainStateBucket, logEvery, quit); err != nil { return err } + + if err := ethdb.WarmUp(tx.(ethdb.HasTx).Tx(), dbutils.CodeBucket, logEvery, quit); err != nil { + return err + } + warmup = false log.Info("Warm up done.") } diff --git a/ethdb/kv_abstract.go b/ethdb/kv_abstract.go index 618233a55..7d9bdba03 100644 --- a/ethdb/kv_abstract.go +++ b/ethdb/kv_abstract.go @@ -132,6 +132,8 @@ type Cursor interface { // new data is the same size as the old. Otherwise it will simply // perform a delete of the old record followed by an insert. PutCurrent(key, value []byte) error + + Count() (uint64, error) // Count - fast way to calculate amount of keys in bucket. It counts all keys even if Prefix was set. } type CursorDupSort interface { diff --git a/ethdb/kv_bolt.go b/ethdb/kv_bolt.go index 8060b796e..370bad62c 100644 --- a/ethdb/kv_bolt.go +++ b/ethdb/kv_bolt.go @@ -306,6 +306,7 @@ func (c *boltCursor) PutCurrent(key, value []byte) error { panic("not func (c *boltCursor) Current() ([]byte, []byte, error) { panic("not supported") } func (c *boltCursor) Last() (k, v []byte, err error) { panic("not implemented yet") } func (c *boltCursor) PutNoOverwrite(key []byte, value []byte) error { panic("not implemented yet") } +func (c *boltCursor) Count() (uint64, error) { panic("not supported") } func (c *boltCursor) SeekExact(key []byte) (val []byte, err error) { return c.bucket.Get(key) diff --git a/ethdb/kv_lmdb.go b/ethdb/kv_lmdb.go index a6af5deda..903f7ab4f 100644 --- a/ethdb/kv_lmdb.go +++ b/ethdb/kv_lmdb.go @@ -617,6 +617,14 @@ func (c *LmdbCursor) initCursor() error { return nil } +func (c *LmdbCursor) Count() (uint64, error) { + st, err := c.tx.tx.Stat(c.bucketCfg.DBI) + if err != nil { + return 0, err + } + return st.Entries, nil +} + func (c *LmdbCursor) First() ([]byte, []byte, error) { if c.c == nil { if err := c.initCursor(); err != nil { diff --git a/ethdb/kv_remote.go b/ethdb/kv_remote.go index 0963ed955..cf5d81e95 100644 --- a/ethdb/kv_remote.go +++ b/ethdb/kv_remote.go @@ -261,6 +261,7 @@ func (c *remoteCursor) PutCurrent(key, value []byte) error { panic("n func (c *remoteCursor) Append(key []byte, value []byte) error { panic("not supported") } func (c *remoteCursor) Delete(key []byte) error { panic("not supported") } func (c *remoteCursor) DeleteCurrent() error { panic("not supported") } +func (c *remoteCursor) Count() (uint64, error) { panic("not supported") } func (c *remoteCursor) First() ([]byte, []byte, error) { return c.Seek(c.prefix) diff --git a/ethdb/object_db.go b/ethdb/object_db.go index 80a42de53..68d47c3b3 100644 --- a/ethdb/object_db.go +++ b/ethdb/object_db.go @@ -29,6 +29,7 @@ import ( "github.com/ledgerwatch/turbo-geth/log" "github.com/ledgerwatch/turbo-geth/metrics" "strings" + "time" ) var ( @@ -447,3 +448,28 @@ func NewDatabaseWithFreezer(db *ObjectDatabase, dir, suffix string) (*ObjectData // FIXME: implement freezer in Turbo-Geth return db, nil } + +func WarmUp(tx Tx, bucket string, logEvery *time.Ticker, quit <-chan struct{}) error { + count := 0 + c := tx.Cursor(bucket) + totalKeys, errCount := c.Count() + if errCount != nil { + return errCount + } + for k, _, err := c.First(); k != nil; k, _, err = c.Next() { + if err != nil { + return err + } + count++ + + select { + default: + case <-quit: + return common.ErrStopped + case <-logEvery.C: + log.Info("Warmed up state", "progress", fmt.Sprintf("%.2fM/%.2fM", float64(count)/1_000_000, float64(totalKeys)/1_000_000)) + } + } + + return nil +}