Added record mode for Caplin (#7660)

This commit is contained in:
Giulio rebuffo 2023-06-05 01:52:55 +02:00 committed by GitHub
parent 415cf86250
commit e45ed7d139
15 changed files with 100 additions and 12 deletions

2
.gitignore vendored

@ -84,3 +84,5 @@ dist
__debug_bin
*.log
caplin-recordings

26
cl/freezer/utils.go Normal file

@ -0,0 +1,26 @@
package freezer
import (
"bytes"
"fmt"
"github.com/ledgerwatch/erigon-lib/types/ssz"
"github.com/ledgerwatch/erigon/cl/utils"
)
func PutObjectSSZIntoFreezer(objectName, freezerNamespace string, numericalId uint64, object ssz.Marshaler, record Freezer) error {
if record == nil {
return nil
}
var buffer bytes.Buffer
encoded, err := object.EncodeSSZ(nil)
if err != nil {
return err
}
if _, err = buffer.Write(utils.CompressSnappy(encoded)); err != nil {
return err
}
id := fmt.Sprintf("%d", numericalId)
return record.Put(&buffer, nil, freezerNamespace, objectName, id)
}
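The Freezer type this helper writes to is not defined in this diff; only its use is visible (record.Put(&buffer, nil, freezerNamespace, objectName, id) here, and freezer.RootPathOsFs{Root: cfg.RecordDir} in the CLI wiring further down). A minimal sketch of what the interface and a filesystem-backed recorder could look like, under those assumptions:

package freezer

import (
	"io"
	"os"
	"path/filepath"
)

// Freezer is assumed to be a simple blob sink keyed by namespace/object/id;
// only Put is exercised by this commit, Get is included for symmetry.
type Freezer interface {
	Put(blob io.Reader, sidecar io.Reader, namespace, object, id string) error
	Get(namespace, object, id string) (blob io.ReadCloser, sidecar []byte, err error)
}

// RootPathOsFs is a hypothetical sketch of the filesystem-backed recorder used
// in record mode: each object is written to Root/namespace/object/id.
type RootPathOsFs struct {
	Root string
}

func (f *RootPathOsFs) Put(blob io.Reader, _ io.Reader, namespace, object, id string) error {
	dir := filepath.Join(f.Root, namespace, object)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	out, err := os.Create(filepath.Join(dir, id))
	if err != nil {
		return err
	}
	defer out.Close()
	_, err = io.Copy(out, blob)
	return err
}

func (f *RootPathOsFs) Get(namespace, object, id string) (io.ReadCloser, []byte, error) {
	fd, err := os.Open(filepath.Join(f.Root, namespace, object, id))
	return fd, nil, err
}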

View File

@ -46,7 +46,7 @@ func TestForkChoiceBasic(t *testing.T) {
// Initialize forkchoice store
anchorState := state.New(&clparams.MainnetBeaconConfig)
require.NoError(t, utils.DecodeSSZSnappy(anchorState, anchorStateEncoded, int(clparams.AltairVersion)))
store, err := forkchoice.NewForkChoiceStore(anchorState, nil, false)
store, err := forkchoice.NewForkChoiceStore(anchorState, nil, nil, false)
require.NoError(t, err)
// first steps
store.OnTick(0)

View File

@ -4,6 +4,7 @@ import (
"sync"
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
"github.com/ledgerwatch/erigon/cl/freezer"
state2 "github.com/ledgerwatch/erigon/cl/phase1/core/state"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice/fork_graph"
@ -38,6 +39,8 @@ type ForkChoiceStore struct {
mu sync.Mutex
// EL
engine execution_client.ExecutionEngine
// freezer
recorder freezer.Freezer
}
type LatestMessage struct {
@ -46,7 +49,7 @@ type LatestMessage struct {
}
// NewForkChoiceStore initializes a new store from the given anchor state, either genesis or a checkpoint-sync state.
func NewForkChoiceStore(anchorState *state2.BeaconState, engine execution_client.ExecutionEngine, enabledPruning bool) (*ForkChoiceStore, error) {
func NewForkChoiceStore(anchorState *state2.BeaconState, engine execution_client.ExecutionEngine, recorder freezer.Freezer, enabledPruning bool) (*ForkChoiceStore, error) {
anchorRoot, err := anchorState.BlockRoot()
if err != nil {
return nil, err
@ -76,6 +79,7 @@ func NewForkChoiceStore(anchorState *state2.BeaconState, engine execution_client
checkpointStates: checkpointStates,
eth2Roots: eth2Roots,
engine: engine,
recorder: recorder,
}, nil
}

View File

@ -3,6 +3,7 @@ package forkchoice
import (
"fmt"
"github.com/ledgerwatch/erigon/cl/freezer"
"github.com/ledgerwatch/erigon/cl/phase1/core/transition"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice/fork_graph"
@ -58,6 +59,11 @@ func (f *ForkChoiceStore) OnBlock(block *cltypes.SignedBeaconBlock, newPayload,
if f.Slot() == block.Block.Slot && isBeforeAttestingInterval {
f.proposerBoostRoot = blockRoot
}
if lastProcessedState.Slot()%f.forkGraph.Config().SlotsPerEpoch == 0 {
if err := freezer.PutObjectSSZIntoFreezer("beaconState", "caplin_core", lastProcessedState.Slot(), lastProcessedState, f.recorder); err != nil {
return err
}
}
// Update checkpoints
f.updateCheckpoints(lastProcessedState.CurrentJustifiedCheckpoint().Copy(), lastProcessedState.FinalizedCheckpoint().Copy())
// First thing save previous values of the checkpoints (avoid memory copy of all states and ensure easy revert)
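With this hook, a full beacon state is recorded once per epoch (whenever the post-state slot is a multiple of SlotsPerEpoch) under the caplin_core namespace, keyed by slot. Reading a recording back is the reverse of PutObjectSSZIntoFreezer; a rough sketch, assuming a Get method on the freezer and an ssz.Unmarshaler counterpart to the ssz.Marshaler used above (neither appears in this commit), and reusing the utils.DecodeSSZSnappy helper seen in the forkchoice test:

package freezer

import (
	"fmt"
	"io"

	"github.com/ledgerwatch/erigon-lib/types/ssz"
	"github.com/ledgerwatch/erigon/cl/utils"
)

// GetObjectSSZFromFreezer is a hypothetical inverse of PutObjectSSZIntoFreezer:
// fetch the snappy-compressed SSZ blob and decode it into the given object.
func GetObjectSSZFromFreezer(objectName, freezerNamespace string, numericalId uint64, object ssz.Unmarshaler, record Freezer, version int) error {
	if record == nil {
		return nil
	}
	blob, _, err := record.Get(freezerNamespace, objectName, fmt.Sprintf("%d", numericalId))
	if err != nil {
		return err
	}
	defer blob.Close()
	compressed, err := io.ReadAll(blob)
	if err != nil {
		return err
	}
	// DecodeSSZSnappy decompresses the snappy payload and decodes the SSZ object in one step.
	return utils.DecodeSSZSnappy(object, compressed, version)
}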

View File

@ -6,6 +6,7 @@ import (
"github.com/VictoriaMetrics/metrics"
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
"github.com/ledgerwatch/erigon/cl/freezer"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice"
libcommon "github.com/ledgerwatch/erigon-lib/common"
@ -23,6 +24,7 @@ import (
type GossipManager struct {
ctx context.Context
recorder freezer.Freezer
forkChoice *forkchoice.ForkChoiceStore
sentinel sentinel.SentinelClient
// configs
@ -30,13 +32,15 @@ type GossipManager struct {
genesisConfig *clparams.GenesisConfig
}
func NewGossipReceiver(ctx context.Context, s sentinel.SentinelClient, forkChoice *forkchoice.ForkChoiceStore, beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig) *GossipManager {
func NewGossipReceiver(ctx context.Context, s sentinel.SentinelClient, forkChoice *forkchoice.ForkChoiceStore,
beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, recorder freezer.Freezer) *GossipManager {
return &GossipManager{
sentinel: s,
forkChoice: forkChoice,
ctx: ctx,
beaconConfig: beaconConfig,
genesisConfig: genesisConfig,
recorder: recorder,
}
}
@ -87,6 +91,10 @@ func (g *GossipManager) onRecv(data *sentinel.GossipData, l log.Ctx) error {
"numGC", m.NumGC,
)
if err := freezer.PutObjectSSZIntoFreezer("gossip_signedBeaconBlock", "caplin_core", block.Block.Slot, block, g.recorder); err != nil {
return err
}
peers := metrics.GetOrCreateGauge("caplin_peer_count", func() float64 {
return float64(count.Amount)
})

View File

@ -5,6 +5,7 @@ import (
"runtime"
"time"
"github.com/ledgerwatch/erigon/cl/freezer"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice"
@ -30,13 +31,22 @@ type StageForkChoiceCfg struct {
state *state.BeaconState
gossipManager *network2.GossipManager
forkChoice *forkchoice.ForkChoiceStore
caplinFreezer freezer.Freezer
}
const minPeersForDownload = 2
const minPeersForSyncStart = 4
var (
freezerNameSpacePrefix = ""
blockObjectName = "singedBeaconBlock"
stateObjectName = "beaconState"
gossipAction = "gossip"
)
func StageForkChoice(db kv.RwDB, downloader *network2.ForwardBeaconDownloader, genesisCfg *clparams.GenesisConfig,
beaconCfg *clparams.BeaconChainConfig, state *state.BeaconState, executionClient *execution_client.ExecutionClient, gossipManager *network2.GossipManager, forkChoice *forkchoice.ForkChoiceStore) StageForkChoiceCfg {
beaconCfg *clparams.BeaconChainConfig, state *state.BeaconState, executionClient *execution_client.ExecutionClient, gossipManager *network2.GossipManager,
forkChoice *forkchoice.ForkChoiceStore, caplinFreezer freezer.Freezer) StageForkChoiceCfg {
return StageForkChoiceCfg{
db: db,
downloader: downloader,
@ -46,6 +56,7 @@ func StageForkChoice(db kv.RwDB, downloader *network2.ForwardBeaconDownloader, g
executionClient: executionClient,
gossipManager: gossipManager,
forkChoice: forkChoice,
caplinFreezer: caplinFreezer,
}
}
@ -94,6 +105,10 @@ func startDownloadService(s *stagedsync.StageState, cfg StageForkChoiceCfg) {
cfg.downloader.SetHighestProcessedSlot(cfg.state.Slot())
cfg.downloader.SetProcessFunction(func(highestSlotProcessed uint64, _ libcommon.Hash, newBlocks []*cltypes.SignedBeaconBlock) (uint64, libcommon.Hash, error) {
for _, block := range newBlocks {
if err := freezer.PutObjectSSZIntoFreezer("downloaded_signedBeaconBlock", "caplin_core", block.Block.Slot, block, cfg.caplinFreezer); err != nil {
return highestSlotProcessed, libcommon.Hash{}, err
}
sendForckchoice :=
utils.GetCurrentSlot(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot) == block.Block.Slot
if err := cfg.forkChoice.OnBlock(block, false, true); err != nil {

View File

@ -81,7 +81,7 @@ func NewConsensusStagedSync(ctx context.Context,
ctx,
StageHistoryReconstruction(db, backwardDownloader, genesisCfg, beaconCfg, beaconDBCfg, state, tmpdir, executionClient),
StageBeaconState(db, beaconCfg, state, executionClient),
StageForkChoice(db, forwardDownloader, genesisCfg, beaconCfg, state, executionClient, gossipManager, forkChoice),
StageForkChoice(db, forwardDownloader, genesisCfg, beaconCfg, state, executionClient, gossipManager, forkChoice, nil),
),
ConsensusUnwindOrder,
ConsensusPruneOrder,

View File

@ -150,7 +150,7 @@ func (b *ForkChoice) Run(t *testing.T, root fs.FS, c spectest.TestCase) (err err
anchorState, err := spectest.ReadBeaconState(root, c.Version(), "anchor_state.ssz_snappy")
require.NoError(t, err)
forkStore, err := forkchoice.NewForkChoiceStore(anchorState, nil, false)
forkStore, err := forkchoice.NewForkChoiceStore(anchorState, nil, nil, false)
require.NoError(t, err)
var steps []ForkChoiceStep

View File

@ -4,6 +4,7 @@ import (
"context"
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
"github.com/ledgerwatch/erigon/cl/freezer"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice"
@ -19,11 +20,12 @@ import (
"github.com/ledgerwatch/erigon/eth/stagedsync"
)
func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, engine execution_client.ExecutionEngine, state *state.BeaconState) error {
func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig,
engine execution_client.ExecutionEngine, state *state.BeaconState, caplinFreezer freezer.Freezer) error {
beaconRpc := rpc.NewBeaconRpcP2P(ctx, sentinel, beaconConfig, genesisConfig)
downloader := network2.NewForwardBeaconDownloader(ctx, beaconRpc)
forkChoice, err := forkchoice.NewForkChoiceStore(state, engine, true)
forkChoice, err := forkchoice.NewForkChoiceStore(state, engine, caplinFreezer, true)
if err != nil {
log.Error("Could not create forkchoice", "err", err)
return err
@ -36,6 +38,6 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, beac
}
return true
})
gossipManager := network2.NewGossipReceiver(ctx, sentinel, forkChoice, beaconConfig, genesisConfig)
return stages.SpawnStageForkChoice(stages.StageForkChoice(nil, downloader, genesisConfig, beaconConfig, state, nil, gossipManager, forkChoice), &stagedsync.StageState{ID: "Caplin"}, nil, ctx)
gossipManager := network2.NewGossipReceiver(ctx, sentinel, forkChoice, beaconConfig, genesisConfig, caplinFreezer)
return stages.SpawnStageForkChoice(stages.StageForkChoice(nil, downloader, genesisConfig, beaconConfig, state, nil, gossipManager, forkChoice, caplinFreezer), &stagedsync.StageState{ID: "Caplin"}, nil, ctx)
}

View File

@ -18,6 +18,7 @@ import (
"fmt"
"os"
"github.com/ledgerwatch/erigon/cl/freezer"
"github.com/ledgerwatch/erigon/cl/phase1/core"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
@ -111,5 +112,12 @@ func runCaplinNode(cliCtx *cli.Context) error {
defer cc.Close()
engine = execution_client.NewExecutionEnginePhase1FromClient(ctx, remote.NewETHBACKENDClient(cc))
}
return caplin1.RunCaplinPhase1(ctx, sentinel, cfg.BeaconCfg, cfg.GenesisCfg, engine, state)
var caplinFreezer freezer.Freezer
if cfg.RecordMode {
caplinFreezer = &freezer.RootPathOsFs{
Root: cfg.RecordDir,
}
}
return caplin1.RunCaplinPhase1(ctx, sentinel, cfg.BeaconCfg, cfg.GenesisCfg, engine, state, caplinFreezer)
}
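Recording stays strictly opt-in: PutObjectSSZIntoFreezer is a no-op when the freezer is nil, so the call sites that pass nil (the consensus staged sync above and the go caplin1.RunCaplinPhase1(...) start at the end of this diff) keep their previous behaviour, and a RootPathOsFs recorder is only constructed here when record mode is enabled.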

View File

@ -34,6 +34,8 @@ type ConsensusClientCliCfg struct {
TransitionChain bool `json:"transitionChain"`
NetworkType clparams.NetworkType `json:"networkType"`
InitialSync bool `json:"initialSync"`
RecordMode bool `json:"recordMode"`
RecordDir string `json:"recordDir"`
InitalState *state.BeaconState
}
@ -71,6 +73,9 @@ func SetupConsensusClientCfg(ctx *cli.Context) (*ConsensusClientCliCfg, error) {
cfg.ServerAddr = fmt.Sprintf("%s:%d", ctx.String(flags.SentinelServerAddr.Name), ctx.Int(flags.SentinelServerPort.Name))
cfg.ServerProtocol = "tcp"
cfg.RecordMode = ctx.Bool(flags.RecordModeFlag.Name)
cfg.RecordDir = ctx.String(flags.RecordModeDir.Name)
cfg.Port = uint(ctx.Int(flags.SentinelDiscoveryPort.Name))
cfg.Addr = ctx.String(flags.SentinelDiscoveryAddr.Name)

View File

@ -19,4 +19,6 @@ var CLDefaultFlags = []cli.Flag{
&SentinelStaticPeersFlag,
&TransitionChainFlag,
&InitSyncFlag,
&RecordModeDir,
&RecordModeFlag,
}

View File

@ -87,4 +87,14 @@ var (
Name: "initial-sync",
Usage: "use initial-sync",
}
RecordModeFlag = cli.BoolFlag{
Value: false,
Name: "record-mode",
Usage: "enable/disable record mode",
}
RecordModeDir = cli.StringFlag{
Value: "caplin-recordings",
Name: "record-dir",
Usage: "directory for states and block recordings",
}
)
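Taken together with the config plumbing above, enabling recording amounts to starting the Caplin node with --record-mode (optionally pointing --record-dir at a custom path); the default directory, caplin-recordings, matches the entry added to .gitignore at the top of this commit.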

View File

@ -614,7 +614,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
return nil, err
}
go caplin1.RunCaplinPhase1(ctx, client, beaconCfg, genesisCfg, engine, state)
go caplin1.RunCaplinPhase1(ctx, client, beaconCfg, genesisCfg, engine, state, nil)
}
if currentBlock == nil {