diff --git a/.gitignore b/.gitignore index e14d0b87a..5df094e31 100644 --- a/.gitignore +++ b/.gitignore @@ -84,3 +84,5 @@ dist __debug_bin *.log + +caplin-recordings \ No newline at end of file diff --git a/cl/freezer/utils.go b/cl/freezer/utils.go new file mode 100644 index 000000000..f71689c8b --- /dev/null +++ b/cl/freezer/utils.go @@ -0,0 +1,26 @@ +package freezer + +import ( + "bytes" + "fmt" + + "github.com/ledgerwatch/erigon-lib/types/ssz" + "github.com/ledgerwatch/erigon/cl/utils" +) + +func PutObjectSSZIntoFreezer(objectName, freezerNamespace string, numericalId uint64, object ssz.Marshaler, record Freezer) error { + if record == nil { + return nil + } + var buffer bytes.Buffer + encoded, err := object.EncodeSSZ(nil) + if err != nil { + return err + } + if _, err = buffer.Write(utils.CompressSnappy(encoded)); err != nil { + return err + } + id := fmt.Sprintf("%d", numericalId) + + return record.Put(&buffer, nil, freezerNamespace, objectName, id) +} diff --git a/cl/phase1/forkchoice/fork_choice_test.go b/cl/phase1/forkchoice/fork_choice_test.go index 0e8afc701..76f2153fa 100644 --- a/cl/phase1/forkchoice/fork_choice_test.go +++ b/cl/phase1/forkchoice/fork_choice_test.go @@ -46,7 +46,7 @@ func TestForkChoiceBasic(t *testing.T) { // Initialize forkchoice store anchorState := state.New(&clparams.MainnetBeaconConfig) require.NoError(t, utils.DecodeSSZSnappy(anchorState, anchorStateEncoded, int(clparams.AltairVersion))) - store, err := forkchoice.NewForkChoiceStore(anchorState, nil, false) + store, err := forkchoice.NewForkChoiceStore(anchorState, nil, nil, false) require.NoError(t, err) // first steps store.OnTick(0) diff --git a/cl/phase1/forkchoice/forkchoice.go b/cl/phase1/forkchoice/forkchoice.go index beb28c30d..ecf75c52b 100644 --- a/cl/phase1/forkchoice/forkchoice.go +++ b/cl/phase1/forkchoice/forkchoice.go @@ -4,6 +4,7 @@ import ( "sync" "github.com/ledgerwatch/erigon/cl/cltypes/solid" + "github.com/ledgerwatch/erigon/cl/freezer" state2 "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice/fork_graph" @@ -38,6 +39,8 @@ type ForkChoiceStore struct { mu sync.Mutex // EL engine execution_client.ExecutionEngine + // freezer + recorder freezer.Freezer } type LatestMessage struct { @@ -46,7 +49,7 @@ type LatestMessage struct { } // NewForkChoiceStore initialize a new store from the given anchor state, either genesis or checkpoint sync state. -func NewForkChoiceStore(anchorState *state2.BeaconState, engine execution_client.ExecutionEngine, enabledPruning bool) (*ForkChoiceStore, error) { +func NewForkChoiceStore(anchorState *state2.BeaconState, engine execution_client.ExecutionEngine, recorder freezer.Freezer, enabledPruning bool) (*ForkChoiceStore, error) { anchorRoot, err := anchorState.BlockRoot() if err != nil { return nil, err @@ -76,6 +79,7 @@ func NewForkChoiceStore(anchorState *state2.BeaconState, engine execution_client checkpointStates: checkpointStates, eth2Roots: eth2Roots, engine: engine, + recorder: recorder, }, nil } diff --git a/cl/phase1/forkchoice/on_block.go b/cl/phase1/forkchoice/on_block.go index fbaec674d..cd6860e6a 100644 --- a/cl/phase1/forkchoice/on_block.go +++ b/cl/phase1/forkchoice/on_block.go @@ -3,6 +3,7 @@ package forkchoice import ( "fmt" + "github.com/ledgerwatch/erigon/cl/freezer" "github.com/ledgerwatch/erigon/cl/phase1/core/transition" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice/fork_graph" @@ -58,6 +59,11 @@ func (f *ForkChoiceStore) OnBlock(block *cltypes.SignedBeaconBlock, newPayload, if f.Slot() == block.Block.Slot && isBeforeAttestingInterval { f.proposerBoostRoot = blockRoot } + if lastProcessedState.Slot()%f.forkGraph.Config().SlotsPerEpoch == 0 { + if err := freezer.PutObjectSSZIntoFreezer("beaconState", "caplin_core", lastProcessedState.Slot(), lastProcessedState, f.recorder); err != nil { + return err + } + } // Update checkpoints f.updateCheckpoints(lastProcessedState.CurrentJustifiedCheckpoint().Copy(), lastProcessedState.FinalizedCheckpoint().Copy()) // First thing save previous values of the checkpoints (avoid memory copy of all states and ensure easy revert) diff --git a/cl/phase1/network/gossip_manager.go b/cl/phase1/network/gossip_manager.go index c51c16670..af2f14b3e 100644 --- a/cl/phase1/network/gossip_manager.go +++ b/cl/phase1/network/gossip_manager.go @@ -6,6 +6,7 @@ import ( "github.com/VictoriaMetrics/metrics" "github.com/ledgerwatch/erigon/cl/cltypes/solid" + "github.com/ledgerwatch/erigon/cl/freezer" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" libcommon "github.com/ledgerwatch/erigon-lib/common" @@ -23,6 +24,7 @@ import ( type GossipManager struct { ctx context.Context + recorder freezer.Freezer forkChoice *forkchoice.ForkChoiceStore sentinel sentinel.SentinelClient // configs @@ -30,13 +32,15 @@ type GossipManager struct { genesisConfig *clparams.GenesisConfig } -func NewGossipReceiver(ctx context.Context, s sentinel.SentinelClient, forkChoice *forkchoice.ForkChoiceStore, beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig) *GossipManager { +func NewGossipReceiver(ctx context.Context, s sentinel.SentinelClient, forkChoice *forkchoice.ForkChoiceStore, + beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, recorder freezer.Freezer) *GossipManager { return &GossipManager{ sentinel: s, forkChoice: forkChoice, ctx: ctx, beaconConfig: beaconConfig, genesisConfig: genesisConfig, + recorder: recorder, } } @@ -87,6 +91,10 @@ func (g *GossipManager) onRecv(data *sentinel.GossipData, l log.Ctx) error { "numGC", m.NumGC, ) + if err := freezer.PutObjectSSZIntoFreezer("gossip_signedBeaconBlock", "caplin_core", block.Block.Slot, block, g.recorder); err != nil { + return err + } + peers := metrics.GetOrCreateGauge("caplin_peer_count", func() float64 { return float64(count.Amount) }) diff --git a/cl/phase1/stages/stage_fork_choice.go b/cl/phase1/stages/stage_fork_choice.go index 1dba40da6..07db922ff 100644 --- a/cl/phase1/stages/stage_fork_choice.go +++ b/cl/phase1/stages/stage_fork_choice.go @@ -5,6 +5,7 @@ import ( "runtime" "time" + "github.com/ledgerwatch/erigon/cl/freezer" "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" @@ -30,13 +31,22 @@ type StageForkChoiceCfg struct { state *state.BeaconState gossipManager *network2.GossipManager forkChoice *forkchoice.ForkChoiceStore + caplinFreezer freezer.Freezer } const minPeersForDownload = 2 const minPeersForSyncStart = 4 +var ( + freezerNameSpacePrefix = "" + blockObjectName = "singedBeaconBlock" + stateObjectName = "beaconState" + gossipAction = "gossip" +) + func StageForkChoice(db kv.RwDB, downloader *network2.ForwardBeaconDownloader, genesisCfg *clparams.GenesisConfig, - beaconCfg *clparams.BeaconChainConfig, state *state.BeaconState, executionClient *execution_client.ExecutionClient, gossipManager *network2.GossipManager, forkChoice *forkchoice.ForkChoiceStore) StageForkChoiceCfg { + beaconCfg *clparams.BeaconChainConfig, state *state.BeaconState, executionClient *execution_client.ExecutionClient, gossipManager *network2.GossipManager, + forkChoice *forkchoice.ForkChoiceStore, caplinFreezer freezer.Freezer) StageForkChoiceCfg { return StageForkChoiceCfg{ db: db, downloader: downloader, @@ -46,6 +56,7 @@ func StageForkChoice(db kv.RwDB, downloader *network2.ForwardBeaconDownloader, g executionClient: executionClient, gossipManager: gossipManager, forkChoice: forkChoice, + caplinFreezer: caplinFreezer, } } @@ -94,6 +105,10 @@ func startDownloadService(s *stagedsync.StageState, cfg StageForkChoiceCfg) { cfg.downloader.SetHighestProcessedSlot(cfg.state.Slot()) cfg.downloader.SetProcessFunction(func(highestSlotProcessed uint64, _ libcommon.Hash, newBlocks []*cltypes.SignedBeaconBlock) (uint64, libcommon.Hash, error) { for _, block := range newBlocks { + if err := freezer.PutObjectSSZIntoFreezer("downloaded_signedBeaconBlock", "caplin_core", block.Block.Slot, block, cfg.caplinFreezer); err != nil { + return highestSlotProcessed, libcommon.Hash{}, err + } + sendForckchoice := utils.GetCurrentSlot(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot) == block.Block.Slot if err := cfg.forkChoice.OnBlock(block, false, true); err != nil { diff --git a/cl/phase1/stages/stages.go b/cl/phase1/stages/stages.go index 95136f3f3..4004fb9d3 100644 --- a/cl/phase1/stages/stages.go +++ b/cl/phase1/stages/stages.go @@ -81,7 +81,7 @@ func NewConsensusStagedSync(ctx context.Context, ctx, StageHistoryReconstruction(db, backwardDownloader, genesisCfg, beaconCfg, beaconDBCfg, state, tmpdir, executionClient), StageBeaconState(db, beaconCfg, state, executionClient), - StageForkChoice(db, forwardDownloader, genesisCfg, beaconCfg, state, executionClient, gossipManager, forkChoice), + StageForkChoice(db, forwardDownloader, genesisCfg, beaconCfg, state, executionClient, gossipManager, forkChoice, nil), ), ConsensusUnwindOrder, ConsensusPruneOrder, diff --git a/cl/spectest/consensus_tests/fork_choice.go b/cl/spectest/consensus_tests/fork_choice.go index fd794841e..6ee5c4e5c 100644 --- a/cl/spectest/consensus_tests/fork_choice.go +++ b/cl/spectest/consensus_tests/fork_choice.go @@ -150,7 +150,7 @@ func (b *ForkChoice) Run(t *testing.T, root fs.FS, c spectest.TestCase) (err err anchorState, err := spectest.ReadBeaconState(root, c.Version(), "anchor_state.ssz_snappy") require.NoError(t, err) - forkStore, err := forkchoice.NewForkChoiceStore(anchorState, nil, false) + forkStore, err := forkchoice.NewForkChoiceStore(anchorState, nil, nil, false) require.NoError(t, err) var steps []ForkChoiceStep diff --git a/cmd/caplin-phase1/caplin1/run.go b/cmd/caplin-phase1/caplin1/run.go index 9b85c0ce7..e54ccbd45 100644 --- a/cmd/caplin-phase1/caplin1/run.go +++ b/cmd/caplin-phase1/caplin1/run.go @@ -4,6 +4,7 @@ import ( "context" "github.com/ledgerwatch/erigon/cl/cltypes/solid" + "github.com/ledgerwatch/erigon/cl/freezer" "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" @@ -19,11 +20,12 @@ import ( "github.com/ledgerwatch/erigon/eth/stagedsync" ) -func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, engine execution_client.ExecutionEngine, state *state.BeaconState) error { +func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, + engine execution_client.ExecutionEngine, state *state.BeaconState, caplinFreezer freezer.Freezer) error { beaconRpc := rpc.NewBeaconRpcP2P(ctx, sentinel, beaconConfig, genesisConfig) downloader := network2.NewForwardBeaconDownloader(ctx, beaconRpc) - forkChoice, err := forkchoice.NewForkChoiceStore(state, engine, true) + forkChoice, err := forkchoice.NewForkChoiceStore(state, engine, caplinFreezer, true) if err != nil { log.Error("Could not create forkchoice", "err", err) return err @@ -36,6 +38,6 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, beac } return true }) - gossipManager := network2.NewGossipReceiver(ctx, sentinel, forkChoice, beaconConfig, genesisConfig) - return stages.SpawnStageForkChoice(stages.StageForkChoice(nil, downloader, genesisConfig, beaconConfig, state, nil, gossipManager, forkChoice), &stagedsync.StageState{ID: "Caplin"}, nil, ctx) + gossipManager := network2.NewGossipReceiver(ctx, sentinel, forkChoice, beaconConfig, genesisConfig, caplinFreezer) + return stages.SpawnStageForkChoice(stages.StageForkChoice(nil, downloader, genesisConfig, beaconConfig, state, nil, gossipManager, forkChoice, caplinFreezer), &stagedsync.StageState{ID: "Caplin"}, nil, ctx) } diff --git a/cmd/caplin-phase1/main.go b/cmd/caplin-phase1/main.go index ea53e7f2b..66d6cdf6a 100644 --- a/cmd/caplin-phase1/main.go +++ b/cmd/caplin-phase1/main.go @@ -18,6 +18,7 @@ import ( "fmt" "os" + "github.com/ledgerwatch/erigon/cl/freezer" "github.com/ledgerwatch/erigon/cl/phase1/core" "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" @@ -111,5 +112,12 @@ func runCaplinNode(cliCtx *cli.Context) error { defer cc.Close() engine = execution_client.NewExecutionEnginePhase1FromClient(ctx, remote.NewETHBACKENDClient(cc)) } - return caplin1.RunCaplinPhase1(ctx, sentinel, cfg.BeaconCfg, cfg.GenesisCfg, engine, state) + var caplinFreezer freezer.Freezer + if cfg.RecordMode { + caplinFreezer = &freezer.RootPathOsFs{ + Root: cfg.RecordDir, + } + } + + return caplin1.RunCaplinPhase1(ctx, sentinel, cfg.BeaconCfg, cfg.GenesisCfg, engine, state, caplinFreezer) } diff --git a/cmd/sentinel/cli/cliSettings.go b/cmd/sentinel/cli/cliSettings.go index 74cc52eaf..6578ad6e6 100644 --- a/cmd/sentinel/cli/cliSettings.go +++ b/cmd/sentinel/cli/cliSettings.go @@ -34,6 +34,8 @@ type ConsensusClientCliCfg struct { TransitionChain bool `json:"transitionChain"` NetworkType clparams.NetworkType `json:"networkType"` InitialSync bool `json:"initialSync"` + RecordMode bool `json:"recordMode"` + RecordDir string `json:"recordDir"` InitalState *state.BeaconState } @@ -71,6 +73,9 @@ func SetupConsensusClientCfg(ctx *cli.Context) (*ConsensusClientCliCfg, error) { cfg.ServerAddr = fmt.Sprintf("%s:%d", ctx.String(flags.SentinelServerAddr.Name), ctx.Int(flags.SentinelServerPort.Name)) cfg.ServerProtocol = "tcp" + cfg.RecordMode = ctx.Bool(flags.RecordModeFlag.Name) + cfg.RecordDir = ctx.String(flags.RecordModeDir.Name) + cfg.Port = uint(ctx.Int(flags.SentinelDiscoveryPort.Name)) cfg.Addr = ctx.String(flags.SentinelDiscoveryAddr.Name) diff --git a/cmd/sentinel/cli/flags/defaultFlags.go b/cmd/sentinel/cli/flags/defaultFlags.go index 244c9045a..25d6f8afc 100644 --- a/cmd/sentinel/cli/flags/defaultFlags.go +++ b/cmd/sentinel/cli/flags/defaultFlags.go @@ -19,4 +19,6 @@ var CLDefaultFlags = []cli.Flag{ &SentinelStaticPeersFlag, &TransitionChainFlag, &InitSyncFlag, + &RecordModeDir, + &RecordModeFlag, } diff --git a/cmd/sentinel/cli/flags/flags.go b/cmd/sentinel/cli/flags/flags.go index 6e1b9614c..45e634789 100644 --- a/cmd/sentinel/cli/flags/flags.go +++ b/cmd/sentinel/cli/flags/flags.go @@ -87,4 +87,14 @@ var ( Name: "initial-sync", Usage: "use initial-sync", } + RecordModeFlag = cli.BoolFlag{ + Value: false, + Name: "record-mode", + Usage: "enable/disable record mode", + } + RecordModeDir = cli.StringFlag{ + Value: "caplin-recordings", + Name: "record-dir", + Usage: "directory for states and block recordings", + } ) diff --git a/eth/backend.go b/eth/backend.go index 2478641b6..b4b4f01d6 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -614,7 +614,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere return nil, err } - go caplin1.RunCaplinPhase1(ctx, client, beaconCfg, genesisCfg, engine, state) + go caplin1.RunCaplinPhase1(ctx, client, beaconCfg, genesisCfg, engine, state, nil) } if currentBlock == nil {