diff --git a/cl/clpersist/block_saver.go b/cl/clpersist/block_saver.go new file mode 100644 index 000000000..013fdf582 --- /dev/null +++ b/cl/clpersist/block_saver.go @@ -0,0 +1,52 @@ +package clpersist + +import ( + "fmt" + "os" + "path" + + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/communication/ssz_snappy" + "github.com/spf13/afero" +) + +func SaveBlockWithConfig( + fs afero.Fs, + block *cltypes.SignedBeaconBlock, + config *clparams.BeaconChainConfig, +) error { + // we define the file structure to store the block. + // + // superEpoch = floor(slot / (epochSize ^ 2)) + // epoch = floot(slot / epochSize) + // file is to be stored at + // "/signedBeaconBlock/{superEpoch}/{epoch}/{slot}.ssz_snappy" + + superEpoch := block.Block.Slot / (config.SlotsPerEpoch * config.SlotsPerEpoch) + epoch := block.Block.Slot / config.SlotsPerEpoch + + folderPath := path.Clean(fmt.Sprintf("%d/%d", superEpoch, epoch)) + // ignore this error... reason: windows + fs.MkdirAll(folderPath, 0o755) + path := path.Clean(fmt.Sprintf("%s/%d.sz", folderPath, block.Block.Slot)) + + fp, err := fs.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0o755) + if err != nil { + return err + } + defer fp.Close() + err = fp.Truncate(0) + if err != nil { + return err + } + err = ssz_snappy.EncodeAndWrite(fp, block) + if err != nil { + return err + } + err = fp.Sync() + if err != nil { + return err + } + return nil +} diff --git a/cl/clpersist/block_store.go b/cl/clpersist/block_store.go new file mode 100644 index 000000000..41e194cc5 --- /dev/null +++ b/cl/clpersist/block_store.go @@ -0,0 +1,125 @@ +package clpersist + +import ( + "context" + "sync" + + "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/phase1/network" + "github.com/ledgerwatch/erigon/cl/rpc" + "github.com/ledgerwatch/erigon/cl/sentinel/peers" + "github.com/tidwall/btree" +) + +type BlockSource interface { + GetRange(ctx context.Context, from uint64, count uint64) ([]*peers.PeeredObject[*cltypes.SignedBeaconBlock], error) + PurgeRange(ctx context.Context, from uint64, count uint64) error +} + +var _ BlockSource = (*BeaconRpcSource)(nil) + +type BeaconRpcSource struct { + rpc *rpc.BeaconRpcP2P +} + +func (b *BeaconRpcSource) SaveBlocks(ctx context.Context, blocks []*peers.PeeredObject[*cltypes.SignedBeaconBlock]) error { + // it is a no-op because there is no need to do this + return nil +} + +func NewBeaconRpcSource(rpc *rpc.BeaconRpcP2P) *BeaconRpcSource { + return &BeaconRpcSource{ + rpc: rpc, + } +} + +func (b *BeaconRpcSource) GetRange(ctx context.Context, from uint64, count uint64) ([]*peers.PeeredObject[*cltypes.SignedBeaconBlock], error) { + if count == 0 { + return nil, nil + } + responses, pid, err := b.rpc.SendBeaconBlocksByRangeReq(ctx, from, count) + if err != nil { + b.rpc.BanPeer(pid) + return nil, err + } + out := make([]*peers.PeeredObject[*cltypes.SignedBeaconBlock], 0, len(responses)) + for _, v := range responses { + out = append(out, &peers.PeeredObject[*cltypes.SignedBeaconBlock]{Data: v, Peer: pid}) + } + return out, nil +} + +// a noop for rpc source since we always return new data +func (b *BeaconRpcSource) PurgeRange(ctx context.Context, from uint64, count uint64) error { + return nil +} + +var _ BlockSource = (*GossipSource)(nil) + +type GossipSource struct { + gossip *network.GossipManager + gossipBlocks <-chan *peers.PeeredObject[*cltypes.SignedBeaconBlock] + + mu sync.Mutex + blocks *btree.Map[uint64, chan *peers.PeeredObject[*cltypes.SignedBeaconBlock]] +} + +func NewGossipSource(ctx context.Context, gossip *network.GossipManager) *GossipSource { + g := &GossipSource{ + gossip: gossip, + gossipBlocks: gossip.SubscribeSignedBeaconBlocks(ctx), + blocks: btree.NewMap[uint64, chan *peers.PeeredObject[*cltypes.SignedBeaconBlock]](32), + } + go func() { + for { + select { + case <-ctx.Done(): + return + case recv := <-g.gossipBlocks: + ch := g.grabOrCreate(ctx, recv.Data.Block.Slot) + select { + case ch <- recv: + default: + } + } + } + }() + return g +} + +func (b *GossipSource) grabOrCreate(ctx context.Context, id uint64) chan *peers.PeeredObject[*cltypes.SignedBeaconBlock] { + b.mu.Lock() + defer b.mu.Unlock() + ch, ok := b.blocks.Get(id) + if !ok { + ch = make(chan *peers.PeeredObject[*cltypes.SignedBeaconBlock], 3) + b.blocks.Set(id, ch) + } + return ch +} +func (b *GossipSource) GetRange(ctx context.Context, from uint64, count uint64) ([]*peers.PeeredObject[*cltypes.SignedBeaconBlock], error) { + out := make([]*peers.PeeredObject[*cltypes.SignedBeaconBlock], 0, count) + for i := from; i < from+count; i++ { + ch := b.grabOrCreate(ctx, i) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case item := <-ch: + out = append(out, item) + } + } + return out, nil +} + +func (b *GossipSource) PurgeRange(ctx context.Context, from uint64, count uint64) error { + b.mu.Lock() + defer b.mu.Unlock() + b.blocks.AscendMut(from, func(key uint64, value chan *peers.PeeredObject[*cltypes.SignedBeaconBlock]) bool { + if key >= from+count { + return false + } + b.blocks.Delete(key) + return true + }) + return nil +} diff --git a/cl/clstages/clstages.go b/cl/clstages/clstages.go new file mode 100644 index 000000000..29aead950 --- /dev/null +++ b/cl/clstages/clstages.go @@ -0,0 +1,59 @@ +package clstages + +import ( + "context" + "fmt" + "time" + + "github.com/ledgerwatch/log/v3" +) + +type StageGraph[CONFIG any, ARGUMENTS any] struct { + ArgsFunc func(ctx context.Context, cfg CONFIG) (args ARGUMENTS) + Stages map[string]Stage[CONFIG, ARGUMENTS] +} + +type Stage[CONFIG any, ARGUMENTS any] struct { + Description string + ActionFunc func(ctx context.Context, logger log.Logger, cfg CONFIG, args ARGUMENTS) error + TransitionFunc func(cfg CONFIG, args ARGUMENTS, err error) string +} + +func (s *StageGraph[CONFIG, ARGUMENTS]) StartWithStage(ctx context.Context, startStage string, logger log.Logger, cfg CONFIG) error { + stageName := startStage + args := s.ArgsFunc(ctx, cfg) + for { + currentStage, ok := s.Stages[stageName] + if !ok { + return fmt.Errorf("attempted to transition to unknown stage: %s", stageName) + } + lg := logger.New("stage", stageName) + errch := make(chan error) + start := time.Now() + go func() { + sctx, cn := context.WithCancel(ctx) + defer cn() + // we run this is a goroutine so that the process can exit in the middle of a stage + // since caplin is designed to always be able to recover regardless of db state, this should be safe + select { + case errch <- currentStage.ActionFunc(sctx, lg, cfg, args): + case <-ctx.Done(): + errch <- ctx.Err() + } + }() + err := <-errch + dur := time.Since(start) + if err != nil { + lg.Error("error executing clstage", "err", err) + } + select { + case <-ctx.Done(): + return ctx.Err() + default: + args = s.ArgsFunc(ctx, cfg) + nextStage := currentStage.TransitionFunc(cfg, args, err) + logger.Info("clstage finish", "stage", stageName, "in", dur, "next", nextStage) + stageName = nextStage + } + } +} diff --git a/cl/phase1/forkchoice/forkchoice.go b/cl/phase1/forkchoice/forkchoice.go index b85053db1..ae4cf0806 100644 --- a/cl/phase1/forkchoice/forkchoice.go +++ b/cl/phase1/forkchoice/forkchoice.go @@ -90,6 +90,14 @@ func (f *ForkChoiceStore) HighestSeen() uint64 { return f.highestSeen } +// AdvanceHighestSeen advances the highest seen block by n and returns the new slot after the change +func (f *ForkChoiceStore) AdvanceHighestSeen(n uint64) uint64 { + f.mu.Lock() + defer f.mu.Unlock() + f.highestSeen += n + return f.highestSeen +} + // Time returns current time func (f *ForkChoiceStore) Time() uint64 { f.mu.Lock() diff --git a/cl/phase1/network/gossip_manager.go b/cl/phase1/network/gossip_manager.go index 5d6b9c7f1..afbaa05e4 100644 --- a/cl/phase1/network/gossip_manager.go +++ b/cl/phase1/network/gossip_manager.go @@ -3,11 +3,11 @@ package network import ( "context" "runtime" + "sync" - "github.com/VictoriaMetrics/metrics" - "github.com/ledgerwatch/erigon/cl/cltypes/solid" "github.com/ledgerwatch/erigon/cl/freezer" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" + "github.com/ledgerwatch/erigon/cl/sentinel/peers" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/dbg" @@ -30,6 +30,10 @@ type GossipManager struct { // configs beaconConfig *clparams.BeaconChainConfig genesisConfig *clparams.GenesisConfig + + mu sync.RWMutex + subs map[int]chan *peers.PeeredObject[*cltypes.SignedBeaconBlock] + totalSubs int } func NewGossipReceiver(ctx context.Context, s sentinel.SentinelClient, forkChoice *forkchoice.ForkChoiceStore, @@ -41,9 +45,29 @@ func NewGossipReceiver(ctx context.Context, s sentinel.SentinelClient, forkChoic beaconConfig: beaconConfig, genesisConfig: genesisConfig, recorder: recorder, + subs: make(map[int]chan *peers.PeeredObject[*cltypes.SignedBeaconBlock]), } } +// this subscribes to signed beacon blocks..... i wish this was better +func (g *GossipManager) SubscribeSignedBeaconBlocks(ctx context.Context) <-chan *peers.PeeredObject[*cltypes.SignedBeaconBlock] { + // a really big limit because why not.... + out := make(chan *peers.PeeredObject[*cltypes.SignedBeaconBlock], 512) + g.mu.Lock() + g.totalSubs++ + idx := g.totalSubs + g.subs[idx] = out + g.mu.Unlock() + go func() { + <-ctx.Done() + g.mu.Lock() + delete(g.subs, idx) + g.mu.Unlock() + }() + return out + +} + func (g *GossipManager) onRecv(data *sentinel.GossipData, l log.Ctx) error { currentEpoch := utils.GetCurrentEpoch(g.genesisConfig.GenesisTime, g.beaconConfig.SecondsPerSlot, g.beaconConfig.SlotsPerEpoch) @@ -95,57 +119,15 @@ func (g *GossipManager) onRecv(data *sentinel.GossipData, l log.Ctx) error { return err } - peers := metrics.GetOrCreateGauge("caplin_peer_count", func() float64 { - return float64(count.Amount) - }) + g.mu.RLock() + for _, v := range g.subs { + select { + case v <- &peers.PeeredObject[*cltypes.SignedBeaconBlock]{Data: block, Peer: data.Peer.Pid}: + default: + } + } + g.mu.RUnlock() - peers.Get() - - if err := g.forkChoice.OnBlock(block, true, true); err != nil { - // if we are within a quarter of an epoch within chain tip we ban it - if currentSlotByTime < g.forkChoice.HighestSeen()+(g.beaconConfig.SlotsPerEpoch/4) { - g.sentinel.BanPeer(g.ctx, data.Peer) - } - l["at"] = "block process" - return err - } - block.Block.Body.Attestations.Range(func(idx int, a *solid.Attestation, total int) bool { - if err = g.forkChoice.OnAttestation(a, true); err != nil { - return false - } - return true - }) - if err != nil { - l["at"] = "attestation process" - return err - } - // Now check the head - headRoot, headSlot, err := g.forkChoice.GetHead() - if err != nil { - l["slot"] = block.Block.Slot - l["at"] = "fetch head data" - return err - } - // Do forkchoice if possible - if g.forkChoice.Engine() != nil { - finalizedCheckpoint := g.forkChoice.FinalizedCheckpoint() - log.Info("Caplin is sending forkchoice") - // Run forkchoice - if err := g.forkChoice.Engine().ForkChoiceUpdate( - g.forkChoice.GetEth1Hash(finalizedCheckpoint.BlockRoot()), - g.forkChoice.GetEth1Hash(headRoot), - ); err != nil { - log.Warn("Could not set forkchoice", "err", err) - l["at"] = "sending forkchoice" - return err - } - } - // Log final result - log.Debug("New gossip block imported", - "slot", block.Block.Slot, - "head", headSlot, - "headRoot", headRoot, - ) case sentinel.GossipType_VoluntaryExitGossipType: object = &cltypes.SignedVoluntaryExit{} if err := object.DecodeSSZ(data.Data, int(version)); err != nil { diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go new file mode 100644 index 000000000..ec6ea4f88 --- /dev/null +++ b/cl/phase1/stages/clstages.go @@ -0,0 +1,453 @@ +package stages + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "time" + + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/clpersist" + "github.com/ledgerwatch/erigon/cl/clstages" + "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/phase1/core/state" + "github.com/ledgerwatch/erigon/cl/phase1/execution_client" + "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" + network2 "github.com/ledgerwatch/erigon/cl/phase1/network" + "github.com/ledgerwatch/erigon/cl/rpc" + "github.com/ledgerwatch/erigon/cl/sentinel/peers" + "github.com/ledgerwatch/erigon/cl/utils" + "github.com/ledgerwatch/log/v3" + "github.com/spf13/afero" + "golang.org/x/sync/errgroup" +) + +type Cfg struct { + rpc *rpc.BeaconRpcP2P + genesisCfg *clparams.GenesisConfig + beaconCfg *clparams.BeaconChainConfig + executionClient *execution_client.ExecutionClient + state *state.CachingBeaconState + gossipManager *network2.GossipManager + forkChoice *forkchoice.ForkChoiceStore + dataDirFs afero.Fs +} + +type Args struct { + peers uint64 + + targetEpoch, seenEpoch uint64 + targetSlot, seenSlot uint64 +} + +func ClStagesCfg( + rpc *rpc.BeaconRpcP2P, + genesisCfg *clparams.GenesisConfig, + beaconCfg *clparams.BeaconChainConfig, + state *state.CachingBeaconState, + executionClient *execution_client.ExecutionClient, + gossipManager *network2.GossipManager, + forkChoice *forkchoice.ForkChoiceStore, + dataDirFs afero.Fs, +) *Cfg { + return &Cfg{ + rpc: rpc, + genesisCfg: genesisCfg, + beaconCfg: beaconCfg, + state: state, + executionClient: executionClient, + gossipManager: gossipManager, + forkChoice: forkChoice, + dataDirFs: dataDirFs, + } +} + +type StageName = string + +const ( + WaitForPeers StageName = "WaitForPeers" + CatchUpEpochs StageName = "CatchUpEpochs" + CatchUpBlocks StageName = "CatchUpBlocks" + ForkChoice StageName = "ForkChoice" + ListenForForks StageName = "ListenForForks" + CleanupAndPruning StageName = "CleanupAndPruning" + SleepForSlot StageName = "SleepForSlot" +) + +const ( + minPeersForDownload = uint64(4) +) + +func MetaCatchingUp(args Args) string { + if args.peers < minPeersForDownload { + return WaitForPeers + } + if args.seenEpoch < args.targetEpoch { + return CatchUpEpochs + } + if args.seenSlot < args.targetSlot { + return CatchUpBlocks + } + return "" +} + +/* + +this graph describes the state transitions for cl + +digraph { + compound=true; + subgraph cluster_0 { + label="syncing"; + WaitForPeers; + CatchUpBlocks; + CatchUpEpochs; + } + + subgraph cluster_3 { + label="if behind (transition function)" + MetaCatchingUp; + } + + subgraph cluster_1 { + label="head"; + ForkChoice; CleanupAndPruning; ListenForForks; SleepForSlot; + } + + MetaCatchingUp -> WaitForPeers + MetaCatchingUp -> CatchUpEpochs + MetaCatchingUp -> CatchUpBlocks + + WaitForPeers -> MetaCatchingUp[lhead=cluster_3] + CatchUpEpochs -> MetaCatchingUp[lhead=cluster_3] + CatchUpBlocks -> MetaCatchingUp[lhead=cluster_3] + CleanupAndPruning -> MetaCatchingUp[lhead=cluster_3] + ListenForForks -> MetaCatchingUp[lhead=cluster_3] + ForkChoice -> MetaCatchingUp[lhead=cluster_3] + + CatchUpBlocks -> ForkChoice + ForkChoice -> ListenForForks + + SleepForSlot -> WaitForPeers + + ListenForForks -> ForkChoice + ListenForForks -> SleepForSlot + ListenForForks -> CleanupAndPruning + CleanupAndPruning -> SleepForSlot +} +*/ + +// ConsensusClStages creates a stage loop container to be used to run caplin +func ConsensusClStages(ctx context.Context, + cfg *Cfg, +) *clstages.StageGraph[*Cfg, Args] { + rpcSource := clpersist.NewBeaconRpcSource(cfg.rpc) + gossipSource := clpersist.NewGossipSource(ctx, cfg.gossipManager) + processBlock := func(block *peers.PeeredObject[*cltypes.SignedBeaconBlock], newPayload, fullValidation bool) error { + if err := cfg.forkChoice.OnBlock(block.Data, newPayload, fullValidation); err != nil { + log.Warn("fail to process block", "reason", err, "slot", block.Data.Block.Slot) + cfg.rpc.BanPeer(block.Peer) + return err + } + // NOTE: this error is ignored and logged only! + err := clpersist.SaveBlockWithConfig(afero.NewBasePathFs(cfg.dataDirFs, "caplin/beacon"), block.Data, cfg.beaconCfg) + if err != nil { + log.Error("failed to persist block to store", "slot", block.Data.Block.Slot, "err", err) + } + return nil + } + + // TODO: this is an ugly hack, but it works! Basically, we want shared state in the clstages. + // Probably the correct long term solution is to create a third generic parameter that defines shared state + // but for now, all it would have are the two gossip sources and the forkChoicesSinceReorg, so i don't think its worth it (yet). + shouldForkChoiceSinceReorg := false + + // clstages run in a single thread - so we don't need to worry about any synchronization. + return &clstages.StageGraph[*Cfg, Args]{ + // the ArgsFunc is run after every stage. It is passed into the transition function, and the same args are passed into the next stage. + ArgsFunc: func(ctx context.Context, cfg *Cfg) (args Args) { + var err error + args.peers, err = cfg.rpc.Peers() + if err != nil { + log.Error("failed to get sentinel peer count", "err", err) + args.peers = 0 + } + args.seenSlot = cfg.forkChoice.HighestSeen() + args.seenEpoch = args.seenSlot / cfg.beaconCfg.SlotsPerEpoch + args.targetSlot = utils.GetCurrentSlot(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot) + // Note that the target epoch is always one behind. this is because we are always behind in the current epoch, so it would not be very useful + args.targetEpoch = utils.GetCurrentEpoch(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot, cfg.beaconCfg.SlotsPerEpoch) - 1 + return + }, + Stages: map[string]clstages.Stage[*Cfg, Args]{ + WaitForPeers: { + Description: `wait for enough peers. This is also a safe stage to go to when unsure of what stage to use`, + TransitionFunc: func(cfg *Cfg, args Args, err error) string { + if x := MetaCatchingUp(args); x != "" { + return x + } + return CatchUpBlocks + }, + ActionFunc: func(ctx context.Context, logger log.Logger, cfg *Cfg, args Args) error { + peersCount, err := cfg.rpc.Peers() + if err != nil { + return nil + } + waitWhenNotEnoughPeers := 3 * time.Second + for { + if peersCount > minPeersForDownload { + break + } + logger.Debug("[Caplin] Waiting For Peers", "have", peersCount, "needed", minPeersForDownload, "retryIn", waitWhenNotEnoughPeers) + time.Sleep(waitWhenNotEnoughPeers) + peersCount, err = cfg.rpc.Peers() + if err != nil { + peersCount = 0 + } + } + return nil + }, + }, + CatchUpEpochs: { + Description: `if we are 1 or more epochs behind, we download in parallel by epoch`, + TransitionFunc: func(cfg *Cfg, args Args, err error) string { + if x := MetaCatchingUp(args); x != "" { + return x + } + return CatchUpBlocks + }, + ActionFunc: func(ctx context.Context, logger log.Logger, cfg *Cfg, args Args) error { + totalEpochs := args.targetEpoch - args.seenEpoch + logger = logger.New( + "slot", fmt.Sprintf("%d/%d", args.seenSlot, args.targetSlot), + "epoch", fmt.Sprintf("%d/%d(%d)", args.seenEpoch, args.targetEpoch, (1+args.targetEpoch)*cfg.beaconCfg.SlotsPerEpoch-1), + ) + logger.Info("downloading epochs from reqresp") + ctx, cn := context.WithTimeout(ctx, time.Duration(cfg.beaconCfg.SecondsPerSlot*cfg.beaconCfg.SlotsPerEpoch)*time.Second) + defer cn() + counter := atomic.Int64{} + // now we download the missing blocks + chans := make([]chan []*peers.PeeredObject[*cltypes.SignedBeaconBlock], 0, totalEpochs) + ctx, cn = context.WithCancel(ctx) + egg, ctx := errgroup.WithContext(ctx) + + egg.SetLimit(3) + defer cn() + for i := args.seenEpoch; i <= args.targetEpoch; i = i + 1 { + startBlock := i * cfg.beaconCfg.SlotsPerEpoch + o := make(chan []*peers.PeeredObject[*cltypes.SignedBeaconBlock], 0) + chans = append(chans, o) + egg.Go(func() error { + blocks, err := rpcSource.GetRange(ctx, startBlock, cfg.beaconCfg.SlotsPerEpoch) + if err != nil { + return err + } + logger.Info("downloading epochs from reqresp", "progress", fmt.Sprintf("%d", int(100*(float64(counter.Add(1))/float64(totalEpochs+1))))+"%") + o <- blocks + return nil + }) + } + errchan := make(chan error, 1) + go func() { + defer func() { + errchan <- nil + }() + for _, v := range chans { + select { + case <-ctx.Done(): + return + case epochResp := <-v: + for _, block := range epochResp { + if block.Data.Block.Slot <= args.seenSlot { + continue + } + err := processBlock(block, false, false) + if err != nil { + errchan <- err + return + } + } + } + } + }() + go func() { + // if any error, lets just return the error and retry. we will make any progress we did... but we should really make sure all parts succeed when catching up + err := egg.Wait() + if err != nil { + errchan <- err + } + }() + return <-errchan + }, + }, + CatchUpBlocks: { + Description: `if we are within the epoch but not at head, we run catchupblocks`, + TransitionFunc: func(cfg *Cfg, args Args, err error) string { + if x := MetaCatchingUp(args); x != "" { + return x + } + return ForkChoice + }, + ActionFunc: func(ctx context.Context, logger log.Logger, cfg *Cfg, args Args) error { + totalRequest := args.targetSlot - args.seenSlot + logger.Info("waiting for blocks...", + "seenSlot", args.seenSlot, + "targetSlot", args.targetSlot, + "requestedSlots", totalRequest, + ) + respCh := make(chan []*peers.PeeredObject[*cltypes.SignedBeaconBlock]) + errCh := make(chan error) + sources := []clpersist.BlockSource{gossipSource, rpcSource} + // the timeout is equal to the amount of blocks to fetch multiplied by the seconds per slot + ctx, cn := context.WithTimeout(ctx, time.Duration(cfg.beaconCfg.SecondsPerSlot*totalRequest)*time.Second) + defer cn() + // we go ask all the sources and see who gets back to us first. whoever does is the winner!! + for _, v := range sources { + sourceFunc := v.GetRange + go func() { + blocks, err := sourceFunc(ctx, args.seenSlot+1, totalRequest) + if err != nil { + errCh <- err + return + } + respCh <- blocks + }() + } + select { + case err := <-errCh: + return err + case blocks := <-respCh: + for _, block := range blocks { + if err := processBlock(block, true, true); err != nil { + return err + } + logger.Info("block processed", "slot", block.Data.Block.Slot) + } + } + return nil + }, + }, + ForkChoice: { + Description: `fork choice stage. We will send all fork choise things here + also, we will wait up to delay seconds to deal with attestations + side forks`, + TransitionFunc: func(cfg *Cfg, args Args, err error) string { + if x := MetaCatchingUp(args); x != "" { + return x + } + return ListenForForks + }, + ActionFunc: func(ctx context.Context, logger log.Logger, cfg *Cfg, args Args) error { + + // TODO: we need to get the last run block in order to process attestations here + ////////block.Block.Body.Attestations.Range(func(idx int, a *solid.Attestation, total int) bool { + //////// if err = g.forkChoice.OnAttestation(a, true); err != nil { + //////// return false + //////// } + //////// return true + ////////}) + ////////if err != nil { + //////// return err + ////////} + + // Now check the head + headRoot, _, err := cfg.forkChoice.GetHead() + if err != nil { + return err + } + + // Do forkchoice if possible + if cfg.forkChoice.Engine() != nil { + finalizedCheckpoint := cfg.forkChoice.FinalizedCheckpoint() + logger.Info("Caplin is sending forkchoice") + // Run forkchoice + if err := cfg.forkChoice.Engine().ForkChoiceUpdate( + cfg.forkChoice.GetEth1Hash(finalizedCheckpoint.BlockRoot()), + cfg.forkChoice.GetEth1Hash(headRoot), + ); err != nil { + logger.Warn("Could not set forkchoice", "err", err) + return err + } + } + return nil + }, + }, + ListenForForks: { + TransitionFunc: func(cfg *Cfg, args Args, err error) string { + defer func() { + shouldForkChoiceSinceReorg = false + }() + if x := MetaCatchingUp(args); x != "" { + return x + } + if shouldForkChoiceSinceReorg { + return ForkChoice + } + if args.seenSlot%cfg.beaconCfg.SlotsPerEpoch == 0 { + return CleanupAndPruning + } + return SleepForSlot + }, + ActionFunc: func(ctx context.Context, logger log.Logger, cfg *Cfg, args Args) error { + slotTime := utils.GetSlotTime(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot, args.targetSlot).Add( + time.Duration(cfg.beaconCfg.SecondsPerSlot) * (time.Second / 3), + ) + waitDur := slotTime.Sub(time.Now()) + ctx, cn := context.WithTimeout(ctx, waitDur) + defer cn() + // try to get the current block + blocks, err := gossipSource.GetRange(ctx, args.seenSlot, 1) + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil + } + return err + } + for _, block := range blocks { + err := processBlock(block, false, true) + if err != nil { + // its okay if block processing fails + logger.Warn("reorg block failed validation", "err", err) + return nil + } + shouldForkChoiceSinceReorg = true + logger.Warn("possible reorg/missed slot", "slot", args.seenSlot) + } + return nil + }, + }, + CleanupAndPruning: { + Description: `cleanup and pruning is done here`, + TransitionFunc: func(cfg *Cfg, args Args, err error) string { + if x := MetaCatchingUp(args); x != "" { + return x + } + return SleepForSlot + }, + ActionFunc: func(ctx context.Context, logger log.Logger, cfg *Cfg, args Args) error { + // clean up some old ranges + err := gossipSource.PurgeRange(ctx, 1, args.seenSlot-cfg.beaconCfg.SlotsPerEpoch*16) + if err != nil { + return err + } + //TODO: probably can clear old superepoch in fs here as well! + return nil + }, + }, + SleepForSlot: { + Description: `sleep until the next slot`, + TransitionFunc: func(cfg *Cfg, args Args, err error) string { + return WaitForPeers + }, + ActionFunc: func(ctx context.Context, logger log.Logger, cfg *Cfg, args Args) error { + nextSlot := args.seenSlot + 1 + nextSlotTime := utils.GetSlotTime(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot, nextSlot) + nextSlotDur := nextSlotTime.Sub(time.Now()) + logger.Info("sleeping until next slot", "slot", nextSlot, "time", nextSlotTime, "dur", nextSlotDur) + time.Sleep(nextSlotDur) + return nil + }, + }, + }, + } +} diff --git a/cl/phase1/stages/stage_fork_choice.go b/cl/phase1/stages/stage_fork_choice.go deleted file mode 100644 index 8f9d2bfd3..000000000 --- a/cl/phase1/stages/stage_fork_choice.go +++ /dev/null @@ -1,231 +0,0 @@ -package stages - -import ( - "context" - "runtime" - "time" - - "github.com/ledgerwatch/erigon/cl/freezer" - "github.com/ledgerwatch/erigon/cl/phase1/core/state" - "github.com/ledgerwatch/erigon/cl/phase1/execution_client" - "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" - network2 "github.com/ledgerwatch/erigon/cl/phase1/network" - - libcommon "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon-lib/common/dbg" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/log/v3" - - "github.com/ledgerwatch/erigon/cl/clparams" - "github.com/ledgerwatch/erigon/cl/cltypes" - "github.com/ledgerwatch/erigon/cl/utils" - "github.com/ledgerwatch/erigon/eth/stagedsync" -) - -type StageForkChoiceCfg struct { - db kv.RwDB - downloader *network2.ForwardBeaconDownloader - genesisCfg *clparams.GenesisConfig - beaconCfg *clparams.BeaconChainConfig - executionClient *execution_client.ExecutionClient - state *state.CachingBeaconState - gossipManager *network2.GossipManager - forkChoice *forkchoice.ForkChoiceStore - caplinFreezer freezer.Freezer -} - -const minPeersForDownload = 2 -const minPeersForSyncStart = 4 - -var ( - freezerNameSpacePrefix = "" - blockObjectName = "singedBeaconBlock" - stateObjectName = "beaconState" - gossipAction = "gossip" -) - -func StageForkChoice(db kv.RwDB, downloader *network2.ForwardBeaconDownloader, genesisCfg *clparams.GenesisConfig, - beaconCfg *clparams.BeaconChainConfig, state *state.CachingBeaconState, executionClient *execution_client.ExecutionClient, gossipManager *network2.GossipManager, - forkChoice *forkchoice.ForkChoiceStore, caplinFreezer freezer.Freezer) StageForkChoiceCfg { - return StageForkChoiceCfg{ - db: db, - downloader: downloader, - genesisCfg: genesisCfg, - beaconCfg: beaconCfg, - state: state, - executionClient: executionClient, - gossipManager: gossipManager, - forkChoice: forkChoice, - caplinFreezer: caplinFreezer, - } -} - -// StageForkChoice enables the fork choice state. it is never supposed to exit this stage once it gets in. -func SpawnStageForkChoice(cfg StageForkChoiceCfg, s *stagedsync.StageState, tx kv.RwTx, ctx context.Context) error { - /*useExternalTx := tx != nil - var err error - if !useExternalTx { - tx, err = cfg.db.BeginRw(ctx) - if err != nil { - return err - } - defer tx.Rollback() - }*/ - // Start download service - log.Info("Started Ethereum 2.0 Gossip Service") - // We start gossip management. - go cfg.gossipManager.Start() - go onTickService(ctx, cfg) - go func() { - logIntervalPeers := time.NewTicker(1 * time.Minute) - for { - select { - case <-logIntervalPeers.C: - if peerCount, err := cfg.downloader.Peers(); err == nil { - log.Info("[Caplin] P2P", "peers", peerCount) - - } - case <-ctx.Done(): - return - } - - } - }() - startDownloadService(s, cfg) - /*if !useExternalTx { - if err = tx.Commit(); err != nil { - return err - } - }*/ - return nil -} - -func startDownloadService(s *stagedsync.StageState, cfg StageForkChoiceCfg) { - cfg.downloader.SetHighestProcessedRoot(libcommon.Hash{}) - cfg.downloader.SetHighestProcessedSlot(cfg.state.Slot()) - cfg.downloader.SetProcessFunction(func(highestSlotProcessed uint64, _ libcommon.Hash, newBlocks []*cltypes.SignedBeaconBlock) (uint64, libcommon.Hash, error) { - for _, block := range newBlocks { - if err := freezer.PutObjectSSZIntoFreezer("signedBeaconBlock", "caplin_core", block.Block.Slot, block, cfg.caplinFreezer); err != nil { - return highestSlotProcessed, libcommon.Hash{}, err - } - - sendForckchoice := - utils.GetCurrentSlot(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot) == block.Block.Slot - if err := cfg.forkChoice.OnBlock(block, sendForckchoice, true); err != nil { - log.Warn("Could not download block", "reason", err, "slot", block.Block.Slot) - return highestSlotProcessed, libcommon.Hash{}, err - } - highestSlotProcessed = utils.Max64(block.Block.Slot, highestSlotProcessed) - if sendForckchoice { - var m runtime.MemStats - dbg.ReadMemStats(&m) - // Import the head - headRoot, headSlot, err := cfg.forkChoice.GetHead() - - log.Debug("New block imported", - "slot", block.Block.Slot, - "head", headSlot, - "headRoot", headRoot, - "alloc/sys", libcommon.ByteCount(m.Alloc)+"/"+libcommon.ByteCount(m.Sys), - "numGC", m.NumGC, - ) - if err != nil { - log.Debug("Could not fetch head data", - "slot", block.Block.Slot, - "err", err) - continue - } - - // Do forkchoice if possible - if cfg.forkChoice.Engine() != nil { - finalizedCheckpoint := cfg.forkChoice.FinalizedCheckpoint() - log.Info("Caplin is sending forkchoice") - // Run forkchoice - if err := cfg.forkChoice.Engine().ForkChoiceUpdate( - cfg.forkChoice.GetEth1Hash(finalizedCheckpoint.BlockRoot()), - cfg.forkChoice.GetEth1Hash(headRoot), - ); err != nil { - log.Warn("Could not set forkchoice", "err", err) - } - } - } - } - // Checks done, update all internals accordingly - return highestSlotProcessed, libcommon.Hash{}, nil - }) - maxBlockBehindBeforeDownload := int64(32) - overtimeMargin := uint64(6) // how much time has passed before trying download the next block in seconds - ctx := context.TODO() - isDownloading := false -MainLoop: - for { - targetSlot := utils.GetCurrentSlot(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot) - overtime := utils.GetCurrentSlotOverTime(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot) - seenSlot := cfg.forkChoice.HighestSeen() - if targetSlot == seenSlot || (targetSlot == seenSlot+1 && overtime < overtimeMargin) { - time.Sleep(time.Second) - continue - } - peersCount, err := cfg.downloader.Peers() - if err != nil { - continue - } - waitWhenNotEnoughPeers := 5 * time.Second - if !isDownloading { - isDownloading = peersCount >= minPeersForSyncStart - } - if isDownloading { - isDownloading = peersCount >= minPeersForDownload - if !isDownloading { - log.Debug("[Caplin] Lost too many peers", "have", peersCount, "needed", minPeersForDownload) - } - } - if !isDownloading { - log.Debug("[Caplin] Waiting For Peers", "have", peersCount, "needed", minPeersForSyncStart, "retryIn", waitWhenNotEnoughPeers) - time.Sleep(waitWhenNotEnoughPeers) - continue - } - highestSeen := cfg.forkChoice.HighestSeen() - startDownloadSlot := highestSeen - uint64(maxBlockBehindBeforeDownload) - // Detect underflow - if startDownloadSlot > highestSeen { - startDownloadSlot = 0 - } - - cfg.downloader.SetHighestProcessedRoot(libcommon.Hash{}) - cfg.downloader.SetHighestProcessedSlot( - utils.Max64(startDownloadSlot, cfg.forkChoice.FinalizedSlot())) - - // Wait small time - log.Debug("Caplin may have missed some slots, started downloading chain") - // Process blocks until we reach our target - for highestProcessed := cfg.downloader.GetHighestProcessedSlot(); utils.GetCurrentSlot(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot) > highestProcessed; highestProcessed = cfg.downloader.GetHighestProcessedSlot() { - ctx, cancel := context.WithTimeout(ctx, 12*time.Second) - cfg.downloader.RequestMore(ctx) - cancel() - peersCount, err = cfg.downloader.Peers() - if err != nil { - break - } - if utils.GetCurrentSlot(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot) == cfg.forkChoice.HighestSeen() { - break - } - if peersCount < minPeersForDownload { - continue MainLoop - } - } - log.Debug("Finished catching up", "slot", cfg.downloader.GetHighestProcessedSlot()) - } -} - -func onTickService(ctx context.Context, cfg StageForkChoiceCfg) { - tickInterval := time.NewTicker(50 * time.Millisecond) - for { - select { - case <-tickInterval.C: - cfg.forkChoice.OnTick(uint64(time.Now().Unix())) - case <-ctx.Done(): - return - } - } -} diff --git a/cl/phase1/stages/stage_history_reconstruction.go b/cl/phase1/stages/stage_history_reconstruction.go deleted file mode 100644 index 979722771..000000000 --- a/cl/phase1/stages/stage_history_reconstruction.go +++ /dev/null @@ -1,220 +0,0 @@ -package stages - -import ( - "context" - "fmt" - "time" - - rawdb2 "github.com/ledgerwatch/erigon/cl/phase1/core/rawdb" - "github.com/ledgerwatch/erigon/cl/phase1/core/state" - execution_client2 "github.com/ledgerwatch/erigon/cl/phase1/execution_client" - "github.com/ledgerwatch/erigon/cl/phase1/network" - "github.com/ledgerwatch/erigon/cl/utils" - - "github.com/ledgerwatch/erigon-lib/etl" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/cl/clparams" - "github.com/ledgerwatch/erigon/cl/cltypes" - "github.com/ledgerwatch/erigon/eth/stagedsync" - "github.com/ledgerwatch/log/v3" -) - -type StageHistoryReconstructionCfg struct { - db kv.RwDB - genesisCfg *clparams.GenesisConfig - beaconCfg *clparams.BeaconChainConfig - downloader *network.BackwardBeaconDownloader - state *state.CachingBeaconState - executionClient *execution_client2.ExecutionClient - beaconDBCfg *rawdb2.BeaconDataConfig - tmpdir string -} - -const logIntervalTime = 30 * time.Second - -func StageHistoryReconstruction(db kv.RwDB, downloader *network.BackwardBeaconDownloader, genesisCfg *clparams.GenesisConfig, beaconCfg *clparams.BeaconChainConfig, beaconDBCfg *rawdb2.BeaconDataConfig, state *state.CachingBeaconState, tmpdir string, executionClient *execution_client2.ExecutionClient) StageHistoryReconstructionCfg { - return StageHistoryReconstructionCfg{ - db: db, - genesisCfg: genesisCfg, - beaconCfg: beaconCfg, - downloader: downloader, - state: state, - tmpdir: tmpdir, - executionClient: executionClient, - beaconDBCfg: beaconDBCfg, - } -} - -// SpawnStageBeaconsForward spawn the beacon forward stage -func SpawnStageHistoryReconstruction(cfg StageHistoryReconstructionCfg, s *stagedsync.StageState, tx kv.RwTx, ctx context.Context, logger log.Logger) error { - // This stage must be done only once. - progress := s.BlockNumber - if progress != 0 { - return nil - } - - useExternalTx := tx != nil - var err error - if !useExternalTx { - tx, err = cfg.db.BeginRw(ctx) - if err != nil { - return err - } - defer tx.Rollback() - } - blockRoot, err := cfg.state.BlockRoot() - if err != nil { - return err - } - destinationSlot := uint64(0) - currentSlot := cfg.state.LatestBlockHeader().Slot - if currentSlot > cfg.beaconDBCfg.BackFillingAmount { - destinationSlot = currentSlot - cfg.beaconDBCfg.BackFillingAmount - } - - // ETL collectors for attestations + beacon blocks - beaconBlocksCollector := etl.NewCollector(s.LogPrefix(), cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), logger) - defer beaconBlocksCollector.Close() - attestationsCollector := etl.NewCollector(s.LogPrefix(), cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), logger) - defer attestationsCollector.Close() - executionPayloadsCollector := etl.NewCollector(s.LogPrefix(), cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), logger) - defer executionPayloadsCollector.Close() - // Indexes collector - rootToSlotCollector := etl.NewCollector(s.LogPrefix(), cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), logger) - defer rootToSlotCollector.Close() - // Lastly finalizations markers collector. - finalizationCollector := etl.NewCollector(s.LogPrefix(), cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), logger) - defer finalizationCollector.Close() - // Start the procedure - logger.Info(fmt.Sprintf("[%s] Reconstructing", s.LogPrefix()), "from", cfg.state.LatestBlockHeader().Slot, "to", destinationSlot) - // Setup slot and block root - cfg.downloader.SetSlotToDownload(currentSlot) - cfg.downloader.SetExpectedRoot(blockRoot) - foundLatestEth1ValidHash := false - if cfg.executionClient == nil { - foundLatestEth1ValidHash = true - } - // Set up onNewBlock callback - cfg.downloader.SetOnNewBlock(func(blk *cltypes.SignedBeaconBlock) (finished bool, err error) { - slot := blk.Block.Slot - blockRoot, err := blk.Block.HashSSZ() - if err != nil { - return false, err - } - key := append(rawdb2.EncodeNumber(slot), blockRoot[:]...) - // Collect beacon blocks - encodedBeaconBlock, err := blk.EncodeSSZ(nil) - if err != nil { - return false, err - } - slotBytes := rawdb2.EncodeNumber(slot) - if err := beaconBlocksCollector.Collect(key, utils.CompressSnappy(encodedBeaconBlock)); err != nil { - return false, err - } - // Collect hashes - if err := rootToSlotCollector.Collect(blockRoot[:], slotBytes); err != nil { - return false, err - } - if err := rootToSlotCollector.Collect(blk.Block.StateRoot[:], slotBytes); err != nil { - return false, err - } - // Mark finalization markers. - if err := finalizationCollector.Collect(slotBytes, blockRoot[:]); err != nil { - return false, err - } - // Collect Execution Payloads - if blk.Version() >= clparams.BellatrixVersion && !foundLatestEth1ValidHash { - payload := blk.Block.Body.ExecutionPayload - if foundLatestEth1ValidHash, err = cfg.executionClient.IsCanonical(payload.BlockHash); err != nil { - return false, err - } - if foundLatestEth1ValidHash { - return slot <= destinationSlot, nil - } - encodedPayload := make([]byte, 0, payload.EncodingSizeSSZ()) - encodedPayload, err = payload.EncodeSSZ(encodedPayload) - if err != nil { - return false, err - } - if err := executionPayloadsCollector.Collect(rawdb2.EncodeNumber(slot), encodedPayload); err != nil { - return false, err - } - } - return slot <= destinationSlot && foundLatestEth1ValidHash, nil - }) - prevProgress := cfg.downloader.Progress() - - logInterval := time.NewTicker(logIntervalTime) - finishCh := make(chan struct{}) - // Start logging thread - go func() { - for { - select { - case <-logInterval.C: - logArgs := []interface{}{} - currProgress := cfg.downloader.Progress() - speed := float64(prevProgress-currProgress) / float64(logIntervalTime/time.Second) - prevProgress = currProgress - peerCount, err := cfg.downloader.Peers() - if err != nil { - return - } - logArgs = append(logArgs, - "progress", currProgress, - "blk/sec", fmt.Sprintf("%.1f", speed), - "peers", peerCount) - if currentSlot > destinationSlot { - logArgs = append(logArgs, "remaining", currProgress-destinationSlot) - } - logger.Info(fmt.Sprintf("[%s] Backwards downloading phase", s.LogPrefix()), logArgs...) - case <-finishCh: - return - case <-ctx.Done(): - - } - } - }() - for !cfg.downloader.Finished() { - cfg.downloader.RequestMore(ctx) - } - close(finishCh) - if err := attestationsCollector.Load(tx, kv.Attestetations, etl.IdentityLoadFunc, etl.TransformArgs{Quit: context.Background().Done()}); err != nil { - return err - } - if err := beaconBlocksCollector.Load(tx, kv.BeaconBlocks, etl.IdentityLoadFunc, etl.TransformArgs{Quit: context.Background().Done()}); err != nil { - return err - } - if err := rootToSlotCollector.Load(tx, kv.RootSlotIndex, etl.IdentityLoadFunc, etl.TransformArgs{Quit: ctx.Done()}); err != nil { - return err - } - if err := finalizationCollector.Load(tx, kv.FinalizedBlockRoots, etl.IdentityLoadFunc, etl.TransformArgs{Quit: ctx.Done()}); err != nil { - return err - } - executionPayloadInsertionBatch := execution_client2.NewInsertBatch(cfg.executionClient) - // Send in ordered manner EL blocks to Execution Layer - if err := executionPayloadsCollector.Load(tx, kv.BeaconBlocks, func(k, v []byte, table etl.CurrentTableReader, next etl.LoadNextFunc) error { - payload := &cltypes.Eth1Block{} - if err := payload.DecodeSSZ(v, int(clparams.BellatrixVersion)); err != nil { - return err - } - if err := executionPayloadInsertionBatch.WriteExecutionPayload(payload); err != nil { - return err - } - return next(k, nil, nil) - }, etl.TransformArgs{Quit: context.Background().Done()}); err != nil { - return err - } - if err := executionPayloadInsertionBatch.Flush(); err != nil { - return err - } - if err := s.Update(tx, 1); err != nil { - return err - } - - if !useExternalTx { - if err = tx.Commit(); err != nil { - return err - } - } - return nil -} diff --git a/cl/phase1/stages/stages.go b/cl/phase1/stages/stages.go deleted file mode 100644 index 8829a57ac..000000000 --- a/cl/phase1/stages/stages.go +++ /dev/null @@ -1,90 +0,0 @@ -package stages - -import ( - "context" - - "github.com/ledgerwatch/erigon/cl/phase1/core/rawdb" - "github.com/ledgerwatch/erigon/cl/phase1/core/state" - "github.com/ledgerwatch/erigon/cl/phase1/execution_client" - "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" - network2 "github.com/ledgerwatch/erigon/cl/phase1/network" - - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/cl/clparams" - "github.com/ledgerwatch/erigon/eth/stagedsync" - "github.com/ledgerwatch/erigon/eth/stagedsync/stages" - "github.com/ledgerwatch/log/v3" -) - -// StateStages are all stages necessary for basic unwind and stage computation, it is primarly used to process side forks and memory execution. -func ConsensusStages(ctx context.Context, historyReconstruction StageHistoryReconstructionCfg, beaconState StageBeaconStateCfg, forkchoice StageForkChoiceCfg) []*stagedsync.Stage { - return []*stagedsync.Stage{ - { - ID: stages.BeaconHistoryReconstruction, - Description: "Download beacon blocks backwards.", - Forward: func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, u stagedsync.Unwinder, tx kv.RwTx, logger log.Logger) error { - return SpawnStageHistoryReconstruction(historyReconstruction, s, tx, ctx, logger) - }, - Unwind: func(firstCycle bool, u *stagedsync.UnwindState, s *stagedsync.StageState, tx kv.RwTx, logger log.Logger) error { - return nil - }, - }, - { - ID: stages.BeaconState, - Description: "Execute Consensus Layer transition", - Forward: func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, u stagedsync.Unwinder, tx kv.RwTx, logger log.Logger) error { - return SpawnStageBeaconState(beaconState, tx, ctx) - }, - Unwind: func(firstCycle bool, u *stagedsync.UnwindState, s *stagedsync.StageState, tx kv.RwTx, logger log.Logger) error { - return nil - }, - }, - { - ID: stages.BeaconBlocks, - Description: "Download beacon blocks forward.", - Forward: func(firstCycle bool, badBlockUnwind bool, s *stagedsync.StageState, u stagedsync.Unwinder, tx kv.RwTx, logger log.Logger) error { - return SpawnStageForkChoice(forkchoice, s, tx, ctx) - }, - Unwind: func(firstCycle bool, u *stagedsync.UnwindState, s *stagedsync.StageState, tx kv.RwTx, logger log.Logger) error { - return nil - }, - }, - } -} - -var ConsensusUnwindOrder = stagedsync.UnwindOrder{ - stages.BeaconState, - stages.BeaconBlocks, -} - -var ConsensusPruneOrder = stagedsync.PruneOrder{ - stages.BeaconState, - stages.BeaconBlocks, -} - -func NewConsensusStagedSync(ctx context.Context, - db kv.RwDB, - forwardDownloader *network2.ForwardBeaconDownloader, - backwardDownloader *network2.BackwardBeaconDownloader, - genesisCfg *clparams.GenesisConfig, - beaconCfg *clparams.BeaconChainConfig, - state *state.CachingBeaconState, - tmpdir string, - executionClient *execution_client.ExecutionClient, - beaconDBCfg *rawdb.BeaconDataConfig, - gossipManager *network2.GossipManager, - forkChoice *forkchoice.ForkChoiceStore, - logger log.Logger, -) (*stagedsync.Sync, error) { - return stagedsync.New( - ConsensusStages( - ctx, - StageHistoryReconstruction(db, backwardDownloader, genesisCfg, beaconCfg, beaconDBCfg, state, tmpdir, executionClient), - StageBeaconState(db, beaconCfg, state, executionClient), - StageForkChoice(db, forwardDownloader, genesisCfg, beaconCfg, state, executionClient, gossipManager, forkChoice, nil), - ), - ConsensusUnwindOrder, - ConsensusPruneOrder, - logger, - ), nil -} diff --git a/cl/phase1/stages/stages_beacon_state.go b/cl/phase1/stages/stages_beacon_state.go deleted file mode 100644 index 638273a5c..000000000 --- a/cl/phase1/stages/stages_beacon_state.go +++ /dev/null @@ -1,98 +0,0 @@ -package stages - -import ( - "context" - "github.com/ledgerwatch/erigon/cl/transition" - - "github.com/ledgerwatch/erigon/cl/phase1/core/rawdb" - state2 "github.com/ledgerwatch/erigon/cl/phase1/core/state" - "github.com/ledgerwatch/erigon/cl/phase1/execution_client" - - libcommon "github.com/ledgerwatch/erigon-lib/common" - "github.com/ledgerwatch/erigon-lib/kv" - "github.com/ledgerwatch/erigon/cl/clparams" - "github.com/ledgerwatch/erigon/cl/utils" - "github.com/ledgerwatch/erigon/eth/stagedsync/stages" - "github.com/ledgerwatch/log/v3" -) - -type StageBeaconStateCfg struct { - db kv.RwDB - beaconCfg *clparams.BeaconChainConfig - state *state2.CachingBeaconState - executionClient *execution_client.ExecutionClient - enabled bool -} - -func StageBeaconState(db kv.RwDB, - beaconCfg *clparams.BeaconChainConfig, state *state2.CachingBeaconState, executionClient *execution_client.ExecutionClient) StageBeaconStateCfg { - return StageBeaconStateCfg{ - db: db, - beaconCfg: beaconCfg, - state: state, - executionClient: executionClient, - enabled: false, - } -} - -// SpawnStageBeaconState is used to replay historical states -func SpawnStageBeaconState(cfg StageBeaconStateCfg, tx kv.RwTx, ctx context.Context) error { - if !cfg.enabled { - return nil - } - // This code need to be fixed. - useExternalTx := tx != nil - var err error - if !useExternalTx { - tx, err = cfg.db.BeginRw(ctx) - if err != nil { - return err - } - defer tx.Rollback() - } - - endSlot, err := stages.GetStageProgress(tx, stages.BeaconBlocks) - if err != nil { - return err - } - latestBlockHeader := cfg.state.LatestBlockHeader() - - fromSlot := latestBlockHeader.Slot - for slot := fromSlot + 1; slot <= endSlot; slot++ { - finalizedRoot, err := rawdb.ReadFinalizedBlockRoot(tx, slot) - if err != nil { - return err - } - // Slot had a missing proposal in this case. - if finalizedRoot == (libcommon.Hash{}) { - continue - } - // TODO(Giulio2002): proper versioning - block, eth1Number, eth1Hash, err := rawdb.ReadBeaconBlock(tx, finalizedRoot, slot, clparams.Phase0Version) - if err != nil { - return err - } - - // Query execution engine only if the payload have an hash. - if eth1Hash != (libcommon.Hash{}) { - if block.Block.Body.ExecutionPayload, err = cfg.executionClient.ReadExecutionPayload(eth1Number, eth1Hash); err != nil { - return err - } - } - // validate fully only in current epoch. - fullValidate := utils.GetCurrentEpoch(cfg.state.GenesisTime(), cfg.beaconCfg.SecondsPerSlot, cfg.beaconCfg.SlotsPerEpoch) == state2.Epoch(cfg.state.BeaconState) - if err := transition.TransitionState(cfg.state, block, fullValidate); err != nil { - log.Info("Found epoch, so stopping now...", "count", slot-(fromSlot+1), "slot", slot) - return err - } - log.Info("Applied state transition", "from", slot, "to", slot+1) - } - - log.Info("[CachingBeaconState] Finished transitioning state", "from", fromSlot, "to", endSlot) - if !useExternalTx { - if err = tx.Commit(); err != nil { - return err - } - } - return nil -} diff --git a/cl/rpc/rpc.go b/cl/rpc/rpc.go index a2ff32f75..09c6df3b1 100644 --- a/cl/rpc/rpc.go +++ b/cl/rpc/rpc.go @@ -55,7 +55,7 @@ func (b *BeaconRpcP2P) sendBlocksRequest(ctx context.Context, topic string, reqD // Prepare output slice. responsePacket := []*cltypes.SignedBeaconBlock{} - ctx, cn := context.WithTimeout(ctx, time.Second*time.Duration(5+10*count)) + ctx, cn := context.WithTimeout(ctx, time.Second*time.Duration(16+30*count)) defer cn() message, err := b.sentinel.SendRequest(ctx, &sentinel.RequestData{ Data: reqData, @@ -84,7 +84,7 @@ func (b *BeaconRpcP2P) sendBlocksRequest(ctx context.Context, topic string, reqD // Read varint for length of message. encodedLn, _, err := ssz_snappy.ReadUvarint(r) if err != nil { - return nil, message.Peer.Pid, fmt.Errorf("unable to read varint from message prefix: %v", err) + return nil, message.Peer.Pid, fmt.Errorf("unable to read varint from message prefix: %w", err) } // Sanity check for message size. if encodedLn > uint64(maxMessageLength) { diff --git a/cl/sentinel/peers/peer.go b/cl/sentinel/peers/peer.go index ad616dd40..026b319a9 100644 --- a/cl/sentinel/peers/peer.go +++ b/cl/sentinel/peers/peer.go @@ -11,6 +11,11 @@ import ( const USERAGENT_UNKNOWN = "unknown" +type PeeredObject[T any] struct { + Peer string + Data T +} + // Record Peer data. type Peer struct { penalties int @@ -54,14 +59,14 @@ func (p *Peer) UserAgent() string { } func (p *Peer) Penalize() { - log.Debug("[Sentinel Peers] peer penalized", "peer-id", p.pid) + log.Trace("[Sentinel Peers] peer penalized", "peer-id", p.pid) p.do(func(p *Peer) { p.penalties++ }) } func (p *Peer) Forgive() { - log.Debug("[Sentinel Peers] peer forgiven", "peer-id", p.pid) + log.Trace("[Sentinel Peers] peer forgiven", "peer-id", p.pid) p.do(func(p *Peer) { if p.penalties > 0 { p.penalties-- @@ -74,14 +79,14 @@ func (p *Peer) MarkUsed() { p.useCount++ p.lastRequest = time.Now() }) - log.Debug("[Sentinel Peers] peer used", "peer-id", p.pid, "uses", p.useCount) + log.Trace("[Sentinel Peers] peer used", "peer-id", p.pid, "uses", p.useCount) } func (p *Peer) MarkReplied() { p.do(func(p *Peer) { p.successCount++ }) - log.Debug("[Sentinel Peers] peer replied", "peer-id", p.pid, "uses", p.useCount, "success", p.successCount) + log.Trace("[Sentinel Peers] peer replied", "peer-id", p.pid, "uses", p.useCount, "success", p.successCount) } func (p *Peer) IsAvailable() (available bool) { @@ -130,7 +135,7 @@ func anySetInString(set []string, in string) bool { func (p *Peer) Disconnect(reason ...string) { rzn := strings.Join(reason, " ") if !anySetInString(skipReasons, rzn) { - log.Debug("[Sentinel Peers] disconnecting from peer", "peer-id", p.pid, "reason", strings.Join(reason, " ")) + log.Trace("[Sentinel Peers] disconnecting from peer", "peer-id", p.pid, "reason", strings.Join(reason, " ")) } p.m.host.Peerstore().RemovePeer(p.pid) p.m.host.Network().ClosePeer(p.pid) diff --git a/cl/transition/impl/eth2/operations.go b/cl/transition/impl/eth2/operations.go index 673590a22..bbcd8d7d4 100644 --- a/cl/transition/impl/eth2/operations.go +++ b/cl/transition/impl/eth2/operations.go @@ -4,9 +4,9 @@ import ( "bytes" "errors" "fmt" - "github.com/ledgerwatch/erigon/cl/abstract" "reflect" - "time" + + "github.com/ledgerwatch/erigon/cl/abstract" "github.com/ledgerwatch/erigon/cl/transition/impl/eth2/statechange" "github.com/ledgerwatch/erigon/metrics/methelp" @@ -859,11 +859,11 @@ func (I *impl) ProcessSlots(s abstract.BeaconState, slot uint64) error { } // TODO(Someone): Add epoch transition. if (sSlot+1)%beaconConfig.SlotsPerEpoch == 0 { - start := time.Now() + // start := time.Now() if err := statechange.ProcessEpoch(s); err != nil { return err } - log.Debug("Processed new epoch successfully", "epoch", state.Epoch(s), "process_epoch_elpsed", time.Since(start)) + //log.Debug("Processed new epoch successfully", "epoch", state.Epoch(s), "process_epoch_elpsed", time.Since(start)) } // TODO: add logic to process epoch updates. sSlot += 1 diff --git a/cl/utils/time.go b/cl/utils/time.go index 1d129568f..b9ed288f7 100644 --- a/cl/utils/time.go +++ b/cl/utils/time.go @@ -15,6 +15,12 @@ package utils import "time" +// compute time of slot. +func GetSlotTime(genesisTime uint64, secondsPerSlot uint64, slot uint64) time.Time { + slotTime := genesisTime + secondsPerSlot*slot + return time.Unix(int64(slotTime), 0) +} + // compute current slot. func GetCurrentSlot(genesisTime uint64, secondsPerSlot uint64) uint64 { now := uint64(time.Now().Unix()) diff --git a/cmd/caplin-phase1/caplin1/run.go b/cmd/caplin-phase1/caplin1/run.go index b867c25b2..829812065 100644 --- a/cmd/caplin-phase1/caplin1/run.go +++ b/cmd/caplin-phase1/caplin1/run.go @@ -2,28 +2,30 @@ package caplin1 import ( "context" + "time" "github.com/ledgerwatch/erigon/cl/cltypes/solid" "github.com/ledgerwatch/erigon/cl/freezer" "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" - network2 "github.com/ledgerwatch/erigon/cl/phase1/network" + "github.com/ledgerwatch/erigon/cl/phase1/network" "github.com/ledgerwatch/erigon/cl/phase1/stages" + "github.com/spf13/afero" "github.com/Giulio2002/bls" "github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/rpc" "github.com/ledgerwatch/log/v3" - - "github.com/ledgerwatch/erigon/eth/stagedsync" ) func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, - engine execution_client.ExecutionEngine, state *state.CachingBeaconState, caplinFreezer freezer.Freezer) error { + engine execution_client.ExecutionEngine, state *state.CachingBeaconState, + caplinFreezer freezer.Freezer, datadir string) error { beaconRpc := rpc.NewBeaconRpcP2P(ctx, sentinel, beaconConfig, genesisConfig) - downloader := network2.NewForwardBeaconDownloader(ctx, beaconRpc) + + logger := log.New("app", "caplin") if caplinFreezer != nil { if err := freezer.PutObjectSSZIntoFreezer("beaconState", "caplin_core", 0, state, caplinFreezer); err != nil { @@ -32,7 +34,7 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, beac } forkChoice, err := forkchoice.NewForkChoiceStore(state, engine, caplinFreezer, true) if err != nil { - log.Error("Could not create forkchoice", "err", err) + logger.Error("Could not create forkchoice", "err", err) return err } bls.SetEnabledCaching(true) @@ -43,6 +45,53 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, beac } return true }) - gossipManager := network2.NewGossipReceiver(ctx, sentinel, forkChoice, beaconConfig, genesisConfig, caplinFreezer) - return stages.SpawnStageForkChoice(stages.StageForkChoice(nil, downloader, genesisConfig, beaconConfig, state, nil, gossipManager, forkChoice, caplinFreezer), &stagedsync.StageState{ID: "Caplin"}, nil, ctx) + gossipManager := network.NewGossipReceiver(ctx, sentinel, forkChoice, beaconConfig, genesisConfig, caplinFreezer) + dataDirFs := afero.NewBasePathFs(afero.NewOsFs(), datadir) + + { // start the gossip manager + go gossipManager.Start() + logger.Info("Started Ethereum 2.0 Gossip Service") + } + + { // start logging peers + go func() { + logIntervalPeers := time.NewTicker(1 * time.Minute) + for { + select { + case <-logIntervalPeers.C: + if peerCount, err := beaconRpc.Peers(); err == nil { + logger.Info("P2P", "peers", peerCount) + } + case <-ctx.Done(): + return + } + } + }() + } + + { // start ticking forkChoice + go func() { + tickInterval := time.NewTicker(50 * time.Millisecond) + for { + select { + case <-tickInterval.C: + forkChoice.OnTick(uint64(time.Now().Unix())) + case <-ctx.Done(): + return + } + } + }() + } + + // start the downloader service + //go initDownloader(beaconRpc, genesisConfig, beaconConfig, state, nil, gossipManager, forkChoice, caplinFreezer, dataDirFs) + + //forkChoiceConfig := stages.CaplinStagedSync(nil, beaconRpc, genesisConfig, beaconConfig, state, nil, gossipManager, forkChoice, caplinFreezer, dataDirFs) + stageCfg := stages.ClStagesCfg(beaconRpc, genesisConfig, beaconConfig, state, nil, gossipManager, forkChoice, dataDirFs) + sync := stages.ConsensusClStages(ctx, stageCfg) + err = sync.StartWithStage(ctx, "WaitForPeers", logger, stageCfg) + if err != nil { + return err + } + return err } diff --git a/cmd/caplin-phase1/main.go b/cmd/caplin-phase1/main.go index 562de8c62..914e63a39 100644 --- a/cmd/caplin-phase1/main.go +++ b/cmd/caplin-phase1/main.go @@ -133,5 +133,5 @@ func runCaplinNode(cliCtx *cli.Context) error { } } - return caplin1.RunCaplinPhase1(ctx, sentinel, cfg.BeaconCfg, cfg.GenesisCfg, executionEngine, state, caplinFreezer) + return caplin1.RunCaplinPhase1(ctx, sentinel, cfg.BeaconCfg, cfg.GenesisCfg, executionEngine, state, caplinFreezer, cfg.DataDir) } diff --git a/cmd/sentinel/cli/cliSettings.go b/cmd/sentinel/cli/cliSettings.go index a10c50f16..667769752 100644 --- a/cmd/sentinel/cli/cliSettings.go +++ b/cmd/sentinel/cli/cliSettings.go @@ -28,25 +28,26 @@ type ConsensusClientCliCfg struct { BeaconDataCfg *rawdb.BeaconDataConfig Port uint `json:"port"` Addr string `json:"address"` - ServerAddr string `json:"serverAddr"` - ServerProtocol string `json:"serverProtocol"` - ServerTcpPort uint `json:"serverTcpPort"` - LogLvl uint `json:"logLevel"` - NoDiscovery bool `json:"noDiscovery"` - LocalDiscovery bool `json:"localDiscovery"` - CheckpointUri string `json:"checkpointUri"` + ServerAddr string `json:"server_addr"` + ServerProtocol string `json:"server_protocol"` + ServerTcpPort uint `json:"server_tcp_port"` + LogLvl uint `json:"log_level"` + NoDiscovery bool `json:"no_discovery"` + LocalDiscovery bool `json:"local_discovery"` + CheckpointUri string `json:"checkpoint_uri"` Chaindata string `json:"chaindata"` - ErigonPrivateApi string `json:"erigonPrivateApi"` - TransitionChain bool `json:"transitionChain"` + ErigonPrivateApi string `json:"erigon_private_api"` + TransitionChain bool `json:"transition_chain"` NetworkType clparams.NetworkType - InitialSync bool `json:"initialSync"` - NoBeaconApi bool `json:"noBeaconApi"` - BeaconApiReadTimeout time.Duration `json:"beaconApiReadTimeout"` - BeaconApiWriteTimeout time.Duration `json:"beaconApiWriteTimeout"` - BeaconAddr string `json:"beaconAddr"` - BeaconProtocol string `json:"beaconProtocol"` - RecordMode bool `json:"recordMode"` - RecordDir string `json:"recordDir"` + InitialSync bool `json:"initial_sync"` + NoBeaconApi bool `json:"no_beacon_api"` + BeaconApiReadTimeout time.Duration `json:"beacon_api_read_timeout"` + BeaconApiWriteTimeout time.Duration `json:"beacon_api_write_timeout"` + BeaconAddr string `json:"beacon_addr"` + BeaconProtocol string `json:"beacon_protocol"` + RecordMode bool `json:"record_mode"` + RecordDir string `json:"record_dir"` + DataDir string `json:"data_dir"` RunEngineAPI bool `json:"run_engine_api"` EngineAPIAddr string `json:"engine_api_addr"` EngineAPIPort int `json:"engine_api_port"` @@ -95,6 +96,7 @@ func SetupConsensusClientCfg(ctx *cli.Context) (*ConsensusClientCliCfg, error) { cfg.BeaconProtocol = "tcp" cfg.RecordMode = ctx.Bool(flags.RecordModeFlag.Name) cfg.RecordDir = ctx.String(flags.RecordModeDir.Name) + cfg.DataDir = ctx.String(utils.DataDirFlag.Name) cfg.RunEngineAPI = ctx.Bool(flags.RunEngineAPI.Name) cfg.EngineAPIAddr = ctx.String(flags.EngineApiHostFlag.Name) diff --git a/cmd/sentinel/cli/flags/defaultFlags.go b/cmd/sentinel/cli/flags/defaultFlags.go index 2c20f457c..a3b2560cd 100644 --- a/cmd/sentinel/cli/flags/defaultFlags.go +++ b/cmd/sentinel/cli/flags/defaultFlags.go @@ -1,6 +1,9 @@ package flags -import "github.com/urfave/cli/v2" +import ( + "github.com/ledgerwatch/erigon/cmd/utils" + "github.com/urfave/cli/v2" +) var CLDefaultFlags = []cli.Flag{ &SentinelDiscoveryPort, @@ -30,4 +33,5 @@ var CLDefaultFlags = []cli.Flag{ &EngineApiHostFlag, &EngineApiPortFlag, &JwtSecret, + &utils.DataDirFlag, } diff --git a/cmd/sentinel/sentinel/peers/peer.go b/cmd/sentinel/sentinel/peers/peer.go index d6ec928b4..854b93a07 100644 --- a/cmd/sentinel/sentinel/peers/peer.go +++ b/cmd/sentinel/sentinel/peers/peer.go @@ -32,12 +32,12 @@ func (p *Peer) ID() peer.ID { return p.pid } func (p *Peer) Penalize() { - log.Debug("[Sentinel Peers] peer penalized", "peer-id", p.pid) + log.Trace("[Sentinel Peers] peer penalized", "peer-id", p.pid) p.Penalties++ } func (p *Peer) Forgive() { - log.Debug("[Sentinel Peers] peer forgiven", "peer-id", p.pid) + log.Trace("[Sentinel Peers] peer forgiven", "peer-id", p.pid) if p.Penalties > 0 { p.Penalties-- } @@ -51,7 +51,7 @@ func (p *Peer) MarkUsed() { func (p *Peer) MarkReplied() { p.successCount++ - log.Debug("[Sentinel Peers] peer replied", "peer-id", p.pid, "uses", p.useCount, "success", p.successCount) + log.Trace("[Sentinel Peers] peer replied", "peer-id", p.pid, "uses", p.useCount, "success", p.successCount) } func (p *Peer) IsAvailable() (available bool) { @@ -96,14 +96,14 @@ func anySetInString(set []string, in string) bool { func (p *Peer) Disconnect(reason ...string) { rzn := strings.Join(reason, " ") if !anySetInString(skipReasons, rzn) { - log.Debug("[Sentinel Peers] disconnecting from peer", "peer-id", p.pid, "reason", strings.Join(reason, " ")) + log.Trace("[Sentinel Peers] disconnecting from peer", "peer-id", p.pid, "reason", strings.Join(reason, " ")) } p.m.host.Peerstore().RemovePeer(p.pid) p.m.host.Network().ClosePeer(p.pid) p.Penalties = 0 } func (p *Peer) Ban(reason ...string) { - log.Debug("[Sentinel Peers] bad peers has been banned", "peer-id", p.pid, "reason", strings.Join(reason, " ")) + log.Trace("[Sentinel Peers] bad peers has been banned", "peer-id", p.pid, "reason", strings.Join(reason, " ")) p.Banned = true p.Disconnect(reason...) return diff --git a/eth/backend.go b/eth/backend.go index 783433db8..20687c48d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -738,7 +738,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere return nil, err } - go caplin1.RunCaplinPhase1(ctx, client, beaconCfg, genesisCfg, engine, state, nil) + go caplin1.RunCaplinPhase1(ctx, client, beaconCfg, genesisCfg, engine, state, nil, dirs.DataDir) } return backend, nil diff --git a/rpc/websocket_bench_test.go b/rpc/websocket_bench_test.go new file mode 100644 index 000000000..cf13b0c43 --- /dev/null +++ b/rpc/websocket_bench_test.go @@ -0,0 +1,63 @@ +package rpc + +import ( + "context" + "net/http/httptest" + "strings" + "testing" + + "github.com/ledgerwatch/log/v3" +) + +// This test checks whether calls exceeding the request size limit are rejected. +func BenchmarkWebsocketEmptyCall(b *testing.B) { + logger := log.New() + + var ( + srv = newTestServer(logger) + httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"*"}, nil, false, logger)) + wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") + ) + defer srv.Stop() + defer httpsrv.Close() + + client, err := DialWebsocket(context.Background(), wsURL, "", logger) + if err != nil { + panic(err) + } + defer client.Close() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := client.Call(nil, "test_ping"); err != nil { + panic(err) + } + } +} +func BenchmarkWebsocket16kb(b *testing.B) { + logger := log.New() + + var ( + srv = newTestServer(logger) + httpsrv = httptest.NewServer(srv.WebsocketHandler([]string{"*"}, nil, false, logger)) + wsURL = "ws:" + strings.TrimPrefix(httpsrv.URL, "http:") + ) + defer srv.Stop() + defer httpsrv.Close() + + client, err := DialWebsocket(context.Background(), wsURL, "", logger) + if err != nil { + panic(err) + } + defer client.Close() + + payload16kb := strings.Repeat("x", 4096*4) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := client.Call(nil, "test_echo", payload16kb, 5, nil) + if err != nil { + panic(err) + } + } +} diff --git a/turbo/app/make_app.go b/turbo/app/make_app.go index 3f0de4974..de25969d4 100644 --- a/turbo/app/make_app.go +++ b/turbo/app/make_app.go @@ -44,6 +44,7 @@ func MakeApp(name string, action cli.ActionFunc, cliFlags []cli.Flag) *cli.App { app.Flags = append(cliFlags, debug.Flags...) // debug flags are required app.Flags = append(app.Flags, utils.MetricFlags...) app.Flags = append(app.Flags, logging.Flags...) + app.After = func(ctx *cli.Context) error { debug.Exit() return nil