cache state check (#5844)

draft for now to get some early feedback on approach
This commit is contained in:
hexoscott 2022-11-07 13:04:31 +00:00 committed by GitHub
parent 82bb408e99
commit 636586c1b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 112 additions and 33 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/ledgerwatch/erigon-lib/common/dir"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/rpc/rpccfg"
"github.com/ledgerwatch/erigon/turbo/debug"
@ -30,6 +31,13 @@ import (
kv2 "github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/erigon-lib/kv/remotedb"
"github.com/ledgerwatch/erigon-lib/kv/remotedbserver"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
grpcHealth "google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/cli/httpcfg"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/health"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcservices"
@ -47,12 +55,6 @@ import (
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
grpcHealth "google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)
var rootCmd = &cobra.Command{
@ -226,8 +228,8 @@ func checkDbCompatibility(ctx context.Context, db kv.RoDB) error {
func EmbeddedServices(ctx context.Context,
erigonDB kv.RoDB, stateCacheCfg kvcache.CoherentConfig,
blockReader services.FullBlockReader, snapshots *snapshotsync.RoSnapshots, agg *libstate.Aggregator22,
ethBackendServer remote.ETHBACKENDServer, txPoolServer txpool.TxpoolServer, miningServer txpool.MiningServer,
blockReader services.FullBlockReader, ethBackendServer remote.ETHBACKENDServer, txPoolServer txpool.TxpoolServer,
miningServer txpool.MiningServer, stateDiffClient StateChangesClient,
) (eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, stateCache kvcache.Cache, ff *rpchelper.Filters, err error) {
if stateCacheCfg.CacheSize > 0 {
// notification about new blocks (state stream) doesn't work now inside erigon - because
@ -238,8 +240,7 @@ func EmbeddedServices(ctx context.Context,
} else {
stateCache = kvcache.NewDummy()
}
kvRPC := remotedbserver.NewKvServer(ctx, erigonDB, snapshots, agg)
stateDiffClient := direct.NewStateDiffClientDirect(kvRPC)
subscribeToStateChangesLoop(ctx, stateDiffClient, stateCache)
directClient := direct.NewEthBackendClientDirect(ethBackendServer)

View File

@ -0,0 +1,24 @@
package commands
import (
"context"
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
)
func (api *ErigonImpl) CacheCheck() (*kvcache.CacheValidationResult, error) {
cache := api.stateCache
ctx := context.Background()
tx, err := api.db.BeginRo(ctx)
if err != nil {
return nil, err
}
result, err := cache.ValidateCurrentRoot(ctx, tx)
if err != nil {
return nil, err
}
return result, nil
}

View File

@ -0,0 +1,22 @@
package rawdb
import (
"encoding/binary"
"github.com/ledgerwatch/erigon-lib/kv"
)
func GetStateVersion(tx kv.Tx) (uint64, error) {
val, err := tx.GetOne(kv.Sequence, kv.PlainStateVersion)
if err != nil {
return 0, err
}
if len(val) == 0 {
return 0, nil
}
return binary.BigEndian.Uint64(val), nil
}
func IncrementStateVersion(tx kv.RwTx) (uint64, error) {
return tx.IncrementSequence(string(kv.PlainStateVersion), 1)
}

View File

@ -5,6 +5,7 @@ import (
"encoding/binary"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/types/accounts"

View File

@ -5,6 +5,7 @@ import (
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/types/accounts"
@ -16,6 +17,7 @@ var _ WriterWithChangeSets = (*PlainStateWriter)(nil)
type putDel interface {
kv.Putter
kv.Deleter
IncrementSequence(bucket string, amount uint64) (uint64, error)
}
type PlainStateWriter struct {
db putDel

View File

@ -46,6 +46,12 @@ import (
txpool2 "github.com/ledgerwatch/erigon-lib/txpool"
"github.com/ledgerwatch/erigon-lib/txpool/txpooluitl"
types2 "github.com/ledgerwatch/erigon-lib/types"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cmd/downloader/downloader"
"github.com/ledgerwatch/erigon/cmd/downloader/downloader/downloadercfg"
@ -91,11 +97,6 @@ import (
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
stages2 "github.com/ledgerwatch/erigon/turbo/stages"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/types/known/emptypb"
)
// Config contains the configuration options of the ETH protocol.
@ -386,13 +387,13 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
}
var miningRPC txpool_proto.MiningServer
stateDiffClient := direct.NewStateDiffClientDirect(kvRPC)
if config.DeprecatedTxPool.Disable {
backend.txPool2GrpcServer = &txpool2.GrpcDisabled{}
} else {
//cacheConfig := kvcache.DefaultCoherentCacheConfig
//cacheConfig.MetricsLabel = "txpool"
stateDiffClient := direct.NewStateDiffClientDirect(kvRPC)
backend.newTxs2 = make(chan types2.Hashes, 1024)
//defer close(newTxs)
backend.txPool2DB, backend.txPool2, backend.txPool2Fetch, backend.txPool2Send, backend.txPool2GrpcServer, err = txpooluitl.AllComponents(
@ -627,7 +628,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
}
// start HTTP API
httpRpcCfg := stack.Config().Http
ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, blockReader, allSnapshots, backend.agg, ethBackendRPC, backend.txPool2GrpcServer, miningRPC)
ethRpcClient, txPoolRpcClient, miningRpcClient, stateCache, ff, err := cli.EmbeddedServices(ctx, chainKv, httpRpcCfg.StateCache, blockReader, ethBackendRPC, backend.txPool2GrpcServer, miningRPC, stateDiffClient)
if err != nil {
return nil, err
}

View File

@ -15,9 +15,10 @@ import (
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/eth/ethconfig/estimate"
"github.com/ledgerwatch/log/v3"
commonold "github.com/ledgerwatch/erigon/common"
ecom "github.com/ledgerwatch/erigon/common"
@ -491,6 +492,11 @@ Loop:
return fmt.Errorf("batch commit: %w", err)
}
_, err = rawdb.IncrementStateVersion(tx)
if err != nil {
log.Error("writing plain state version", "err", err)
}
if !useExternalTx {
if err = tx.Commit(); err != nil {
return err

2
go.mod
View File

@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
go 1.18
require (
github.com/ledgerwatch/erigon-lib v0.0.0-20221107051601-e76f03a69ec7
github.com/ledgerwatch/erigon-lib v0.0.0-20221107051656-19804fd97122
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221026024915-f6abfe5c120e
github.com/ledgerwatch/log/v3 v3.6.0
github.com/ledgerwatch/secp256k1 v1.0.0

4
go.sum
View File

@ -558,8 +558,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20221107051601-e76f03a69ec7 h1:oN7VKwHprxm10acC6zZTMmjL2yIh6QNQXlq0mgBTgTQ=
github.com/ledgerwatch/erigon-lib v0.0.0-20221107051601-e76f03a69ec7/go.mod h1:mEDJcRnjEtpclZgsx60ca2qjbMfjSMBzvCFhLO6BYVY=
github.com/ledgerwatch/erigon-lib v0.0.0-20221107051656-19804fd97122 h1:KhOl4ZqIoRl9ZAhXXzGFnUVqlnR/P2slT5UgIPL/IMk=
github.com/ledgerwatch/erigon-lib v0.0.0-20221107051656-19804fd97122/go.mod h1:mEDJcRnjEtpclZgsx60ca2qjbMfjSMBzvCFhLO6BYVY=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221026024915-f6abfe5c120e h1:VRfm6Ylg0WTnjzFs7mU5qOyZGZTPhnlpwtMPrcD7n+U=
github.com/ledgerwatch/erigon-snapshot v1.1.1-0.20221026024915-f6abfe5c120e/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.6.0 h1:JBUSK1epPyutUrz7KYDTcJtQLEHnehECRpKbM1ugy5M=

View File

@ -21,6 +21,10 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli"
"golang.org/x/sync/semaphore"
"github.com/ledgerwatch/erigon/cmd/hack/tool/fromdb"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/core/rawdb"
@ -32,9 +36,6 @@ import (
"github.com/ledgerwatch/erigon/turbo/logging"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snap"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli"
"golang.org/x/sync/semaphore"
)
const ASSERT = false

View File

@ -11,6 +11,10 @@ import (
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/pflag"
"github.com/urfave/cli"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/cli/httpcfg"
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/common"
@ -18,9 +22,6 @@ import (
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/ethdb/prune"
"github.com/ledgerwatch/erigon/node/nodecfg"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/pflag"
"github.com/urfave/cli"
)
var (

View File

@ -6,12 +6,13 @@ import (
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon/common"
)
// Accumulator collects state changes in a form that can then be delivered to the RPC daemon
type Accumulator struct {
viewID uint64 // mdbx's txID
plainStateID uint64
changes []*remote.StateChange
latestChange *remote.StateChange
accountChangeIndex map[common.Address]int // For the latest changes, allows finding account change by account's address
@ -26,22 +27,26 @@ type StateChangeConsumer interface {
SendStateChanges(ctx context.Context, sc *remote.StateChangeBatch)
}
func (a *Accumulator) Reset(viewID uint64) {
func (a *Accumulator) Reset(plainStateID uint64) {
a.changes = nil
a.latestChange = nil
a.accountChangeIndex = nil
a.storageChangeIndex = nil
a.viewID = viewID
a.plainStateID = plainStateID
}
func (a *Accumulator) SendAndReset(ctx context.Context, c StateChangeConsumer, pendingBaseFee uint64, blockGasLimit uint64) {
if a == nil || c == nil || len(a.changes) == 0 {
return
}
sc := &remote.StateChangeBatch{DatabaseViewID: a.viewID, ChangeBatch: a.changes, PendingBlockBaseFee: pendingBaseFee, BlockGasLimit: blockGasLimit}
sc := &remote.StateChangeBatch{DatabaseViewID: a.plainStateID, ChangeBatch: a.changes, PendingBlockBaseFee: pendingBaseFee, BlockGasLimit: blockGasLimit}
c.SendStateChanges(ctx, sc)
a.Reset(0) // reset here for GC, but there will be another Reset with correct viewID
}
func (a *Accumulator) SetStateID(stateID uint64) {
a.plainStateID = stateID
}
// StartChange begins accumulation of changes for a new block
func (a *Accumulator) StartChange(blockHeight uint64, blockHash common.Hash, txs [][]byte, unwind bool) {
a.changes = append(a.changes, &remote.StateChange{})

View File

@ -15,6 +15,8 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/memdb"
"github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/log/v3"
"github.com/ledgerwatch/erigon/cmd/sentry/sentry"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus/misc"
@ -32,7 +34,6 @@ import (
"github.com/ledgerwatch/erigon/turbo/shards"
"github.com/ledgerwatch/erigon/turbo/snapshotsync"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
"github.com/ledgerwatch/log/v3"
)
func SendPayloadStatus(hd *headerdownload.HeaderDownload, headBlockHash common.Hash, err error) {
@ -167,7 +168,11 @@ func StageLoopStep(
}
if notifications != nil && notifications.Accumulator != nil && canRunCycleInOneTransaction {
notifications.Accumulator.Reset(tx.ViewID())
stateVersion, err := rawdb.GetStateVersion(tx)
if err != nil {
log.Error("problem reading plain state version", "err", err)
}
notifications.Accumulator.Reset(stateVersion)
}
err = sync.Run(db, tx, initialCycle, false /* quiet */)
@ -196,6 +201,7 @@ func StageLoopStep(
var headTd *big.Int
var head uint64
var headHash common.Hash
var plainStateVersion uint64
if head, err = stages.GetStageProgress(rotx, stages.Headers); err != nil {
return headBlockHash, err
}
@ -206,6 +212,14 @@ func StageLoopStep(
return headBlockHash, err
}
headBlockHash = rawdb.ReadHeadBlockHash(rotx)
// update the accumulator with a new plain state version so the cache can be notified that
// state has moved on
if plainStateVersion, err = rawdb.GetStateVersion(rotx); err != nil {
return headBlockHash, err
}
notifications.Accumulator.SetStateID(plainStateVersion)
if canRunCycleInOneTransaction && (head != finishProgressBefore || commitTime > 500*time.Millisecond) {
log.Info("Commit cycle", "in", commitTime)
}
@ -230,6 +244,7 @@ func StageLoopStep(
}
updateHead(ctx, head, headHash, headTd256)
}
if notifications != nil {
if notifications.Accumulator != nil {
header := rawdb.ReadCurrentHeader(rotx)