diff --git a/README.md b/README.md index 5f6dafdce..267857940 100644 --- a/README.md +++ b/README.md @@ -560,6 +560,14 @@ node. If you are using `--internalcl` aka `caplin` as your consensus client, then also look at the chart above +#### `beaconAPI` ports + +| Component | Port | Protocol | Purpose | Should Expose | +|-----------|------|----------|------------------|---------------| +| REST | 5555 | TCP | REST | Public | + +If you are using `--internalcl` aka `caplin` as your consensus client and `--beacon.api` then also look at the chart above + #### `shared` ports | Component | Port | Protocol | Purpose | Should Expose | diff --git a/cl/antiquary/antiquary.go b/cl/antiquary/antiquary.go index e6d6e23b9..89bfc1bcf 100644 --- a/cl/antiquary/antiquary.go +++ b/cl/antiquary/antiquary.go @@ -26,7 +26,6 @@ type Antiquary struct { downloader proto_downloader.DownloaderClient logger log.Logger sn *freezeblocks.CaplinSnapshots - reader freezeblocks.BeaconSnapshotReader ctx context.Context beaconDB persistence.BlockSource backfilled *atomic.Bool @@ -43,7 +42,6 @@ func NewAntiquary(ctx context.Context, cfg *clparams.BeaconChainConfig, dirs dat logger: logger, sn: sn, beaconDB: beaconDB, - reader: reader, ctx: ctx, backfilled: backfilled, cfg: cfg, @@ -92,11 +90,14 @@ func (a *Antiquary) Loop() error { return err } logInterval := time.NewTicker(30 * time.Second) + if err := a.sn.ReopenFolder(); err != nil { + return err + } defer logInterval.Stop() // Now write the snapshots as indicies - for i := from; i < a.reader.FrozenSlots(); i++ { + for i := from; i < a.sn.BlocksAvailable(); i++ { // read the snapshot - header, elBlockNumber, elBlockHash, err := a.reader.ReadHeader(i) + header, elBlockNumber, elBlockHash, err := a.sn.ReadHeader(i) if err != nil { return err } @@ -118,12 +119,13 @@ func (a *Antiquary) Loop() error { } select { case <-logInterval.C: - log.Info("[Antiquary]: Processed snapshots", "progress", i, "target", a.reader.FrozenSlots()) + log.Info("[Antiquary]: Processed snapshots", "progress", i, "target", a.sn.BlocksAvailable()) case <-a.ctx.Done(): default: } } - frozenSlots := a.reader.FrozenSlots() + + frozenSlots := a.sn.BlocksAvailable() if frozenSlots != 0 { if err := a.beaconDB.PurgeRange(a.ctx, tx, 0, frozenSlots); err != nil { return err diff --git a/cl/beacon/beacon_router_configuration/cfg.go b/cl/beacon/beacon_router_configuration/cfg.go new file mode 100644 index 000000000..1a3307b0a --- /dev/null +++ b/cl/beacon/beacon_router_configuration/cfg.go @@ -0,0 +1,14 @@ +package beacon_router_configuration + +import "time" + +// TODO(enriavil1): Make this configurable via flags +type RouterConfiguration struct { + Active bool + Protocol string + Address string + + ReadTimeTimeout time.Duration + IdleTimeout time.Duration + WriteTimeout time.Duration +} diff --git a/cl/beacon/handler/blocks.go b/cl/beacon/handler/blocks.go index 46d5577ec..b04a7a5f1 100644 --- a/cl/beacon/handler/blocks.go +++ b/cl/beacon/handler/blocks.go @@ -3,8 +3,6 @@ package handler import ( "context" "fmt" - "github.com/ledgerwatch/erigon/cl/sentinel/communication/ssz_snappy" - "io" "net/http" libcommon "github.com/ledgerwatch/erigon-lib/common" @@ -67,7 +65,6 @@ func (a *ApiHandler) getBlock(r *http.Request) (data any, finalized *bool, versi blockId *segmentID root libcommon.Hash blkHeader *cltypes.SignedBeaconBlockHeader - blockReader io.ReadCloser isCanonical bool ) @@ -99,17 +96,13 @@ func (a *ApiHandler) getBlock(r *http.Request) (data any, finalized *bool, versi return } - blockReader, err = a.blockSource.BlockReader(ctx, blkHeader.Header.Slot, root) + blk, err := a.blockReader.ReadBlockByRoot(ctx, tx, root) if err != nil { return } - defer blockReader.Close() - blk := cltypes.NewSignedBeaconBlock(a.beaconChainCfg) + // Pack the response version = new(clparams.StateVersion) *version = a.beaconChainCfg.GetCurrentStateVersion(blkHeader.Header.Slot / a.beaconChainCfg.SlotsPerEpoch) - if err = ssz_snappy.DecodeAndReadNoForkDigest(blockReader, blk, *version); err != nil { - return - } data = blk finalized = new(bool) httpStatus = http.StatusAccepted @@ -123,7 +116,6 @@ func (a *ApiHandler) getBlockAttestations(r *http.Request) (data any, finalized blockId *segmentID root libcommon.Hash blkHeader *cltypes.SignedBeaconBlockHeader - blockReader io.ReadCloser isCanonical bool ) @@ -155,18 +147,12 @@ func (a *ApiHandler) getBlockAttestations(r *http.Request) (data any, finalized return } - blockReader, err = a.blockSource.BlockReader(ctx, blkHeader.Header.Slot, root) + blk, err := a.blockReader.ReadBlockByRoot(ctx, tx, root) if err != nil { return } - defer blockReader.Close() - blk := cltypes.NewSignedBeaconBlock(a.beaconChainCfg) version = new(clparams.StateVersion) *version = a.beaconChainCfg.GetCurrentStateVersion(blkHeader.Header.Slot / a.beaconChainCfg.SlotsPerEpoch) - if err = ssz_snappy.DecodeAndReadNoForkDigest(blockReader, blk, *version); err != nil { - return - } - data = blk.Block.Body.Attestations finalized = new(bool) httpStatus = http.StatusAccepted diff --git a/cl/beacon/handler/handler.go b/cl/beacon/handler/handler.go index 7b9ec2992..103cbeeff 100644 --- a/cl/beacon/handler/handler.go +++ b/cl/beacon/handler/handler.go @@ -10,13 +10,14 @@ import ( "github.com/ledgerwatch/erigon/cl/persistence" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" "github.com/ledgerwatch/erigon/cl/pool" + "github.com/ledgerwatch/erigon/turbo/snapshotsync/freezeblocks" ) type ApiHandler struct { o sync.Once mux chi.Router - blockSource persistence.RawBeaconBlockChain + blockReader freezeblocks.BeaconSnapshotReader indiciesDB kv.RoDB genesisCfg *clparams.GenesisConfig beaconChainCfg *clparams.BeaconChainConfig @@ -24,8 +25,8 @@ type ApiHandler struct { operationsPool pool.OperationsPool } -func NewApiHandler(genesisConfig *clparams.GenesisConfig, beaconChainConfig *clparams.BeaconChainConfig, source persistence.RawBeaconBlockChain, indiciesDB kv.RoDB, forkchoiceStore forkchoice.ForkChoiceStorage, operationsPool pool.OperationsPool) *ApiHandler { - return &ApiHandler{o: sync.Once{}, genesisCfg: genesisConfig, beaconChainCfg: beaconChainConfig, indiciesDB: indiciesDB, blockSource: source, forkchoiceStore: forkchoiceStore, operationsPool: operationsPool} +func NewApiHandler(genesisConfig *clparams.GenesisConfig, beaconChainConfig *clparams.BeaconChainConfig, source persistence.RawBeaconBlockChain, indiciesDB kv.RoDB, forkchoiceStore forkchoice.ForkChoiceStorage, operationsPool pool.OperationsPool, rcsn freezeblocks.BeaconSnapshotReader) *ApiHandler { + return &ApiHandler{o: sync.Once{}, genesisCfg: genesisConfig, beaconChainCfg: beaconChainConfig, indiciesDB: indiciesDB, forkchoiceStore: forkchoiceStore, operationsPool: operationsPool, blockReader: rcsn} } func (a *ApiHandler) init() { diff --git a/cl/beacon/router.go b/cl/beacon/router.go index 7514681f0..089c58c12 100644 --- a/cl/beacon/router.go +++ b/cl/beacon/router.go @@ -1,27 +1,18 @@ package beacon import ( + "fmt" "net" "net/http" - "time" + "github.com/ledgerwatch/erigon/cl/beacon/beacon_router_configuration" "github.com/ledgerwatch/erigon/cl/beacon/handler" "github.com/ledgerwatch/log/v3" ) -// TODO(enriavil1): Make this configurable via flags -type RouterConfiguration struct { - Active bool - Protocol string - Address string - - ReadTimeTimeout time.Duration - IdleTimeout time.Duration - WriteTimeout time.Duration -} - -func ListenAndServe(api *handler.ApiHandler, routerCfg *RouterConfiguration) { +func ListenAndServe(api *handler.ApiHandler, routerCfg beacon_router_configuration.RouterConfiguration) { listener, err := net.Listen(routerCfg.Protocol, routerCfg.Address) + fmt.Println(routerCfg.Address, routerCfg.Protocol) server := &http.Server{ Handler: newBeaconMiddleware(api), ReadTimeout: routerCfg.ReadTimeTimeout, diff --git a/cl/persistence/format/snapshot_format/blocks.go b/cl/persistence/format/snapshot_format/blocks.go index 1427f71c7..f392bee27 100644 --- a/cl/persistence/format/snapshot_format/blocks.go +++ b/cl/persistence/format/snapshot_format/blocks.go @@ -14,15 +14,15 @@ import ( "github.com/ledgerwatch/erigon/cl/persistence/format/chunk_encoding" ) -var buffersPool = sync.Pool{ - New: func() interface{} { return &bytes.Buffer{} }, -} - type ExecutionBlockReaderByNumber interface { TransactionsSSZ(w io.Writer, number uint64, hash libcommon.Hash) error WithdrawalsSZZ(w io.Writer, number uint64, hash libcommon.Hash) error } +var buffersPool = sync.Pool{ + New: func() interface{} { return &bytes.Buffer{} }, +} + const ( blockBaseOffset = 100 /* Signature + Block Offset */ + 84 /* Slot + ProposerIndex + ParentRoot + StateRoot + Body Offset */ + @@ -141,7 +141,7 @@ func ReadBlockFromSnapshot(r io.Reader, executionReader ExecutionBlockReaderByNu } // ReadBlockHeaderFromSnapshotWithExecutionData reads the beacon block header and the EL block number and block hash. -func ReadBlockHeaderFromSnapshotWithExecutionData(r io.Reader, cfg *clparams.BeaconChainConfig) (*cltypes.SignedBeaconBlockHeader, uint64, libcommon.Hash, error) { +func ReadBlockHeaderFromSnapshotWithExecutionData(r io.Reader) (*cltypes.SignedBeaconBlockHeader, uint64, libcommon.Hash, error) { buffer := buffersPool.Get().(*bytes.Buffer) defer buffersPool.Put(buffer) buffer.Reset() diff --git a/cl/persistence/format/snapshot_format/blocks_test.go b/cl/persistence/format/snapshot_format/blocks_test.go index 8c357fd4b..b5d0815fc 100644 --- a/cl/persistence/format/snapshot_format/blocks_test.go +++ b/cl/persistence/format/snapshot_format/blocks_test.go @@ -70,7 +70,7 @@ func TestBlockSnapshotEncoding(t *testing.T) { b.Reset() _, err = snapshot_format.WriteBlockForSnapshot(&b, blk, nil) require.NoError(t, err) - header, bn, bHash, err := snapshot_format.ReadBlockHeaderFromSnapshotWithExecutionData(&b, &clparams.MainnetBeaconConfig) + header, bn, bHash, err := snapshot_format.ReadBlockHeaderFromSnapshotWithExecutionData(&b) require.NoError(t, err) hash3, err := header.HashSSZ() require.NoError(t, err) diff --git a/cl/persistence/format/snapshot_format/getters/execution_snapshot.go b/cl/persistence/format/snapshot_format/getters/execution_snapshot.go new file mode 100644 index 000000000..d201a63ea --- /dev/null +++ b/cl/persistence/format/snapshot_format/getters/execution_snapshot.go @@ -0,0 +1,166 @@ +package getters + +import ( + "context" + "encoding/binary" + "fmt" + "io" + + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/types/ssz" + "github.com/ledgerwatch/erigon/cl/phase1/core/state/lru" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/turbo/services" +) + +type cacheEntry struct { + number uint64 + hash libcommon.Hash +} +type ExecutionSnapshotReader struct { + ctx context.Context + + blockReader services.FullBlockReader + + db kv.RoDB + txsCache *lru.Cache[cacheEntry, []byte] + withdrawalsCache *lru.Cache[cacheEntry, []byte] +} + +func NewExecutionSnapshotReader(ctx context.Context, blockReader services.FullBlockReader, db kv.RoDB) *ExecutionSnapshotReader { + txsCache, err := lru.New[cacheEntry, []byte]("txsCache", 96) + if err != nil { + panic(err) + } + withdrawalsCache, err := lru.New[cacheEntry, []byte]("wsCache", 96) + if err != nil { + panic(err) + } + return &ExecutionSnapshotReader{ctx: ctx, blockReader: blockReader, withdrawalsCache: withdrawalsCache, txsCache: txsCache, db: db} +} + +func (r *ExecutionSnapshotReader) TransactionsSSZ(w io.Writer, number uint64, hash libcommon.Hash) error { + ok, err := r.lookupTransactionsInCache(w, number, hash) + if err != nil { + return err + } + if ok { + return nil + } + + tx, err := r.db.BeginRo(r.ctx) + if err != nil { + return err + } + defer tx.Rollback() + // Get the body and fill both caches + body, err := r.blockReader.BodyWithTransactions(r.ctx, tx, hash, number) + if err != nil { + return err + } + if body == nil { + return fmt.Errorf("transactions not found for block %d", number) + } + // compute txs flats + txs, err := types.MarshalTransactionsBinary(body.Transactions) + if err != nil { + return err + } + flattenedTxs := convertTxsToBytesSSZ(txs) + r.txsCache.Add(cacheEntry{number: number, hash: hash}, flattenedTxs) + // compute withdrawals flat + ws := body.Withdrawals + flattenedWs := convertWithdrawalsToBytesSSZ(ws) + + r.withdrawalsCache.Add(cacheEntry{number: number, hash: hash}, flattenedWs) + _, err = w.Write(flattenedTxs) + return err +} + +func convertTxsToBytesSSZ(txs [][]byte) []byte { + sumLenTxs := 0 + for _, tx := range txs { + sumLenTxs += len(tx) + } + flat := make([]byte, 0, 4*len(txs)+sumLenTxs) + offset := len(txs) * 4 + for _, tx := range txs { + flat = append(flat, ssz.OffsetSSZ(uint32(offset))...) + offset += len(tx) + } + for _, tx := range txs { + flat = append(flat, tx...) + } + return flat +} + +func convertWithdrawalsToBytesSSZ(ws []*types.Withdrawal) []byte { + ret := make([]byte, 44*len(ws)) + for i, w := range ws { + currentPos := i * 44 + binary.LittleEndian.PutUint64(ret[currentPos:currentPos+8], w.Index) + binary.LittleEndian.PutUint64(ret[currentPos+8:currentPos+16], w.Validator) + copy(ret[currentPos+16:currentPos+36], w.Address[:]) + binary.LittleEndian.PutUint64(ret[currentPos+36:currentPos+44], w.Amount) + } + return ret +} + +func (r *ExecutionSnapshotReader) WithdrawalsSZZ(w io.Writer, number uint64, hash libcommon.Hash) error { + ok, err := r.lookupWithdrawalsInCache(w, number, hash) + if err != nil { + return err + } + if ok { + return nil + } + tx, err := r.db.BeginRo(r.ctx) + if err != nil { + return err + } + defer tx.Rollback() + // Get the body and fill both caches + body, err := r.blockReader.BodyWithTransactions(r.ctx, tx, hash, number) + if err != nil { + return err + } + if body == nil { + return fmt.Errorf("transactions not found for block %d", number) + } + // compute txs flats + txs, err := types.MarshalTransactionsBinary(body.Transactions) + if err != nil { + return err + } + flattenedTxs := convertTxsToBytesSSZ(txs) + r.txsCache.Add(cacheEntry{number: number, hash: hash}, flattenedTxs) + // compute withdrawals flat + ws := body.Withdrawals + flattenedWs := convertWithdrawalsToBytesSSZ(ws) + + r.withdrawalsCache.Add(cacheEntry{number: number, hash: hash}, flattenedWs) + _, err = w.Write(flattenedWs) + + return err +} + +func (r *ExecutionSnapshotReader) lookupWithdrawalsInCache(w io.Writer, number uint64, hash libcommon.Hash) (bool, error) { + var wsBytes []byte + var ok bool + if wsBytes, ok = r.withdrawalsCache.Get(cacheEntry{number: number, hash: hash}); !ok { + return false, nil + } + _, err := w.Write(wsBytes) + return true, err +} + +func (r *ExecutionSnapshotReader) lookupTransactionsInCache(w io.Writer, number uint64, hash libcommon.Hash) (bool, error) { + var wsBytes []byte + var ok bool + if wsBytes, ok = r.txsCache.Get(cacheEntry{number: number, hash: hash}); !ok { + return false, nil + } + _, err := w.Write(wsBytes) + return true, err +} diff --git a/cmd/capcli/cli.go b/cmd/capcli/cli.go index bc7c27ebd..8807a9dd4 100644 --- a/cmd/capcli/cli.go +++ b/cmd/capcli/cli.go @@ -504,7 +504,7 @@ func (c *CheckSnapshots) Run(ctx *Context) error { } br := &snapshot_format.MockBlockReader{} - snReader := freezeblocks.NewBeaconSnapshotReader(csn, br, beaconConfig) + snReader := freezeblocks.NewBeaconSnapshotReader(csn, br, beaconDB, beaconConfig) for i := c.Slot; i < to; i++ { // Read the original canonical slot data, err := beaconDB.GetBlock(ctx, tx, i) @@ -522,7 +522,7 @@ func (c *CheckSnapshots) Run(ctx *Context) error { if blk.Version() >= clparams.BellatrixVersion { br.Block = blk.Block.Body.ExecutionPayload } - blk2, err := snReader.ReadBlock(i) + blk2, err := snReader.ReadBlockBySlot(ctx, tx, i) if err != nil { log.Error("Error detected in decoding snapshots", "err", err, "slot", i) return nil @@ -538,7 +538,7 @@ func (c *CheckSnapshots) Run(ctx *Context) error { log.Error("Mismatching blocks", "slot", i, "gotSlot", blk2.Block.Slot, "datadir", libcommon.Hash(hash1), "snapshot", libcommon.Hash(hash2)) return nil } - header, _, _, err := snReader.ReadHeader(i) + header, _, _, err := csn.ReadHeader(i) if err != nil { return err } @@ -574,7 +574,7 @@ func (c *LoopSnapshots) Run(ctx *Context) error { log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler)) rawDB := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory) - _, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false) + beaconDB, db, err := caplin1.OpenCaplinDatabase(ctx, db_config.DatabaseConfiguration{PruneDepth: math.MaxUint64}, beaconConfig, rawDB, dirs.CaplinIndexing, nil, false) if err != nil { return err } @@ -598,10 +598,10 @@ func (c *LoopSnapshots) Run(ctx *Context) error { } br := &snapshot_format.MockBlockReader{} - snReader := freezeblocks.NewBeaconSnapshotReader(csn, br, beaconConfig) + snReader := freezeblocks.NewBeaconSnapshotReader(csn, br, beaconDB, beaconConfig) start := time.Now() for i := c.Slot; i < to; i++ { - snReader.ReadBlock(i) + snReader.ReadBlockBySlot(ctx, tx, i) } log.Info("Successfully checked", "slot", c.Slot, "time", time.Since(start)) return nil diff --git a/cmd/caplin/caplin1/run.go b/cmd/caplin/caplin1/run.go index 456c61daf..98b6eb6a4 100644 --- a/cmd/caplin/caplin1/run.go +++ b/cmd/caplin/caplin1/run.go @@ -9,6 +9,7 @@ import ( proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader" "github.com/ledgerwatch/erigon/cl/antiquary" "github.com/ledgerwatch/erigon/cl/beacon" + "github.com/ledgerwatch/erigon/cl/beacon/beacon_router_configuration" "github.com/ledgerwatch/erigon/cl/beacon/handler" "github.com/ledgerwatch/erigon/cl/cltypes/solid" "github.com/ledgerwatch/erigon/cl/freezer" @@ -19,6 +20,7 @@ import ( "github.com/ledgerwatch/erigon/cl/persistence" persistence2 "github.com/ledgerwatch/erigon/cl/persistence" "github.com/ledgerwatch/erigon/cl/persistence/db_config" + "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format" "github.com/ledgerwatch/erigon/cl/phase1/core/state" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" "github.com/ledgerwatch/erigon/cl/phase1/forkchoice" @@ -79,7 +81,7 @@ func OpenCaplinDatabase(ctx context.Context, func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engine execution_client.ExecutionEngine, beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, state *state.CachingBeaconState, - caplinFreezer freezer.Freezer, dirs datadir.Dirs, cfg beacon.RouterConfiguration, + caplinFreezer freezer.Freezer, dirs datadir.Dirs, cfg beacon_router_configuration.RouterConfiguration, eth1Getter snapshot_format.ExecutionBlockReaderByNumber, snDownloader proto_downloader.DownloaderClient, backfilling bool) error { rawDB := persistence.AferoRawBeaconBlockChainFromOsPath(beaconConfig, dirs.CaplinHistory) beaconDB, db, err := OpenCaplinDatabase(ctx, db_config.DefaultDatabaseConfiguration, beaconConfig, rawDB, dirs.CaplinIndexing, engine, false) @@ -94,7 +96,7 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engi logger := log.New("app", "caplin") csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, dirs.Snap, logger) - rcsn := freezeblocks.NewBeaconSnapshotReader(csn, nil, beaconConfig) + rcsn := freezeblocks.NewBeaconSnapshotReader(csn, eth1Getter, beaconDB, beaconConfig) if caplinFreezer != nil { if err := freezer2.PutObjectSSZIntoFreezer("beaconState", "caplin_core", 0, state, caplinFreezer); err != nil { @@ -152,8 +154,8 @@ func RunCaplinPhase1(ctx context.Context, sentinel sentinel.SentinelClient, engi } if cfg.Active { - apiHandler := handler.NewApiHandler(genesisConfig, beaconConfig, rawDB, db, forkChoice, pool) - go beacon.ListenAndServe(apiHandler, &cfg) + apiHandler := handler.NewApiHandler(genesisConfig, beaconConfig, rawDB, db, forkChoice, pool, rcsn) + go beacon.ListenAndServe(apiHandler, cfg) log.Info("Beacon API started", "addr", cfg.Address) } diff --git a/cmd/caplin/main.go b/cmd/caplin/main.go index eb0a12da5..0a9e80a2c 100644 --- a/cmd/caplin/main.go +++ b/cmd/caplin/main.go @@ -18,7 +18,7 @@ import ( "fmt" "os" - "github.com/ledgerwatch/erigon/cl/beacon" + "github.com/ledgerwatch/erigon/cl/beacon/beacon_router_configuration" "github.com/ledgerwatch/erigon/cl/cltypes" "github.com/ledgerwatch/erigon/cl/fork" freezer2 "github.com/ledgerwatch/erigon/cl/freezer" @@ -121,12 +121,12 @@ func runCaplinNode(cliCtx *cli.Context) error { } } - return caplin1.RunCaplinPhase1(ctx, sentinel, executionEngine, cfg.BeaconCfg, cfg.GenesisCfg, state, caplinFreezer, cfg.Dirs, beacon.RouterConfiguration{ + return caplin1.RunCaplinPhase1(ctx, sentinel, executionEngine, cfg.BeaconCfg, cfg.GenesisCfg, state, caplinFreezer, cfg.Dirs, beacon_router_configuration.RouterConfiguration{ Protocol: cfg.BeaconProtocol, Address: cfg.BeaconAddr, ReadTimeTimeout: cfg.BeaconApiReadTimeout, WriteTimeout: cfg.BeaconApiWriteTimeout, IdleTimeout: cfg.BeaconApiWriteTimeout, Active: !cfg.NoBeaconApi, - }, nil, false) + }, nil, nil, false) } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 41fcfb504..20a5dc832 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -25,6 +25,7 @@ import ( "runtime" "strconv" "strings" + "time" "github.com/ledgerwatch/erigon/cl/clparams" @@ -862,6 +863,41 @@ var ( Name: "silkworm.sentry", Usage: "Enable embedded Silkworm Sentry service", } + BeaconAPIFlag = cli.BoolFlag{ + Name: "beacon.api", + Usage: "Enable beacon API", + Value: false, + } + BeaconApiProtocolFlag = cli.StringFlag{ + Name: "beacon.api.protocol", + Usage: "Protocol for beacon API", + Value: "tcp", + } + BeaconApiReadTimeoutFlag = cli.Uint64Flag{ + Name: "beacon.api.read.timeout", + Usage: "Sets the seconds for a read time out in the beacon api", + Value: 5, + } + BeaconApiWriteTimeoutFlag = cli.Uint64Flag{ + Name: "beacon.api.write.timeout", + Usage: "Sets the seconds for a write time out in the beacon api", + Value: 5, + } + BeaconApiIdleTimeoutFlag = cli.Uint64Flag{ + Name: "beacon.api.ide.timeout", + Usage: "Sets the seconds for a write time out in the beacon api", + Value: 25, + } + BeaconApiAddrFlag = cli.StringFlag{ + Name: "beacon.api.addr", + Usage: "sets the host to listen for beacon api requests", + Value: "localhost", + } + BeaconApiPortFlag = cli.UintFlag{ + Name: "beacon.api.port", + Usage: "sets the port to listen for beacon api requests", + Value: 5555, + } ) var MetricFlags = []cli.Flag{&MetricsEnabledFlag, &MetricsHTTPFlag, &MetricsPortFlag} @@ -1482,6 +1518,15 @@ func setWhitelist(ctx *cli.Context, cfg *ethconfig.Config) { } } +func setBeaconAPI(ctx *cli.Context, cfg *ethconfig.Config) { + cfg.BeaconRouter.Active = ctx.Bool(BeaconAPIFlag.Name) + cfg.BeaconRouter.Protocol = ctx.String(BeaconApiProtocolFlag.Name) + cfg.BeaconRouter.Address = fmt.Sprintf("%s:%d", ctx.String(BeaconApiAddrFlag.Name), ctx.Int(BeaconApiPortFlag.Name)) + cfg.BeaconRouter.ReadTimeTimeout = time.Duration(ctx.Uint64(BeaconApiReadTimeoutFlag.Name)) * time.Second + cfg.BeaconRouter.WriteTimeout = time.Duration(ctx.Uint64(BeaconApiWriteTimeoutFlag.Name)) * time.Second + cfg.BeaconRouter.IdleTimeout = time.Duration(ctx.Uint64(BeaconApiIdleTimeoutFlag.Name)) * time.Second +} + func setSilkworm(ctx *cli.Context, cfg *ethconfig.Config) { cfg.SilkwormPath = ctx.String(SilkwormPathFlag.Name) if ctx.IsSet(SilkwormExecutionFlag.Name) { @@ -1599,6 +1644,7 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *nodecfg.Config, cfg *ethconfig.C setWhitelist(ctx, cfg) setBorConfig(ctx, cfg) setSilkworm(ctx, cfg) + setBeaconAPI(ctx, cfg) cfg.Ethstats = ctx.String(EthStatsURLFlag.Name) cfg.HistoryV3 = ctx.Bool(HistoryV3Flag.Name) diff --git a/eth/backend.go b/eth/backend.go index 9737aba24..9bb1a8f5d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -36,11 +36,11 @@ import ( "github.com/ledgerwatch/erigon-lib/diagnostics" "github.com/ledgerwatch/erigon-lib/downloader/downloadergrpc" "github.com/ledgerwatch/erigon-lib/kv/kvcfg" - "github.com/ledgerwatch/erigon/cl/beacon" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/cltypes" "github.com/ledgerwatch/erigon/cl/fork" "github.com/ledgerwatch/erigon/cl/persistence" + "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format/getters" clcore "github.com/ledgerwatch/erigon/cl/phase1/core" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" "github.com/ledgerwatch/erigon/cl/sentinel" @@ -853,7 +853,8 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger backend.sentinel = client go func() { - if err := caplin1.RunCaplinPhase1(ctx, client, engine, beaconCfg, genesisCfg, state, nil, dirs, beacon.RouterConfiguration{Active: false}, backend.downloaderClient, true); err != nil { + eth1Getter := getters.NewExecutionSnapshotReader(ctx, blockReader, backend.chainDB) + if err := caplin1.RunCaplinPhase1(ctx, client, engine, beaconCfg, genesisCfg, state, nil, dirs, config.BeaconRouter, eth1Getter, backend.downloaderClient, true); err != nil { logger.Error("could not start caplin", "err", err) } ctxCancel() diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index ddc7d3281..dacaf2f73 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -34,6 +34,7 @@ import ( "github.com/ledgerwatch/erigon-lib/downloader/downloadercfg" "github.com/ledgerwatch/erigon-lib/txpool/txpoolcfg" + "github.com/ledgerwatch/erigon/cl/beacon/beacon_router_configuration" "github.com/ledgerwatch/erigon/consensus/ethash/ethashcfg" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/ethconfig/estimate" @@ -186,8 +187,9 @@ type Config struct { BadBlockHash common.Hash // hash of the block marked as bad - Snapshot BlocksFreezing - Downloader *downloadercfg.Cfg + Snapshot BlocksFreezing + Downloader *downloadercfg.Cfg + BeaconRouter beacon_router_configuration.RouterConfiguration Dirs datadir.Dirs diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index f3ee00f82..7963cb347 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -170,5 +170,13 @@ var DefaultFlags = []cli.Flag{ &utils.SilkwormRpcDaemonFlag, &utils.SilkwormSentryFlag, + &utils.BeaconAPIFlag, + &utils.BeaconApiAddrFlag, + &utils.BeaconApiPortFlag, + &utils.BeaconApiReadTimeoutFlag, + &utils.BeaconApiWriteTimeoutFlag, + &utils.BeaconApiProtocolFlag, + &utils.BeaconApiIdleTimeoutFlag, + &utils.TrustedSetupFile, } diff --git a/turbo/snapshotsync/freezeblocks/beacon_block_reader.go b/turbo/snapshotsync/freezeblocks/beacon_block_reader.go index cd74d94a6..c83b42e91 100644 --- a/turbo/snapshotsync/freezeblocks/beacon_block_reader.go +++ b/turbo/snapshotsync/freezeblocks/beacon_block_reader.go @@ -2,12 +2,16 @@ package freezeblocks import ( "bytes" + "context" "fmt" "sync" libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/cl/clparams" "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/persistence" + "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format" "github.com/pierrec/lz4" ) @@ -25,8 +29,8 @@ var lz4ReaderPool = sync.Pool{ type BeaconSnapshotReader interface { // ReadBlock reads the block at the given slot. // If the block is not present, it returns nil. - ReadBlock(slot uint64) (*cltypes.SignedBeaconBlock, error) - ReadHeader(slot uint64) (*cltypes.SignedBeaconBlockHeader, uint64, libcommon.Hash, error) + ReadBlockBySlot(ctx context.Context, tx kv.Tx, slot uint64) (*cltypes.SignedBeaconBlock, error) + ReadBlockByRoot(ctx context.Context, tx kv.Tx, blockRoot libcommon.Hash) (*cltypes.SignedBeaconBlock, error) FrozenSlots() uint64 } @@ -35,22 +39,30 @@ type beaconSnapshotReader struct { sn *CaplinSnapshots eth1Getter snapshot_format.ExecutionBlockReaderByNumber + beaconDB persistence.BlockSource cfg *clparams.BeaconChainConfig } -func NewBeaconSnapshotReader(snapshots *CaplinSnapshots, eth1Getter snapshot_format.ExecutionBlockReaderByNumber, cfg *clparams.BeaconChainConfig) BeaconSnapshotReader { - return &beaconSnapshotReader{sn: snapshots, eth1Getter: eth1Getter, cfg: cfg} +func NewBeaconSnapshotReader(snapshots *CaplinSnapshots, eth1Getter snapshot_format.ExecutionBlockReaderByNumber, beaconDB persistence.BlockSource, cfg *clparams.BeaconChainConfig) BeaconSnapshotReader { + return &beaconSnapshotReader{sn: snapshots, eth1Getter: eth1Getter, cfg: cfg, beaconDB: beaconDB} } func (r *beaconSnapshotReader) FrozenSlots() uint64 { return r.sn.BlocksAvailable() } -func (r *beaconSnapshotReader) ReadBlock(slot uint64) (*cltypes.SignedBeaconBlock, error) { +func (r *beaconSnapshotReader) ReadBlockBySlot(ctx context.Context, tx kv.Tx, slot uint64) (*cltypes.SignedBeaconBlock, error) { view := r.sn.View() defer view.Close() var buf []byte + if slot > r.sn.BlocksAvailable() { + data, err := r.beaconDB.GetBlock(ctx, tx, slot) + return data.Data, err + } + if r.eth1Getter == nil { + return nil, nil + } seg, ok := view.BeaconBlocksSegment(slot) if !ok { @@ -90,31 +102,54 @@ func (r *beaconSnapshotReader) ReadBlock(slot uint64) (*cltypes.SignedBeaconBloc return snapshot_format.ReadBlockFromSnapshot(lzReader, r.eth1Getter, r.cfg) } -func (r *beaconSnapshotReader) ReadHeader(slot uint64) (*cltypes.SignedBeaconBlockHeader, uint64, libcommon.Hash, error) { +func (r *beaconSnapshotReader) ReadBlockByRoot(ctx context.Context, tx kv.Tx, root libcommon.Hash) (*cltypes.SignedBeaconBlock, error) { view := r.sn.View() defer view.Close() + signedHeader, canonical, err := beacon_indicies.ReadSignedHeaderByBlockRoot(ctx, tx, root) + if err != nil { + return nil, err + } + // root non-canonical? BAD + if !canonical { + return nil, nil + } + if signedHeader == nil { + return nil, nil + } + slot := signedHeader.Header.Slot var buf []byte + if slot > r.sn.BlocksAvailable() { + data, err := r.beaconDB.GetBlock(ctx, tx, slot) + return data.Data, err + } + if r.eth1Getter == nil { + return nil, nil + } seg, ok := view.BeaconBlocksSegment(slot) if !ok { - return nil, 0, libcommon.Hash{}, nil + return nil, nil } if seg.idxSlot == nil { - return nil, 0, libcommon.Hash{}, nil + return nil, nil + } + if slot < seg.idxSlot.BaseDataID() { + return nil, fmt.Errorf("slot %d is before the base data id %d", slot, seg.idxSlot.BaseDataID()) } blockOffset := seg.idxSlot.OrdinalLookup(slot - seg.idxSlot.BaseDataID()) gg := seg.seg.MakeGetter() gg.Reset(blockOffset) if !gg.HasNext() { - return nil, 0, libcommon.Hash{}, nil + return nil, nil } + buf = buf[:0] buf, _ = gg.Next(buf) if len(buf) == 0 { - return nil, 0, libcommon.Hash{}, nil + return nil, nil } // Decompress this thing buffer := buffersPool.Get().(*bytes.Buffer) @@ -127,5 +162,5 @@ func (r *beaconSnapshotReader) ReadHeader(slot uint64) (*cltypes.SignedBeaconBlo lzReader.Reset(buffer) // Use pooled buffers and readers to avoid allocations. - return snapshot_format.ReadBlockHeaderFromSnapshotWithExecutionData(lzReader, r.cfg) + return snapshot_format.ReadBlockFromSnapshot(lzReader, r.eth1Getter, r.cfg) } diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go index d1b67cc64..9904cdeac 100644 --- a/turbo/snapshotsync/freezeblocks/block_reader.go +++ b/turbo/snapshotsync/freezeblocks/block_reader.go @@ -384,6 +384,7 @@ func (r *BlockReader) Header(ctx context.Context, tx kv.Getter, hash common.Hash } func (r *BlockReader) BodyWithTransactions(ctx context.Context, tx kv.Getter, hash common.Hash, blockHeight uint64) (body *types.Body, err error) { + body, err = rawdb.ReadBodyWithTransactions(tx, hash, blockHeight) if err != nil { return nil, err diff --git a/turbo/snapshotsync/freezeblocks/caplin_snapshots.go b/turbo/snapshotsync/freezeblocks/caplin_snapshots.go index 5f849d1a1..7862c85ad 100644 --- a/turbo/snapshotsync/freezeblocks/caplin_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/caplin_snapshots.go @@ -12,6 +12,7 @@ import ( "sync" "sync/atomic" + libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/background" "github.com/ledgerwatch/erigon-lib/common/cmp" "github.com/ledgerwatch/erigon-lib/common/dbg" @@ -19,6 +20,7 @@ import ( "github.com/ledgerwatch/erigon-lib/downloader/snaptype" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/recsplit" + "github.com/ledgerwatch/erigon/cl/cltypes" "github.com/ledgerwatch/erigon/cl/persistence" "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format" "github.com/ledgerwatch/erigon/eth/ethconfig" @@ -438,3 +440,43 @@ func (s *CaplinSnapshots) BuildMissingIndices(ctx context.Context, logger log.Lo return s.ReopenFolder() } + +func (s *CaplinSnapshots) ReadHeader(slot uint64) (*cltypes.SignedBeaconBlockHeader, uint64, libcommon.Hash, error) { + view := s.View() + defer view.Close() + + var buf []byte + + seg, ok := view.BeaconBlocksSegment(slot) + if !ok { + return nil, 0, libcommon.Hash{}, nil + } + + if seg.idxSlot == nil { + return nil, 0, libcommon.Hash{}, nil + } + blockOffset := seg.idxSlot.OrdinalLookup(slot - seg.idxSlot.BaseDataID()) + + gg := seg.seg.MakeGetter() + gg.Reset(blockOffset) + if !gg.HasNext() { + return nil, 0, libcommon.Hash{}, nil + } + + buf, _ = gg.Next(buf) + if len(buf) == 0 { + return nil, 0, libcommon.Hash{}, nil + } + // Decompress this thing + buffer := buffersPool.Get().(*bytes.Buffer) + defer buffersPool.Put(buffer) + + buffer.Reset() + buffer.Write(buf) + lzReader := lz4ReaderPool.Get().(*lz4.Reader) + defer lz4ReaderPool.Put(lzReader) + lzReader.Reset(buffer) + + // Use pooled buffers and readers to avoid allocations. + return snapshot_format.ReadBlockHeaderFromSnapshotWithExecutionData(lzReader) +}