blockReader: use in ethstat (#7565)

This commit is contained in:
Alex Sharov 2023-05-23 16:30:47 +07:00 committed by GitHub
parent 696ccb6edd
commit 23bd14744b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 35 additions and 26 deletions

View File

@ -16,7 +16,6 @@ import (
"github.com/c2h5oh/datasize"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
@ -60,6 +59,7 @@ import (
"github.com/ledgerwatch/erigon/consensus/merge"
"github.com/ledgerwatch/erigon/core"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/core/state/historyv2read"
"github.com/ledgerwatch/erigon/core/state/temporal"
"github.com/ledgerwatch/erigon/core/systemcontracts"
@ -656,7 +656,7 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
if config.Ethstats != "" {
var headCh chan [][]byte
headCh, backend.unsubscribeEthstat = backend.notifications.Events.AddHeaderSubscription()
if err := ethstats.New(stack, backend.sentryServers, chainKv, backend.engine, config.Ethstats, backend.networkID, ctx.Done(), headCh); err != nil {
if err := ethstats.New(stack, backend.sentryServers, chainKv, backend.blockReader, backend.engine, config.Ethstats, backend.networkID, ctx.Done(), headCh); err != nil {
return nil, err
}
}

View File

@ -31,9 +31,12 @@ import (
"sync"
"time"
"github.com/ledgerwatch/erigon-lib/downloader/downloadergrpc"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
clcore "github.com/ledgerwatch/erigon/cl/phase1/core"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/core/rawdb/blockio"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
"github.com/holiman/uint256"
@ -50,7 +53,6 @@ import (
"github.com/ledgerwatch/erigon-lib/direct"
downloader3 "github.com/ledgerwatch/erigon-lib/downloader"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
"github.com/ledgerwatch/erigon-lib/downloader/downloadergrpc"
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
"github.com/ledgerwatch/erigon-lib/gointerfaces/grpcutil"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
@ -59,7 +61,6 @@ import (
prototypes "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon-lib/kv/remotedbserver"
libstate "github.com/ledgerwatch/erigon-lib/state"
txpool2 "github.com/ledgerwatch/erigon-lib/txpool"
@ -97,7 +98,6 @@ import (
"github.com/ledgerwatch/erigon/eth/stagedsync"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb/privateapi"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/ethstats"
"github.com/ledgerwatch/erigon/node"
"github.com/ledgerwatch/erigon/p2p"
@ -748,7 +748,7 @@ func (backend *Ethereum) Init(stack *node.Node, config *ethconfig.Config) error
if config.Ethstats != "" {
var headCh chan [][]byte
headCh, backend.unsubscribeEthstat = backend.notifications.Events.AddHeaderSubscription()
if err := ethstats.New(stack, backend.sentryServers, chainKv, backend.engine, config.Ethstats, backend.networkID, ctx.Done(), headCh); err != nil {
if err := ethstats.New(stack, backend.sentryServers, chainKv, backend.blockReader, backend.engine, config.Ethstats, backend.networkID, ctx.Done(), headCh); err != nil {
return err
}
}
@ -1155,6 +1155,9 @@ func (s *Ethereum) SentryCtx() context.Context {
func (s *Ethereum) SentryControlServer() *sentry.MultiClient {
return s.sentriesClient
}
func (s *Ethereum) BlockIO() (services.FullBlockReader, *blockio.BlockWriter) {
return s.blockReader, s.blockWriter
}
// RemoveContents is like os.RemoveAll, but preserve dir itself
func RemoveContents(dir string) error {

View File

@ -34,6 +34,7 @@ import (
"github.com/gorilla/websocket"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/cmd/sentry/sentry"
@ -68,6 +69,7 @@ type Service struct {
pongCh chan struct{} // Pong notifications are fed into this channel
histCh chan []uint64 // History request block numbers are fed into this channel
blockReader services.FullBlockReader
}
// connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the
@ -120,7 +122,7 @@ func (w *connWrapper) Close() error {
}
// New returns a monitoring service ready for stats reporting.
func New(node *node.Node, servers []*sentry.GrpcServer, chainDB kv.RoDB, engine consensus.Engine, url string, networkid uint64, quitCh <-chan struct{}, headCh chan [][]byte) error {
func New(node *node.Node, servers []*sentry.GrpcServer, chainDB kv.RoDB, blockReader services.FullBlockReader, engine consensus.Engine, url string, networkid uint64, quitCh <-chan struct{}, headCh chan [][]byte) error {
// Parse the netstats connection url
re := regexp.MustCompile("([^:@]*)(:([^@]*))?@(.+)")
parts := re.FindStringSubmatch(url)
@ -128,17 +130,18 @@ func New(node *node.Node, servers []*sentry.GrpcServer, chainDB kv.RoDB, engine
return fmt.Errorf("invalid netstats url: \"%s\", should be nodename:secret@host:port", url)
}
ethstats := &Service{
engine: engine,
servers: servers,
node: parts[1],
pass: parts[3],
host: parts[4],
pongCh: make(chan struct{}),
histCh: make(chan []uint64, 1),
networkid: networkid,
chaindb: chainDB,
headCh: headCh,
quitCh: quitCh,
blockReader: blockReader,
engine: engine,
servers: servers,
node: parts[1],
pass: parts[3],
host: parts[4],
pongCh: make(chan struct{}),
histCh: make(chan []uint64, 1),
networkid: networkid,
chaindb: chainDB,
headCh: headCh,
quitCh: quitCh,
}
node.RegisterLifecycle(ethstats)
@ -508,7 +511,10 @@ func (s *Service) reportBlock(conn *connWrapper) error {
}
defer roTx.Rollback()
block := rawdb.ReadCurrentBlock(roTx)
block, err := s.blockReader.CurrentBlock(roTx)
if err != nil {
return err
}
if block == nil {
return nil
}
@ -596,7 +602,7 @@ func (s *Service) reportHistory(conn *connWrapper, list []uint64) error {
history := make([]*blockStats, len(indexes))
for i, number := range indexes {
// Retrieve the next block if it's known to us
block, err := rawdb.ReadBlockByNumber(roTx, number)
block, err := s.blockReader.BlockByNumber(context.Background(), roTx, number)
if err != nil {
return err
}

View File

@ -12,6 +12,7 @@ import (
"syscall"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
@ -153,7 +154,8 @@ func ImportChain(ethereum *eth.Ethereum, chainDB kv.RwDB, fn string, logger log.
return fmt.Errorf("interrupted")
}
missing := missingBlocks(chainDB, blocks[:i])
br, _ := ethereum.BlockIO()
missing := missingBlocks(chainDB, blocks[:i], br)
if len(missing) == 0 {
logger.Info("Skipping batch as all blocks present", "batch", batch, "first", blocks[0].Hash(), "last", blocks[i-1].Hash())
continue
@ -183,13 +185,11 @@ func ChainHasBlock(chainDB kv.RwDB, block *types.Block) bool {
return chainHasBlock
}
func missingBlocks(chainDB kv.RwDB, blocks []*types.Block) []*types.Block {
func missingBlocks(chainDB kv.RwDB, blocks []*types.Block, blockReader services.FullBlockReader) []*types.Block {
var headBlock *types.Block
chainDB.View(context.Background(), func(tx kv.Tx) (err error) {
hash := rawdb.ReadHeadHeaderHash(tx)
number := rawdb.ReadHeaderNumber(tx, hash)
headBlock = rawdb.ReadBlock(tx, hash, *number)
return nil
headBlock, err = blockReader.CurrentBlock(tx)
return err
})
for i, block := range blocks {