metrics for header/body/exec stages (#1618)

Alex Sharov 2021-03-28 21:40:42 +07:00 committed by GitHub
parent 8ccc6b2664
commit 0685250095
10 changed files with 81 additions and 44 deletions

View File

@@ -16,7 +16,7 @@
"gnetId": null,
"graphTooltip": 0,
"id": 1,
- "iteration": 1615349256966,
+ "iteration": 1616929111437,
"links": [],
"panels": [
{
@@ -73,11 +73,11 @@
"steppedLine": false,
"targets": [
{
- "expr": "chain_execution_number{instance=~\"$instance\"}",
+ "expr": "stage_execution{instance=~\"$instance\"}",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
- "legendFormat": "execution: {{quantile}}, {{instance}}",
+ "legendFormat": "execution: {{instance}}",
"refId": "A"
}
],
@@ -179,7 +179,7 @@
"steppedLine": false,
"targets": [
{
- "expr": "rate(chain_execution_number{instance=~\"$instance\"}[1m])",
+ "expr": "rate(stage_execution{instance=~\"$instance\"}[1m])",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
@@ -283,11 +283,11 @@
"steppedLine": false,
"targets": [
{
- "expr": "chain_execution{quantile=\"$quantile\",instance=~\"$instance\"}",
+ "expr": "stage_execution{instance=~\"$instance\"}",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
- "legendFormat": "execution: {{quantile}}, {{instance}}",
+ "legendFormat": "execution: {{instance}}",
"refId": "A"
}
],
@@ -503,7 +503,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
- "title": "Size",
+ "title": "DB Size",
"tooltip": {
"shared": true,
"sort": 0,
@@ -3274,7 +3274,7 @@
"orientation": "horizontal",
"reduceOptions": {
"calcs": [
- "mean"
+ "lastNotNull"
],
"fields": "",
"values": false
@@ -3285,11 +3285,11 @@
"pluginVersion": "7.4.3",
"targets": [
{
- "expr": "chain_head_header{instance=~\"$instance\"}",
+ "expr": "stage_headers{instance=~\"$instance\"}",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
- "legendFormat": "{{instance}}",
+ "legendFormat": "Header: {{instance}}",
"refId": "A"
}
],
@@ -3430,7 +3430,7 @@
"orientation": "horizontal",
"reduceOptions": {
"calcs": [
- "mean"
+ "lastNotNull"
],
"fields": "",
"values": false
@@ -3441,11 +3441,11 @@
"pluginVersion": "7.4.3",
"targets": [
{
- "expr": "chain_head_block{instance=~\"$instance\"}",
+ "expr": "stage_headers{instance=~\"$instance\"}",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
- "legendFormat": "{{instance}}",
+ "legendFormat": "blocks:{{instance}}",
"refId": "A"
}
],
@@ -4222,7 +4222,7 @@
"type": "row"
}
],
- "refresh": false,
+ "refresh": "10s",
"schemaVersion": 27,
"style": "dark",
"tags": [],
@@ -4342,14 +4342,9 @@
{
"allValue": null,
"current": {
- "selected": true,
- "tags": [],
- "text": [
- "192.168.255.138:6060"
- ],
- "value": [
- "192.168.255.138:6060"
- ]
+ "selected": false,
+ "text": "All",
+ "value": "$__all"
},
"datasource": "Prometheus",
"definition": "system_cpu_sysload",
@@ -4378,7 +4373,7 @@
]
},
"time": {
- "from": "now-15m",
+ "from": "now-12h",
"to": "now"
},
"timepicker": {
@@ -4408,5 +4403,5 @@
"timezone": "",
"title": "TurboGeth Prometheus",
"uid": "FPpjH6Hik",
- "version": 21
+ "version": 49
}
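A note on the metric renames above: the dashboard now reads the new stage gauges (stage_headers, stage_execution) instead of the old per-chain series (chain_head_header, chain_execution_number). These Prometheus series names pair up with the Go-side gauge names registered later in this commit only if the "/" separator is rewritten to "_" on exposition; that mapping is inferred from the diff, not verified against the exporter. A minimal sketch of the assumed correspondence:

package stagemetrics

import "github.com/ledgerwatch/turbo-geth/metrics"

// The Go-side names use "/" as a namespace separator; the dashboard
// queries the same gauges with "_", so the exporter presumably rewrites
// the separator when exposing them (assumption, inferred from this diff).
var (
	stageHeadersGauge   = metrics.NewRegisteredGauge("stage/headers", nil)   // queried as stage_headers
	stageBodiesGauge    = metrics.NewRegisteredGauge("stage/bodies", nil)    // exposed as stage_bodies
	stageExecutionGauge = metrics.NewRegisteredGauge("stage/execution", nil) // queried as stage_execution
)

Since each gauge holds the latest block number reached by its stage, the rate(stage_execution{...}[1m]) panel reports execution speed in blocks per second, while the plain stage_execution panel reports absolute progress.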

View File

@@ -51,9 +51,9 @@ import (
)
var (
- headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
- headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
- headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
+ //headBlockGauge = metrics.NewRegisteredGauge("chain/head/block", nil)
+ //headHeaderGauge = metrics.NewRegisteredGauge("chain/head/header", nil)
+ //headFastBlockGauge = metrics.NewRegisteredGauge("chain/head/receipt", nil)
//accountReadTimer = metrics.NewRegisteredTimer("chain/account/reads", nil)
//accountHashTimer = metrics.NewRegisteredTimer("chain/account/hashes", nil)
@@ -77,7 +77,7 @@ var (
blockReorgInvalidatedTx = metrics.NewRegisteredMeter("chain/reorg/invalidTx", nil)
//blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
- blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
+ //blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
errInsertionInterrupted = errors.New("insertion is interrupted")
@@ -354,7 +354,7 @@ func (bc *BlockChain) loadLastState() error {
// Make sure the state associated with the block is available
// Everything seems to be fine, set as the head block
bc.currentBlock.Store(currentBlock)
- headBlockGauge.Update(int64(currentBlock.NumberU64()))
+ //headBlockGauge.Update(int64(currentBlock.NumberU64()))
// Restore the last known head header
currentHeader := currentBlock.Header()
@@ -367,12 +367,12 @@ func (bc *BlockChain) loadLastState() error {
// Restore the last known head fast block
bc.currentFastBlock.Store(currentBlock)
- headFastBlockGauge.Update(int64(currentBlock.NumberU64()))
+ //headFastBlockGauge.Update(int64(currentBlock.NumberU64()))
if head := rawdb.ReadHeadFastBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentFastBlock.Store(block)
- headFastBlockGauge.Update(int64(block.NumberU64()))
+ //headFastBlockGauge.Update(int64(block.NumberU64()))
}
}
// Issue a status log for the user
@@ -410,7 +410,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
// last step, however the direction of SetHead is from high
// to low, so it's safe to update in-memory markers directly.
bc.currentBlock.Store(newHeadBlock)
- headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
+ //headBlockGauge.Update(int64(newHeadBlock.NumberU64()))
}
}
@@ -428,7 +428,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
// last step, however the direction of SetHead is from high
// to low, so it's safe to update in-memory markers directly.
bc.currentFastBlock.Store(newHeadFastBlock)
- headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
+ //headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64()))
}
return bc.CurrentBlock().NumberU64(), false /* we have nothing to wipe in turbo-geth */
@@ -476,7 +476,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
// If all checks out, manually set the head block
bc.Chainmu.Lock()
bc.currentBlock.Store(block)
- headBlockGauge.Update(int64(block.NumberU64()))
+ //headBlockGauge.Update(int64(block.NumberU64()))
bc.Chainmu.Unlock()
log.Info("Committed new head block", "number", block.Number(), "hash", hash)
@@ -540,11 +540,11 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
// Last update all in-memory chain markers
bc.genesisBlock = genesis
bc.currentBlock.Store(bc.genesisBlock)
- headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
+ //headBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
bc.hc.SetGenesis(bc.genesisBlock.Header())
bc.hc.SetCurrentHeader(bc.db, bc.genesisBlock.Header())
bc.currentFastBlock.Store(bc.genesisBlock)
- headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
+ //headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64()))
return nil
}
@@ -612,10 +612,10 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) error {
// If the block is better than our head or is on a different chain, force update heads
if updateHeads {
bc.currentFastBlock.Store(block)
- headFastBlockGauge.Update(int64(block.NumberU64()))
+ //headFastBlockGauge.Update(int64(block.NumberU64()))
}
bc.currentBlock.Store(block)
- headBlockGauge.Update(int64(block.NumberU64()))
+ //headBlockGauge.Update(int64(block.NumberU64()))
return nil
}
@@ -849,7 +849,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if bc.GetTd(currentFastBlock.Hash(), currentFastBlock.NumberU64()).Cmp(td) < 0 {
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
bc.currentFastBlock.Store(head)
- headFastBlockGauge.Update(int64(head.NumberU64()))
+ //headFastBlockGauge.Update(int64(head.NumberU64()))
bc.Chainmu.Unlock()
return true
}
@@ -1916,7 +1916,6 @@ func ExecuteBlockEphemerally(
stateWriter state.WriterWithChangeSets,
) (types.Receipts, error) {
defer blockExecutionTimer.UpdateSince(time.Now())
- defer blockExecutionNumber.Update(block.Number().Int64())
block.Uncles()
ibs := state.New(stateReader)
header := block.Header()
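One line worth highlighting in the hunk above is the surviving defer blockExecutionTimer.UpdateSince(time.Now()). In Go, arguments of a deferred call are evaluated when the defer statement executes, so time.Now() captures the function's start time while the call itself only fires on return; the timer therefore records the full duration of ExecuteBlockEphemerally. A self-contained illustration of the idiom (the sleep is a placeholder for real work):

package main

import (
	"fmt"
	"time"
)

func timedWork() {
	// time.Now() is evaluated here, at function entry; the deferred call
	// runs at return, so "elapsed" covers the whole function body.
	defer func(start time.Time) {
		fmt.Println("elapsed:", time.Since(start))
	}(time.Now())

	time.Sleep(50 * time.Millisecond) // stand-in for block execution
}

func main() {
	timedWork()
}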

View File

@@ -100,7 +100,7 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
}
}
hc.currentHeaderHash = hc.CurrentHeader().Hash()
- headHeaderGauge.Update(hc.CurrentHeader().Number.Int64())
+ //headHeaderGauge.Update(hc.CurrentHeader().Number.Int64())
return hc, nil
}
@@ -275,7 +275,7 @@ func (hc *HeaderChain) writeHeaders(headers []*types.Header) (result *headerWrit
// Last step update all in-memory head header markers
hc.currentHeaderHash = lastHash
hc.currentHeader.Store(types.CopyHeader(lastHeader))
- headHeaderGauge.Update(lastHeader.Number.Int64())
+ //headHeaderGauge.Update(lastHeader.Number.Int64())
// Chain status is canonical since this insert was a reorg.
// Note that all inserts which have higher TD than existing are 'reorg'.
@@ -543,7 +543,7 @@ func (hc *HeaderChain) SetCurrentHeader(dbw ethdb.Putter, head *types.Header) {
rawdb.WriteHeadHeaderHash(dbw, head.Hash())
hc.currentHeader.Store(head)
hc.currentHeaderHash = head.Hash()
- headHeaderGauge.Update(head.Number.Int64())
+ //headHeaderGauge.Update(head.Number.Int64())
}
type (
@@ -598,7 +598,7 @@ func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, d
}
hc.currentHeader.Store(parent)
hc.currentHeaderHash = parentHash
- headHeaderGauge.Update(parent.Number.Int64())
+ //headHeaderGauge.Update(parent.Number.Int64())
// If this is the first iteration, wipe any leftover data upwards too so
// we don't end up with dangling gaps in the database
View File

@@ -329,6 +329,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, errors.New("mode is " + config.StorageMode.ToString() + " original mode is " + sm.ToString())
}
+ if err = stagedsync.UpdateMetrics(chainDb); err != nil {
+ return nil, err
+ }
vmConfig, cacheConfig := BlockchainRuntimeConfig(config)
txCacher := core.NewTxSenderCacher(runtime.NumCPU())
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, txCacher)
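Calling UpdateMetrics here, right after the chain database is opened, seeds the stage gauges from the persisted stage-progress records; without it, every gauge would read zero after a restart until its stage loop ran again. A minimal sketch of the call pattern (assuming, as the diff suggests, that the chain database satisfies the ethdb.Getter parameter of UpdateMetrics):

package startup

import (
	"log"

	"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
	"github.com/ledgerwatch/turbo-geth/ethdb"
)

// seedMetrics mirrors the startup call added in this hunk: populate the
// stage gauges once from the database before any stage starts running.
func seedMetrics(chainDb ethdb.Database) {
	if err := stagedsync.UpdateMetrics(chainDb); err != nil {
		log.Fatal(err)
	}
}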

View File

@@ -1859,6 +1859,7 @@ func (d *Downloader) importBlockResults(logPrefix string, results []*fetchResult
if err1 := d.bodiesState.Update(d.stateDB, blocks[index-1].NumberU64()); err1 != nil {
return 0, fmt.Errorf("saving SyncStage Bodies progress: %v", err1)
}
+ return blocks[index-1].NumberU64() + 1, nil
}
return 0, nil

View File

@@ -418,3 +418,28 @@ func InsertBlocksInStages(db ethdb.Database, storageMode ethdb.StorageMode, conf
func InsertBlockInStages(db ethdb.Database, config *params.ChainConfig, vmConfig *vm.Config, engine consensus.Engine, block *types.Block, checkRoot bool) (bool, error) {
return InsertBlocksInStages(db, ethdb.DefaultStorageMode, config, vmConfig, engine, []*types.Block{block}, checkRoot)
}
+ // UpdateMetrics - updates the stage metrics manually, because the current "metrics" package doesn't support labels;
+ // this should be fixed in the future
+ func UpdateMetrics(db ethdb.Getter) error {
+ var progress uint64
+ var err error
+ progress, err = stages.GetStageProgress(db, stages.Headers)
+ if err != nil {
+ return err
+ }
+ stageHeadersGauge.Update(int64(progress))
+ progress, err = stages.GetStageProgress(db, stages.Bodies)
+ if err != nil {
+ return err
+ }
+ stageBodiesGauge.Update(int64(progress))
+ progress, err = stages.GetStageProgress(db, stages.Execution)
+ if err != nil {
+ return err
+ }
+ stageExecutionGauge.Update(int64(progress))
+ return nil
+ }
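The comment explains why three separate gauges are needed at all: the vendored metrics package has no label support, so every stage gets its own series and UpdateMetrics must touch each one by hand. For contrast, a hypothetical variant built on prometheus/client_golang (not what this commit does; the names here are illustrative only) would collapse the three gauges into one labeled family:

package labeled

import "github.com/prometheus/client_golang/prometheus"

// One gauge family with a "stage" label instead of three standalone gauges.
var stageProgress = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "stage_progress",
		Help: "Highest block number reached by each sync stage.",
	},
	[]string{"stage"},
)

func init() {
	prometheus.MustRegister(stageProgress)
}

// reportStage records progress for one stage; callers would pass
// "headers", "bodies" or "execution" as the label value.
func reportStage(stage string, blockNum uint64) {
	stageProgress.WithLabelValues(stage).Set(float64(blockNum))
}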

View File

@@ -13,9 +13,12 @@ import (
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
+ "github.com/ledgerwatch/turbo-geth/metrics"
"github.com/ledgerwatch/turbo-geth/turbo/stages/bodydownload"
)
+ var stageBodiesGauge = metrics.NewRegisteredGauge("stage/bodies", nil)
// BodiesForward progresses Bodies stage in the forward direction
func BodiesForward(
s *StageState,
@@ -159,6 +162,7 @@ func BodiesForward(
case <-wakeUpChan:
//log.Info("bodyLoop woken up by the incoming request")
}
+ stageBodiesGauge.Update(int64(bodyProgress))
}
if err := batch.Commit(); err != nil {
return fmt.Errorf("%s: failed to write batch commit: %v", logPrefix, err)
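This file shows the two-step pattern that repeats in the execution and headers stages below: register a package-level gauge once, then refresh it on every pass of the stage's main loop so the metric tracks live progress. A condensed sketch of the shape, with the real download logic elided:

package bodies

import "github.com/ledgerwatch/turbo-geth/metrics"

// Registered once at package load, exactly as in the diff above.
var stageBodiesGauge = metrics.NewRegisteredGauge("stage/bodies", nil)

// downloadBodies is a simplified stand-in for the BodiesForward loop.
func downloadBodies(from, to uint64) {
	for bodyProgress := from; bodyProgress <= to; bodyProgress++ {
		// ... fetch and persist the next block body ...

		// A gauge keeps only the most recent value, so updating it on
		// every iteration is cheap and always scrape-consistent.
		stageBodiesGauge.Update(int64(bodyProgress))
	}
}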

View File

@@ -23,11 +23,14 @@ import (
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
+ "github.com/ledgerwatch/turbo-geth/metrics"
"github.com/ledgerwatch/turbo-geth/params"
"github.com/ledgerwatch/turbo-geth/turbo/shards"
"github.com/ledgerwatch/turbo-geth/turbo/silkworm"
)
+ var stageExecutionGauge = metrics.NewRegisteredGauge("stage/execution", nil)
const (
logInterval = 30 * time.Second
)
@@ -255,6 +258,7 @@ func SpawnExecuteBlocksStage(s *StageState, stateDB ethdb.Database, chainConfig
case <-logEvery.C:
logBlock, logTime = logProgress(logPrefix, logBlock, logTime, blockNum, batch, cache)
}
+ stageExecutionGauge.Update(int64(blockNum))
}
if cache == nil {

View File

@@ -14,10 +14,13 @@ import (
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/log"
+ "github.com/ledgerwatch/turbo-geth/metrics"
"github.com/ledgerwatch/turbo-geth/params"
"github.com/ledgerwatch/turbo-geth/rlp"
)
+ var stageHeadersGauge = metrics.NewRegisteredGauge("stage/headers", nil)
func SpawnHeaderDownloadStage(s *StageState, u Unwinder, d DownloaderGlue, headersFetchers []func() error) error {
err := d.SpawnHeaderDownloadStage(headersFetchers, s, u)
if err == nil {
@@ -222,6 +225,7 @@ Error: %v
if err := batch.Put(dbutils.HeadersBucket, dbutils.HeaderKey(number, header.Hash()), data); err != nil {
return false, false, 0, fmt.Errorf("[%s] Failed to store header: %w", logPrefix, err)
}
+ stageHeadersGauge.Update(int64(lastHeader.Number.Uint64()))
}
if deepFork {
forkHeader := rawdb.ReadHeader(batch, headers[0].ParentHash, headers[0].Number.Uint64()-1)

View File

@@ -179,6 +179,7 @@ func HeadersForward(
if stopped {
return fmt.Errorf("interrupted")
}
+ stageHeadersGauge.Update(int64(headerInserter.GetHighest()))
return nil
}