diff --git a/cl/sentinel/handlers/blocks.go b/cl/sentinel/handlers/blocks.go index 21f4aca2a..c202a7ece 100644 --- a/cl/sentinel/handlers/blocks.go +++ b/cl/sentinel/handlers/blocks.go @@ -14,24 +14,174 @@ package handlers import ( + "errors" + "io" + + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/cltypes/solid" + "github.com/ledgerwatch/erigon/cl/fork" + "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" "github.com/ledgerwatch/erigon/cl/sentinel/communication/ssz_snappy" - "github.com/ledgerwatch/log/v3" + "github.com/ledgerwatch/erigon/cl/utils" "github.com/libp2p/go-libp2p/core/network" ) -// func (c *ConsensusHandlers) blocksByRangeHandlerPROTODONOTTOUCH69(stream network.Stream) error { -// log.Trace("Got block by range handler call") -// return ssz_snappy.EncodeAndWrite(stream, &emptyString{}, ResourceUnavaiablePrefix) -// } +const MAX_REQUEST_BLOCKS = 96 -func (c *ConsensusHandlers) blocksByRangeHandler(stream network.Stream) error { - log.Trace("Got block by range handler call") - return ssz_snappy.EncodeAndWrite(stream, &emptyString{}, ResourceUnavaiablePrefix) +func (c *ConsensusHandlers) beaconBlocksByRangeHandler(s network.Stream) error { + peerId := s.Conn().RemotePeer().String() + if err := c.checkRateLimit(peerId, "beaconBlocksByRange", rateLimits.beaconBlocksByRangeLimit); err != nil { + ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix) + return err + } + + req := &cltypes.BeaconBlocksByRangeRequest{} + if err := ssz_snappy.DecodeAndReadNoForkDigest(s, req, clparams.Phase0Version); err != nil { + return err + } + + if req.Step != 1 { + return errors.New("step must be 1") + } + + tx, err := c.indiciesDB.BeginRo(c.ctx) + if err != nil { + return err + } + defer tx.Rollback() + + // Limit the number of blocks to the count specified in the request. + if int(req.Count) > MAX_REQUEST_BLOCKS { + req.Count = MAX_REQUEST_BLOCKS + } + + beaconBlockRooots, slots, err := beacon_indicies.ReadBeaconBlockRootsInSlotRange(c.ctx, tx, req.StartSlot, req.Count-1) + if err != nil { + return err + } + if len(beaconBlockRooots) == 0 || len(slots) == 0 { + return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix) + } + // Read the fork digest + forkDigest, err := fork.ComputeForkDigestForVersion( + utils.Uint32ToBytes4(c.beaconConfig.GenesisForkVersion), + c.genesisConfig.GenesisValidatorRoot, + ) + if err != nil { + return err + } + + resourceAvaiable := false + for i, slot := range slots { + r, err := c.beaconDB.BlockReader(c.ctx, slot, beaconBlockRooots[i]) + if err != nil { + return err + } + defer r.Close() + + if !resourceAvaiable { + if _, err := s.Write([]byte{1}); err != nil { + return err + } + resourceAvaiable = true + } + + if _, err := s.Write(forkDigest[:]); err != nil { + return err + } + _, err = io.Copy(s, r) + if err != nil { + return err + } + } + if !resourceAvaiable { + return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix) + } + return nil } -func (c *ConsensusHandlers) beaconBlocksByRootHandler(stream network.Stream) error { - log.Trace("Got beacon block by root handler call") - return ssz_snappy.EncodeAndWrite(stream, &emptyString{}, ResourceUnavaiablePrefix) +func (c *ConsensusHandlers) beaconBlocksByRootHandler(s network.Stream) error { + peerId := s.Conn().RemotePeer().String() + if err := c.checkRateLimit(peerId, "beaconBlocksByRoot", rateLimits.beaconBlocksByRootLimit); err != nil { + ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix) + return err + } + + var req solid.HashListSSZ = solid.NewHashList(100) + if err := ssz_snappy.DecodeAndReadNoForkDigest(s, req, clparams.Phase0Version); err != nil { + return err + } + + blockRoots := []libcommon.Hash{} + for i := 0; i < req.Length(); i++ { + blockRoot := req.Get(i) + blockRoots = append(blockRoots, blockRoot) + // Limit the number of blocks to the count specified in the request. + if len(blockRoots) >= MAX_REQUEST_BLOCKS { + break + } + } + if len(blockRoots) == 0 { + return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix) + } + tx, err := c.indiciesDB.BeginRo(c.ctx) + if err != nil { + return err + } + defer tx.Rollback() + + // Read the fork digest + forkDigest, err := fork.ComputeForkDigestForVersion( + utils.Uint32ToBytes4(c.beaconConfig.GenesisForkVersion), + c.genesisConfig.GenesisValidatorRoot, + ) + if err != nil { + return err + } + + resourceAvaiable := false + for i, blockRoot := range blockRoots { + slot, err := beacon_indicies.ReadBlockSlotByBlockRoot(tx, blockRoot) + if slot == nil { + continue + } + if err != nil { + return err + } + + r, err := c.beaconDB.BlockReader(c.ctx, *slot, blockRoots[i]) + if err != nil { + return err + } + defer r.Close() + + if !resourceAvaiable { + if _, err := s.Write([]byte{1}); err != nil { + return err + } + resourceAvaiable = true + } + + if _, err := s.Write(forkDigest[:]); err != nil { + return err + } + + // Read block from DB + block := cltypes.NewSignedBeaconBlock(c.beaconConfig) + + if err := ssz_snappy.DecodeAndReadNoForkDigest(r, block, clparams.Phase0Version); err != nil { + return err + } + if err := ssz_snappy.EncodeAndWrite(s, block); err != nil { + return err + } + } + if !resourceAvaiable { + return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix) + } + return nil } type emptyString struct{} diff --git a/cl/sentinel/handlers/blocks_by_range_test.go b/cl/sentinel/handlers/blocks_by_range_test.go new file mode 100644 index 000000000..1e8804514 --- /dev/null +++ b/cl/sentinel/handlers/blocks_by_range_test.go @@ -0,0 +1,145 @@ +package handlers + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "io" + "testing" + + "github.com/golang/snappy" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/fork" + "github.com/ledgerwatch/erigon/cl/persistence" + "github.com/ledgerwatch/erigon/cl/sentinel/communication" + "github.com/ledgerwatch/erigon/cl/sentinel/communication/ssz_snappy" + "github.com/ledgerwatch/erigon/cl/sentinel/peers" + "github.com/ledgerwatch/erigon/cl/utils" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/stretchr/testify/require" +) + +func TestBlocksByRootHandler(t *testing.T) { + ctx := context.Background() + + listenAddrHost := "/ip4/127.0.0.1/tcp/5000" + host, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost)) + require.NoError(t, err) + + listenAddrHost1 := "/ip4/127.0.0.1/tcp/5001" + host1, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost1)) + require.NoError(t, err) + + err = host.Connect(ctx, peer.AddrInfo{ + ID: host1.ID(), + Addrs: host1.Addrs(), + }) + require.NoError(t, err) + + peersPool := peers.NewPool() + beaconDB, indiciesDB := setupStore(t) + store := persistence.NewBeaconChainDatabaseFilesystem(beaconDB, nil, &clparams.MainnetBeaconConfig) + + tx, _ := indiciesDB.BeginRw(ctx) + + startSlot := uint64(100) + count := uint64(10) + step := uint64(1) + + expBlocks := populateDatabaseWithBlocks(t, store, tx, startSlot, count) + tx.Commit() + + genesisCfg, _, beaconCfg := clparams.GetConfigsByNetwork(1) + c := NewConsensusHandlers( + ctx, + beaconDB, + indiciesDB, + host, + peersPool, + beaconCfg, + genesisCfg, + &cltypes.Metadata{}, + ) + c.Start() + req := &cltypes.BeaconBlocksByRangeRequest{ + StartSlot: startSlot, + Count: count, + Step: step, + } + var reqBuf bytes.Buffer + if err := ssz_snappy.EncodeAndWrite(&reqBuf, req); err != nil { + return + } + + reqData := libcommon.CopyBytes(reqBuf.Bytes()) + stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRangeProtocolV1)) + require.NoError(t, err) + + _, err = stream.Write(reqData) + require.NoError(t, err) + + firstByte := make([]byte, 1) + _, err = stream.Read(firstByte) + require.NoError(t, err) + require.Equal(t, firstByte[0], byte(1)) + + for i := 0; i < int(count); i++ { + forkDigest := make([]byte, 4) + + _, err := stream.Read(forkDigest) + if err != nil { + if err == io.EOF { + t.Fatal("Stream is empty") + } else { + require.NoError(t, err) + } + } + + encodedLn, _, err := ssz_snappy.ReadUvarint(stream) + require.NoError(t, err) + + raw := make([]byte, encodedLn) + sr := snappy.NewReader(stream) + bytesRead := 0 + for bytesRead < int(encodedLn) { + n, err := sr.Read(raw[bytesRead:]) + require.NoError(t, err) + bytesRead += n + } + + // Fork digests + respForkDigest := binary.BigEndian.Uint32(forkDigest) + if respForkDigest == 0 { + require.NoError(t, fmt.Errorf("null fork digest")) + } + + version, err := fork.ForkDigestVersion(utils.Uint32ToBytes4(respForkDigest), beaconCfg, genesisCfg.GenesisValidatorRoot) + if err != nil { + require.NoError(t, err) + } + + block := cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig) + if err = block.DecodeSSZ(raw, int(version)); err != nil { + require.NoError(t, err) + return + } + require.Equal(t, expBlocks[i].Block.Slot, block.Block.Slot) + require.Equal(t, expBlocks[i].Block.StateRoot, block.Block.StateRoot) + require.Equal(t, expBlocks[i].Block.ParentRoot, block.Block.ParentRoot) + require.Equal(t, expBlocks[i].Block.ProposerIndex, block.Block.ProposerIndex) + require.Equal(t, expBlocks[i].Block.Body.ExecutionPayload.BlockNumber, block.Block.Body.ExecutionPayload.BlockNumber) + } + + _, err = stream.Read(make([]byte, 1)) + if err != io.EOF { + t.Fatal("Stream is not empty") + } + + defer indiciesDB.Close() + defer tx.Rollback() +} diff --git a/cl/sentinel/handlers/blocks_by_root_test.go b/cl/sentinel/handlers/blocks_by_root_test.go new file mode 100644 index 000000000..ad7d3ae33 --- /dev/null +++ b/cl/sentinel/handlers/blocks_by_root_test.go @@ -0,0 +1,144 @@ +package handlers + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "io" + "testing" + + "github.com/golang/snappy" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/cltypes/solid" + "github.com/ledgerwatch/erigon/cl/fork" + "github.com/ledgerwatch/erigon/cl/persistence" + "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" + "github.com/ledgerwatch/erigon/cl/sentinel/communication" + "github.com/ledgerwatch/erigon/cl/sentinel/communication/ssz_snappy" + "github.com/ledgerwatch/erigon/cl/sentinel/peers" + "github.com/ledgerwatch/erigon/cl/utils" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/stretchr/testify/require" +) + +func TestBlocksByRangeHandler(t *testing.T) { + ctx := context.Background() + + listenAddrHost := "/ip4/127.0.0.1/tcp/6000" + host, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost)) + require.NoError(t, err) + + listenAddrHost1 := "/ip4/127.0.0.1/tcp/6001" + host1, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost1)) + require.NoError(t, err) + + err = host.Connect(ctx, peer.AddrInfo{ + ID: host1.ID(), + Addrs: host1.Addrs(), + }) + require.NoError(t, err) + + peersPool := peers.NewPool() + beaconDB, indiciesDB := setupStore(t) + store := persistence.NewBeaconChainDatabaseFilesystem(beaconDB, nil, &clparams.MainnetBeaconConfig) + + tx, _ := indiciesDB.BeginRw(ctx) + + startSlot := uint64(100) + count := uint64(10) + + expBlocks := populateDatabaseWithBlocks(t, store, tx, startSlot, count) + var blockRoots []libcommon.Hash + blockRoots, _, _ = beacon_indicies.ReadBeaconBlockRootsInSlotRange(ctx, tx, startSlot, startSlot+count) + tx.Commit() + + genesisCfg, _, beaconCfg := clparams.GetConfigsByNetwork(1) + c := NewConsensusHandlers( + ctx, + beaconDB, + indiciesDB, + host, + peersPool, + beaconCfg, + genesisCfg, + &cltypes.Metadata{}, + ) + c.Start() + var req solid.HashListSSZ = solid.NewHashList(len(expBlocks)) + + for _, block := range blockRoots { + req.Append(block) + } + var reqBuf bytes.Buffer + if err := ssz_snappy.EncodeAndWrite(&reqBuf, req); err != nil { + return + } + + reqData := libcommon.CopyBytes(reqBuf.Bytes()) + stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRootProtocolV1)) + require.NoError(t, err) + + _, err = stream.Write(reqData) + require.NoError(t, err) + + firstByte := make([]byte, 1) + _, err = stream.Read(firstByte) + require.NoError(t, err) + require.Equal(t, firstByte[0], byte(1)) + + for i := 0; i < len(blockRoots); i++ { + forkDigest := make([]byte, 4) + _, err := stream.Read(forkDigest) + if err != nil && err != io.EOF { + require.NoError(t, err) + } + + encodedLn, _, err := ssz_snappy.ReadUvarint(stream) + require.NoError(t, err) + + raw := make([]byte, encodedLn) + sr := snappy.NewReader(stream) + bytesRead := 0 + for bytesRead < int(encodedLn) { + n, err := sr.Read(raw[bytesRead:]) + if err != nil { + require.NoError(t, err) + } + bytesRead += n + } + + // Fork digests + respForkDigest := binary.BigEndian.Uint32(forkDigest) + if respForkDigest == 0 { + require.NoError(t, fmt.Errorf("null fork digest")) + } + version, err := fork.ForkDigestVersion(utils.Uint32ToBytes4(respForkDigest), beaconCfg, genesisCfg.GenesisValidatorRoot) + if err != nil { + require.NoError(t, err) + } + + block := cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig) + if err = block.DecodeSSZ(raw, int(version)); err != nil { + require.NoError(t, err) + return + } + require.Equal(t, expBlocks[i].Block.Slot, block.Block.Slot) + require.Equal(t, expBlocks[i].Block.StateRoot, block.Block.StateRoot) + require.Equal(t, expBlocks[i].Block.ParentRoot, block.Block.ParentRoot) + require.Equal(t, expBlocks[i].Block.ProposerIndex, block.Block.ProposerIndex) + require.Equal(t, expBlocks[i].Block.Body.ExecutionPayload.BlockNumber, block.Block.Body.ExecutionPayload.BlockNumber) + } + + _, err = stream.Read(make([]byte, 1)) + if err != io.EOF { + t.Fatal("Stream is not empty") + } + + defer indiciesDB.Close() + defer tx.Rollback() +} diff --git a/cl/sentinel/handlers/handlers.go b/cl/sentinel/handlers/handlers.go index 196518469..5887f6984 100644 --- a/cl/sentinel/handlers/handlers.go +++ b/cl/sentinel/handlers/handlers.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/cl/sentinel/communication" "github.com/ledgerwatch/erigon/cl/sentinel/peers" "github.com/ledgerwatch/erigon/cl/utils" @@ -35,22 +36,27 @@ import ( ) type RateLimits struct { - pingLimit int - goodbyeLimit int - metadataV1Limit int - metadataV2Limit int - statusLimit int + pingLimit int + goodbyeLimit int + metadataV1Limit int + metadataV2Limit int + statusLimit int + beaconBlocksByRangeLimit int + beaconBlocksByRootLimit int } const punishmentPeriod = time.Minute const defaultRateLimit = 5000 +const defaultBlockHandlerRateLimit = 200 var rateLimits = RateLimits{ - pingLimit: defaultRateLimit, - goodbyeLimit: defaultRateLimit, - metadataV1Limit: defaultRateLimit, - metadataV2Limit: defaultRateLimit, - statusLimit: defaultRateLimit, + pingLimit: defaultRateLimit, + goodbyeLimit: defaultRateLimit, + metadataV1Limit: defaultRateLimit, + metadataV2Limit: defaultRateLimit, + statusLimit: defaultRateLimit, + beaconBlocksByRangeLimit: defaultBlockHandlerRateLimit, + beaconBlocksByRootLimit: defaultBlockHandlerRateLimit, } type ConsensusHandlers struct { @@ -61,6 +67,7 @@ type ConsensusHandlers struct { genesisConfig *clparams.GenesisConfig ctx context.Context beaconDB persistence.RawBeaconBlockChain + indiciesDB kv.RoDB peerRateLimits sync.Map punishmentEndTimes sync.Map } @@ -71,12 +78,13 @@ const ( ResourceUnavaiablePrefix = 0x03 ) -func NewConsensusHandlers(ctx context.Context, db persistence.RawBeaconBlockChain, host host.Host, +func NewConsensusHandlers(ctx context.Context, db persistence.RawBeaconBlockChain, indiciesDB kv.RoDB, host host.Host, peers *peers.Pool, beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, metadata *cltypes.Metadata) *ConsensusHandlers { c := &ConsensusHandlers{ host: host, metadata: metadata, beaconDB: db, + indiciesDB: indiciesDB, genesisConfig: genesisConfig, beaconConfig: beaconConfig, ctx: ctx, @@ -90,7 +98,7 @@ func NewConsensusHandlers(ctx context.Context, db persistence.RawBeaconBlockChai communication.StatusProtocolV1: c.statusHandler, communication.MetadataProtocolV1: c.metadataV1Handler, communication.MetadataProtocolV2: c.metadataV2Handler, - communication.BeaconBlocksByRangeProtocolV1: c.blocksByRangeHandler, + communication.BeaconBlocksByRangeProtocolV1: c.beaconBlocksByRangeHandler, communication.BeaconBlocksByRootProtocolV1: c.beaconBlocksByRootHandler, } diff --git a/cl/sentinel/handlers/utils_test.go b/cl/sentinel/handlers/utils_test.go new file mode 100644 index 000000000..f197fea46 --- /dev/null +++ b/cl/sentinel/handlers/utils_test.go @@ -0,0 +1,61 @@ +package handlers + +import ( + "context" + "testing" + + "github.com/ledgerwatch/erigon-lib/common" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/memdb" + "github.com/ledgerwatch/erigon/cl/clparams" + "github.com/ledgerwatch/erigon/cl/cltypes" + "github.com/ledgerwatch/erigon/cl/persistence" + "github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies" + "github.com/spf13/afero" + "github.com/stretchr/testify/require" +) + +func setupStore(t *testing.T) (persistence.RawBeaconBlockChain, kv.RwDB) { + db := memdb.NewTestDB(t) + af := afero.NewMemMapFs() + rawDB := persistence.NewAferoRawBlockSaver(af, &clparams.MainnetBeaconConfig) + return rawDB, db +} + +func populateDatabaseWithBlocks(t *testing.T, store persistence.BeaconChainDatabase, tx kv.RwTx, startSlot, count uint64) []*cltypes.SignedBeaconBlock { + + mockParentRoot := common.Hash{1} + blocks := make([]*cltypes.SignedBeaconBlock, 0, count) + for i := uint64(0); i <= count; i++ { + slot := startSlot + i + block := cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig) + block.Block.Slot = slot + block.Block.StateRoot = libcommon.Hash{byte(i)} + block.Block.ParentRoot = mockParentRoot + block.EncodingSizeSSZ() + bodyRoot, _ := block.Block.Body.HashSSZ() + canonical := true + + // Populate BeaconChainDatabase + store.WriteBlock(context.Background(), tx, block, canonical) + + // Populate indiciesDB + require.NoError(t, beacon_indicies.WriteBeaconBlockHeaderAndIndicies( + context.Background(), + tx, + &cltypes.SignedBeaconBlockHeader{ + Signature: block.Signature, + Header: &cltypes.BeaconBlockHeader{ + Slot: block.Block.Slot, + ParentRoot: block.Block.ParentRoot, + ProposerIndex: block.Block.ProposerIndex, + Root: block.Block.StateRoot, + BodyRoot: bodyRoot, + }, + }, + canonical)) + blocks = append(blocks, block) + } + return blocks +} diff --git a/cl/sentinel/sentinel.go b/cl/sentinel/sentinel.go index 839906fb1..793c9aea9 100644 --- a/cl/sentinel/sentinel.go +++ b/cl/sentinel/sentinel.go @@ -22,6 +22,7 @@ import ( "time" "github.com/go-chi/chi/v5" + "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/cl/sentinel/handlers" "github.com/ledgerwatch/erigon/cl/sentinel/handshake" "github.com/ledgerwatch/erigon/cl/sentinel/httpreqresp" @@ -74,7 +75,8 @@ type Sentinel struct { metadataV2 *cltypes.Metadata handshaker *handshake.HandShaker - db persistence.RawBeaconBlockChain + db persistence.RawBeaconBlockChain + indiciesDB kv.RoDB discoverConfig discover.Config pubsub *pubsub.PubSub @@ -166,7 +168,7 @@ func (s *Sentinel) createListener() (*discover.UDPv5, error) { } // Start stream handlers - handlers.NewConsensusHandlers(s.ctx, s.db, s.host, s.peers, s.cfg.BeaconConfig, s.cfg.GenesisConfig, s.metadataV2).Start() + handlers.NewConsensusHandlers(s.ctx, s.db, s.indiciesDB, s.host, s.peers, s.cfg.BeaconConfig, s.cfg.GenesisConfig, s.metadataV2).Start() net, err := discover.ListenV5(s.ctx, "any", conn, localNode, discCfg) if err != nil { @@ -180,14 +182,16 @@ func New( ctx context.Context, cfg *SentinelConfig, db persistence.RawBeaconBlockChain, + indiciesDB kv.RoDB, logger log.Logger, ) (*Sentinel, error) { s := &Sentinel{ - ctx: ctx, - cfg: cfg, - db: db, - metrics: true, - logger: logger, + ctx: ctx, + cfg: cfg, + db: db, + indiciesDB: indiciesDB, + metrics: true, + logger: logger, } // Setup discovery diff --git a/cl/sentinel/service/start.go b/cl/sentinel/service/start.go index efcd95ac7..50ad1b38c 100644 --- a/cl/sentinel/service/start.go +++ b/cl/sentinel/service/start.go @@ -8,6 +8,7 @@ import ( "github.com/ledgerwatch/erigon-lib/direct" sentinelrpc "github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel" + "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon/cl/cltypes" "github.com/ledgerwatch/erigon/cl/persistence" "github.com/ledgerwatch/log/v3" @@ -20,8 +21,8 @@ type ServerConfig struct { Addr string } -func createSentinel(cfg *sentinel.SentinelConfig, db persistence.RawBeaconBlockChain, logger log.Logger) (*sentinel.Sentinel, error) { - sent, err := sentinel.New(context.Background(), cfg, db, logger) +func createSentinel(cfg *sentinel.SentinelConfig, db persistence.RawBeaconBlockChain, indiciesDB kv.RwDB, logger log.Logger) (*sentinel.Sentinel, error) { + sent, err := sentinel.New(context.Background(), cfg, db, indiciesDB, logger) if err != nil { return nil, err } @@ -57,9 +58,9 @@ func createSentinel(cfg *sentinel.SentinelConfig, db persistence.RawBeaconBlockC return sent, nil } -func StartSentinelService(cfg *sentinel.SentinelConfig, db persistence.RawBeaconBlockChain, srvCfg *ServerConfig, creds credentials.TransportCredentials, initialStatus *cltypes.Status, logger log.Logger) (sentinelrpc.SentinelClient, error) { +func StartSentinelService(cfg *sentinel.SentinelConfig, db persistence.RawBeaconBlockChain, indiciesDB kv.RwDB, srvCfg *ServerConfig, creds credentials.TransportCredentials, initialStatus *cltypes.Status, logger log.Logger) (sentinelrpc.SentinelClient, error) { ctx := context.Background() - sent, err := createSentinel(cfg, db, logger) + sent, err := createSentinel(cfg, db, indiciesDB, logger) if err != nil { return nil, err } diff --git a/cmd/caplin/main.go b/cmd/caplin/main.go index 91b3c0b35..99c63c8a9 100644 --- a/cmd/caplin/main.go +++ b/cmd/caplin/main.go @@ -87,7 +87,7 @@ func runCaplinNode(cliCtx *cli.Context) error { NetworkConfig: cfg.NetworkCfg, BeaconConfig: cfg.BeaconCfg, NoDiscovery: cfg.NoDiscovery, - }, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, nil, &cltypes.Status{ + }, nil, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, nil, &cltypes.Status{ ForkDigest: forkDigest, FinalizedRoot: state.FinalizedCheckpoint().BlockRoot(), FinalizedEpoch: state.FinalizedCheckpoint().Epoch(), diff --git a/cmd/sentinel/main.go b/cmd/sentinel/main.go index 3208d9c3e..9453bf492 100644 --- a/cmd/sentinel/main.go +++ b/cmd/sentinel/main.go @@ -54,7 +54,7 @@ func runSentinelNode(cliCtx *cli.Context) error { BeaconConfig: cfg.BeaconCfg, NoDiscovery: cfg.NoDiscovery, LocalDiscovery: cfg.LocalDiscovery, - }, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, nil, nil, log.Root()) + }, nil, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, nil, nil, log.Root()) if err != nil { log.Error("[Sentinel] Could not start sentinel", "err", err) return err diff --git a/eth/backend.go b/eth/backend.go index cda7367f6..dc7ad0c08 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -40,6 +40,7 @@ import ( "github.com/ledgerwatch/erigon/cl/cltypes" "github.com/ledgerwatch/erigon/cl/fork" "github.com/ledgerwatch/erigon/cl/persistence" + "github.com/ledgerwatch/erigon/cl/persistence/db_config" "github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format/getters" clcore "github.com/ledgerwatch/erigon/cl/phase1/core" "github.com/ledgerwatch/erigon/cl/phase1/execution_client" @@ -846,6 +847,10 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger } rawBeaconBlockChainDb, _ := persistence.AferoRawBeaconBlockChainFromOsPath(beaconCfg, dirs.CaplinHistory) + _, indiciesDB, err := caplin1.OpenCaplinDatabase(ctx, db_config.DefaultDatabaseConfiguration, beaconCfg, rawBeaconBlockChainDb, dirs.CaplinIndexing, engine, false) + if err != nil { + return nil, err + } client, err := service.StartSentinelService(&sentinel.SentinelConfig{ IpAddr: config.LightClientDiscoveryAddr, @@ -855,7 +860,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger NetworkConfig: networkCfg, BeaconConfig: beaconCfg, TmpDir: tmpdir, - }, rawBeaconBlockChainDb, &service.ServerConfig{Network: "tcp", Addr: fmt.Sprintf("%s:%d", config.SentinelAddr, config.SentinelPort)}, creds, &cltypes.Status{ + }, rawBeaconBlockChainDb, indiciesDB, &service.ServerConfig{Network: "tcp", Addr: fmt.Sprintf("%s:%d", config.SentinelAddr, config.SentinelPort)}, creds, &cltypes.Status{ ForkDigest: forkDigest, FinalizedRoot: state.FinalizedCheckpoint().BlockRoot(), FinalizedEpoch: state.FinalizedCheckpoint().Epoch(),