mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-21 19:20:39 +00:00
Add blocksByRange
& blocksByRoot
P2P rpc handlers (#8885)
This PR is ready to review. PR introduces `blocksByRange` and `blocksByRoot` P2P RPC methods - `blocksByRange` - allows peers to request a range of blocks - `blocksByRoot` - enables block requests using their root hashes(list format) Reference: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyrange
This commit is contained in:
parent
2b87d65285
commit
a2f375c0b1
@ -14,24 +14,174 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
libcommon "github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon/cl/clparams"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
|
||||
"github.com/ledgerwatch/erigon/cl/fork"
|
||||
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
|
||||
"github.com/ledgerwatch/erigon/cl/sentinel/communication/ssz_snappy"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/ledgerwatch/erigon/cl/utils"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
)
|
||||
|
||||
// func (c *ConsensusHandlers) blocksByRangeHandlerPROTODONOTTOUCH69(stream network.Stream) error {
|
||||
// log.Trace("Got block by range handler call")
|
||||
// return ssz_snappy.EncodeAndWrite(stream, &emptyString{}, ResourceUnavaiablePrefix)
|
||||
// }
|
||||
const MAX_REQUEST_BLOCKS = 96
|
||||
|
||||
func (c *ConsensusHandlers) blocksByRangeHandler(stream network.Stream) error {
|
||||
log.Trace("Got block by range handler call")
|
||||
return ssz_snappy.EncodeAndWrite(stream, &emptyString{}, ResourceUnavaiablePrefix)
|
||||
func (c *ConsensusHandlers) beaconBlocksByRangeHandler(s network.Stream) error {
|
||||
peerId := s.Conn().RemotePeer().String()
|
||||
if err := c.checkRateLimit(peerId, "beaconBlocksByRange", rateLimits.beaconBlocksByRangeLimit); err != nil {
|
||||
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
|
||||
return err
|
||||
}
|
||||
|
||||
req := &cltypes.BeaconBlocksByRangeRequest{}
|
||||
if err := ssz_snappy.DecodeAndReadNoForkDigest(s, req, clparams.Phase0Version); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if req.Step != 1 {
|
||||
return errors.New("step must be 1")
|
||||
}
|
||||
|
||||
tx, err := c.indiciesDB.BeginRo(c.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Limit the number of blocks to the count specified in the request.
|
||||
if int(req.Count) > MAX_REQUEST_BLOCKS {
|
||||
req.Count = MAX_REQUEST_BLOCKS
|
||||
}
|
||||
|
||||
beaconBlockRooots, slots, err := beacon_indicies.ReadBeaconBlockRootsInSlotRange(c.ctx, tx, req.StartSlot, req.Count-1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(beaconBlockRooots) == 0 || len(slots) == 0 {
|
||||
return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix)
|
||||
}
|
||||
// Read the fork digest
|
||||
forkDigest, err := fork.ComputeForkDigestForVersion(
|
||||
utils.Uint32ToBytes4(c.beaconConfig.GenesisForkVersion),
|
||||
c.genesisConfig.GenesisValidatorRoot,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resourceAvaiable := false
|
||||
for i, slot := range slots {
|
||||
r, err := c.beaconDB.BlockReader(c.ctx, slot, beaconBlockRooots[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
if !resourceAvaiable {
|
||||
if _, err := s.Write([]byte{1}); err != nil {
|
||||
return err
|
||||
}
|
||||
resourceAvaiable = true
|
||||
}
|
||||
|
||||
if _, err := s.Write(forkDigest[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = io.Copy(s, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if !resourceAvaiable {
|
||||
return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ConsensusHandlers) beaconBlocksByRootHandler(stream network.Stream) error {
|
||||
log.Trace("Got beacon block by root handler call")
|
||||
return ssz_snappy.EncodeAndWrite(stream, &emptyString{}, ResourceUnavaiablePrefix)
|
||||
func (c *ConsensusHandlers) beaconBlocksByRootHandler(s network.Stream) error {
|
||||
peerId := s.Conn().RemotePeer().String()
|
||||
if err := c.checkRateLimit(peerId, "beaconBlocksByRoot", rateLimits.beaconBlocksByRootLimit); err != nil {
|
||||
ssz_snappy.EncodeAndWrite(s, &emptyString{}, RateLimitedPrefix)
|
||||
return err
|
||||
}
|
||||
|
||||
var req solid.HashListSSZ = solid.NewHashList(100)
|
||||
if err := ssz_snappy.DecodeAndReadNoForkDigest(s, req, clparams.Phase0Version); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
blockRoots := []libcommon.Hash{}
|
||||
for i := 0; i < req.Length(); i++ {
|
||||
blockRoot := req.Get(i)
|
||||
blockRoots = append(blockRoots, blockRoot)
|
||||
// Limit the number of blocks to the count specified in the request.
|
||||
if len(blockRoots) >= MAX_REQUEST_BLOCKS {
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(blockRoots) == 0 {
|
||||
return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix)
|
||||
}
|
||||
tx, err := c.indiciesDB.BeginRo(c.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Read the fork digest
|
||||
forkDigest, err := fork.ComputeForkDigestForVersion(
|
||||
utils.Uint32ToBytes4(c.beaconConfig.GenesisForkVersion),
|
||||
c.genesisConfig.GenesisValidatorRoot,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resourceAvaiable := false
|
||||
for i, blockRoot := range blockRoots {
|
||||
slot, err := beacon_indicies.ReadBlockSlotByBlockRoot(tx, blockRoot)
|
||||
if slot == nil {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r, err := c.beaconDB.BlockReader(c.ctx, *slot, blockRoots[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
if !resourceAvaiable {
|
||||
if _, err := s.Write([]byte{1}); err != nil {
|
||||
return err
|
||||
}
|
||||
resourceAvaiable = true
|
||||
}
|
||||
|
||||
if _, err := s.Write(forkDigest[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Read block from DB
|
||||
block := cltypes.NewSignedBeaconBlock(c.beaconConfig)
|
||||
|
||||
if err := ssz_snappy.DecodeAndReadNoForkDigest(r, block, clparams.Phase0Version); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ssz_snappy.EncodeAndWrite(s, block); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if !resourceAvaiable {
|
||||
return ssz_snappy.EncodeAndWrite(s, &emptyString{}, ResourceUnavaiablePrefix)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type emptyString struct{}
|
||||
|
145
cl/sentinel/handlers/blocks_by_range_test.go
Normal file
145
cl/sentinel/handlers/blocks_by_range_test.go
Normal file
@ -0,0 +1,145 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
libcommon "github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon/cl/clparams"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes"
|
||||
"github.com/ledgerwatch/erigon/cl/fork"
|
||||
"github.com/ledgerwatch/erigon/cl/persistence"
|
||||
"github.com/ledgerwatch/erigon/cl/sentinel/communication"
|
||||
"github.com/ledgerwatch/erigon/cl/sentinel/communication/ssz_snappy"
|
||||
"github.com/ledgerwatch/erigon/cl/sentinel/peers"
|
||||
"github.com/ledgerwatch/erigon/cl/utils"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBlocksByRootHandler(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
listenAddrHost := "/ip4/127.0.0.1/tcp/5000"
|
||||
host, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost))
|
||||
require.NoError(t, err)
|
||||
|
||||
listenAddrHost1 := "/ip4/127.0.0.1/tcp/5001"
|
||||
host1, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost1))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = host.Connect(ctx, peer.AddrInfo{
|
||||
ID: host1.ID(),
|
||||
Addrs: host1.Addrs(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
peersPool := peers.NewPool()
|
||||
beaconDB, indiciesDB := setupStore(t)
|
||||
store := persistence.NewBeaconChainDatabaseFilesystem(beaconDB, nil, &clparams.MainnetBeaconConfig)
|
||||
|
||||
tx, _ := indiciesDB.BeginRw(ctx)
|
||||
|
||||
startSlot := uint64(100)
|
||||
count := uint64(10)
|
||||
step := uint64(1)
|
||||
|
||||
expBlocks := populateDatabaseWithBlocks(t, store, tx, startSlot, count)
|
||||
tx.Commit()
|
||||
|
||||
genesisCfg, _, beaconCfg := clparams.GetConfigsByNetwork(1)
|
||||
c := NewConsensusHandlers(
|
||||
ctx,
|
||||
beaconDB,
|
||||
indiciesDB,
|
||||
host,
|
||||
peersPool,
|
||||
beaconCfg,
|
||||
genesisCfg,
|
||||
&cltypes.Metadata{},
|
||||
)
|
||||
c.Start()
|
||||
req := &cltypes.BeaconBlocksByRangeRequest{
|
||||
StartSlot: startSlot,
|
||||
Count: count,
|
||||
Step: step,
|
||||
}
|
||||
var reqBuf bytes.Buffer
|
||||
if err := ssz_snappy.EncodeAndWrite(&reqBuf, req); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
reqData := libcommon.CopyBytes(reqBuf.Bytes())
|
||||
stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRangeProtocolV1))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = stream.Write(reqData)
|
||||
require.NoError(t, err)
|
||||
|
||||
firstByte := make([]byte, 1)
|
||||
_, err = stream.Read(firstByte)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, firstByte[0], byte(1))
|
||||
|
||||
for i := 0; i < int(count); i++ {
|
||||
forkDigest := make([]byte, 4)
|
||||
|
||||
_, err := stream.Read(forkDigest)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
t.Fatal("Stream is empty")
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
encodedLn, _, err := ssz_snappy.ReadUvarint(stream)
|
||||
require.NoError(t, err)
|
||||
|
||||
raw := make([]byte, encodedLn)
|
||||
sr := snappy.NewReader(stream)
|
||||
bytesRead := 0
|
||||
for bytesRead < int(encodedLn) {
|
||||
n, err := sr.Read(raw[bytesRead:])
|
||||
require.NoError(t, err)
|
||||
bytesRead += n
|
||||
}
|
||||
|
||||
// Fork digests
|
||||
respForkDigest := binary.BigEndian.Uint32(forkDigest)
|
||||
if respForkDigest == 0 {
|
||||
require.NoError(t, fmt.Errorf("null fork digest"))
|
||||
}
|
||||
|
||||
version, err := fork.ForkDigestVersion(utils.Uint32ToBytes4(respForkDigest), beaconCfg, genesisCfg.GenesisValidatorRoot)
|
||||
if err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
block := cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig)
|
||||
if err = block.DecodeSSZ(raw, int(version)); err != nil {
|
||||
require.NoError(t, err)
|
||||
return
|
||||
}
|
||||
require.Equal(t, expBlocks[i].Block.Slot, block.Block.Slot)
|
||||
require.Equal(t, expBlocks[i].Block.StateRoot, block.Block.StateRoot)
|
||||
require.Equal(t, expBlocks[i].Block.ParentRoot, block.Block.ParentRoot)
|
||||
require.Equal(t, expBlocks[i].Block.ProposerIndex, block.Block.ProposerIndex)
|
||||
require.Equal(t, expBlocks[i].Block.Body.ExecutionPayload.BlockNumber, block.Block.Body.ExecutionPayload.BlockNumber)
|
||||
}
|
||||
|
||||
_, err = stream.Read(make([]byte, 1))
|
||||
if err != io.EOF {
|
||||
t.Fatal("Stream is not empty")
|
||||
}
|
||||
|
||||
defer indiciesDB.Close()
|
||||
defer tx.Rollback()
|
||||
}
|
144
cl/sentinel/handlers/blocks_by_root_test.go
Normal file
144
cl/sentinel/handlers/blocks_by_root_test.go
Normal file
@ -0,0 +1,144 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
libcommon "github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon/cl/clparams"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
|
||||
"github.com/ledgerwatch/erigon/cl/fork"
|
||||
"github.com/ledgerwatch/erigon/cl/persistence"
|
||||
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
|
||||
"github.com/ledgerwatch/erigon/cl/sentinel/communication"
|
||||
"github.com/ledgerwatch/erigon/cl/sentinel/communication/ssz_snappy"
|
||||
"github.com/ledgerwatch/erigon/cl/sentinel/peers"
|
||||
"github.com/ledgerwatch/erigon/cl/utils"
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBlocksByRangeHandler(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
listenAddrHost := "/ip4/127.0.0.1/tcp/6000"
|
||||
host, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost))
|
||||
require.NoError(t, err)
|
||||
|
||||
listenAddrHost1 := "/ip4/127.0.0.1/tcp/6001"
|
||||
host1, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost1))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = host.Connect(ctx, peer.AddrInfo{
|
||||
ID: host1.ID(),
|
||||
Addrs: host1.Addrs(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
peersPool := peers.NewPool()
|
||||
beaconDB, indiciesDB := setupStore(t)
|
||||
store := persistence.NewBeaconChainDatabaseFilesystem(beaconDB, nil, &clparams.MainnetBeaconConfig)
|
||||
|
||||
tx, _ := indiciesDB.BeginRw(ctx)
|
||||
|
||||
startSlot := uint64(100)
|
||||
count := uint64(10)
|
||||
|
||||
expBlocks := populateDatabaseWithBlocks(t, store, tx, startSlot, count)
|
||||
var blockRoots []libcommon.Hash
|
||||
blockRoots, _, _ = beacon_indicies.ReadBeaconBlockRootsInSlotRange(ctx, tx, startSlot, startSlot+count)
|
||||
tx.Commit()
|
||||
|
||||
genesisCfg, _, beaconCfg := clparams.GetConfigsByNetwork(1)
|
||||
c := NewConsensusHandlers(
|
||||
ctx,
|
||||
beaconDB,
|
||||
indiciesDB,
|
||||
host,
|
||||
peersPool,
|
||||
beaconCfg,
|
||||
genesisCfg,
|
||||
&cltypes.Metadata{},
|
||||
)
|
||||
c.Start()
|
||||
var req solid.HashListSSZ = solid.NewHashList(len(expBlocks))
|
||||
|
||||
for _, block := range blockRoots {
|
||||
req.Append(block)
|
||||
}
|
||||
var reqBuf bytes.Buffer
|
||||
if err := ssz_snappy.EncodeAndWrite(&reqBuf, req); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
reqData := libcommon.CopyBytes(reqBuf.Bytes())
|
||||
stream, err := host1.NewStream(ctx, host.ID(), protocol.ID(communication.BeaconBlocksByRootProtocolV1))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = stream.Write(reqData)
|
||||
require.NoError(t, err)
|
||||
|
||||
firstByte := make([]byte, 1)
|
||||
_, err = stream.Read(firstByte)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, firstByte[0], byte(1))
|
||||
|
||||
for i := 0; i < len(blockRoots); i++ {
|
||||
forkDigest := make([]byte, 4)
|
||||
_, err := stream.Read(forkDigest)
|
||||
if err != nil && err != io.EOF {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
encodedLn, _, err := ssz_snappy.ReadUvarint(stream)
|
||||
require.NoError(t, err)
|
||||
|
||||
raw := make([]byte, encodedLn)
|
||||
sr := snappy.NewReader(stream)
|
||||
bytesRead := 0
|
||||
for bytesRead < int(encodedLn) {
|
||||
n, err := sr.Read(raw[bytesRead:])
|
||||
if err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
bytesRead += n
|
||||
}
|
||||
|
||||
// Fork digests
|
||||
respForkDigest := binary.BigEndian.Uint32(forkDigest)
|
||||
if respForkDigest == 0 {
|
||||
require.NoError(t, fmt.Errorf("null fork digest"))
|
||||
}
|
||||
version, err := fork.ForkDigestVersion(utils.Uint32ToBytes4(respForkDigest), beaconCfg, genesisCfg.GenesisValidatorRoot)
|
||||
if err != nil {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
block := cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig)
|
||||
if err = block.DecodeSSZ(raw, int(version)); err != nil {
|
||||
require.NoError(t, err)
|
||||
return
|
||||
}
|
||||
require.Equal(t, expBlocks[i].Block.Slot, block.Block.Slot)
|
||||
require.Equal(t, expBlocks[i].Block.StateRoot, block.Block.StateRoot)
|
||||
require.Equal(t, expBlocks[i].Block.ParentRoot, block.Block.ParentRoot)
|
||||
require.Equal(t, expBlocks[i].Block.ProposerIndex, block.Block.ProposerIndex)
|
||||
require.Equal(t, expBlocks[i].Block.Body.ExecutionPayload.BlockNumber, block.Block.Body.ExecutionPayload.BlockNumber)
|
||||
}
|
||||
|
||||
_, err = stream.Read(make([]byte, 1))
|
||||
if err != io.EOF {
|
||||
t.Fatal("Stream is not empty")
|
||||
}
|
||||
|
||||
defer indiciesDB.Close()
|
||||
defer tx.Rollback()
|
||||
}
|
@ -20,6 +20,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon/cl/sentinel/communication"
|
||||
"github.com/ledgerwatch/erigon/cl/sentinel/peers"
|
||||
"github.com/ledgerwatch/erigon/cl/utils"
|
||||
@ -35,22 +36,27 @@ import (
|
||||
)
|
||||
|
||||
type RateLimits struct {
|
||||
pingLimit int
|
||||
goodbyeLimit int
|
||||
metadataV1Limit int
|
||||
metadataV2Limit int
|
||||
statusLimit int
|
||||
pingLimit int
|
||||
goodbyeLimit int
|
||||
metadataV1Limit int
|
||||
metadataV2Limit int
|
||||
statusLimit int
|
||||
beaconBlocksByRangeLimit int
|
||||
beaconBlocksByRootLimit int
|
||||
}
|
||||
|
||||
const punishmentPeriod = time.Minute
|
||||
const defaultRateLimit = 5000
|
||||
const defaultBlockHandlerRateLimit = 200
|
||||
|
||||
var rateLimits = RateLimits{
|
||||
pingLimit: defaultRateLimit,
|
||||
goodbyeLimit: defaultRateLimit,
|
||||
metadataV1Limit: defaultRateLimit,
|
||||
metadataV2Limit: defaultRateLimit,
|
||||
statusLimit: defaultRateLimit,
|
||||
pingLimit: defaultRateLimit,
|
||||
goodbyeLimit: defaultRateLimit,
|
||||
metadataV1Limit: defaultRateLimit,
|
||||
metadataV2Limit: defaultRateLimit,
|
||||
statusLimit: defaultRateLimit,
|
||||
beaconBlocksByRangeLimit: defaultBlockHandlerRateLimit,
|
||||
beaconBlocksByRootLimit: defaultBlockHandlerRateLimit,
|
||||
}
|
||||
|
||||
type ConsensusHandlers struct {
|
||||
@ -61,6 +67,7 @@ type ConsensusHandlers struct {
|
||||
genesisConfig *clparams.GenesisConfig
|
||||
ctx context.Context
|
||||
beaconDB persistence.RawBeaconBlockChain
|
||||
indiciesDB kv.RoDB
|
||||
peerRateLimits sync.Map
|
||||
punishmentEndTimes sync.Map
|
||||
}
|
||||
@ -71,12 +78,13 @@ const (
|
||||
ResourceUnavaiablePrefix = 0x03
|
||||
)
|
||||
|
||||
func NewConsensusHandlers(ctx context.Context, db persistence.RawBeaconBlockChain, host host.Host,
|
||||
func NewConsensusHandlers(ctx context.Context, db persistence.RawBeaconBlockChain, indiciesDB kv.RoDB, host host.Host,
|
||||
peers *peers.Pool, beaconConfig *clparams.BeaconChainConfig, genesisConfig *clparams.GenesisConfig, metadata *cltypes.Metadata) *ConsensusHandlers {
|
||||
c := &ConsensusHandlers{
|
||||
host: host,
|
||||
metadata: metadata,
|
||||
beaconDB: db,
|
||||
indiciesDB: indiciesDB,
|
||||
genesisConfig: genesisConfig,
|
||||
beaconConfig: beaconConfig,
|
||||
ctx: ctx,
|
||||
@ -90,7 +98,7 @@ func NewConsensusHandlers(ctx context.Context, db persistence.RawBeaconBlockChai
|
||||
communication.StatusProtocolV1: c.statusHandler,
|
||||
communication.MetadataProtocolV1: c.metadataV1Handler,
|
||||
communication.MetadataProtocolV2: c.metadataV2Handler,
|
||||
communication.BeaconBlocksByRangeProtocolV1: c.blocksByRangeHandler,
|
||||
communication.BeaconBlocksByRangeProtocolV1: c.beaconBlocksByRangeHandler,
|
||||
communication.BeaconBlocksByRootProtocolV1: c.beaconBlocksByRootHandler,
|
||||
}
|
||||
|
||||
|
61
cl/sentinel/handlers/utils_test.go
Normal file
61
cl/sentinel/handlers/utils_test.go
Normal file
@ -0,0 +1,61 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/common"
|
||||
libcommon "github.com/ledgerwatch/erigon-lib/common"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon-lib/kv/memdb"
|
||||
"github.com/ledgerwatch/erigon/cl/clparams"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes"
|
||||
"github.com/ledgerwatch/erigon/cl/persistence"
|
||||
"github.com/ledgerwatch/erigon/cl/persistence/beacon_indicies"
|
||||
"github.com/spf13/afero"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func setupStore(t *testing.T) (persistence.RawBeaconBlockChain, kv.RwDB) {
|
||||
db := memdb.NewTestDB(t)
|
||||
af := afero.NewMemMapFs()
|
||||
rawDB := persistence.NewAferoRawBlockSaver(af, &clparams.MainnetBeaconConfig)
|
||||
return rawDB, db
|
||||
}
|
||||
|
||||
func populateDatabaseWithBlocks(t *testing.T, store persistence.BeaconChainDatabase, tx kv.RwTx, startSlot, count uint64) []*cltypes.SignedBeaconBlock {
|
||||
|
||||
mockParentRoot := common.Hash{1}
|
||||
blocks := make([]*cltypes.SignedBeaconBlock, 0, count)
|
||||
for i := uint64(0); i <= count; i++ {
|
||||
slot := startSlot + i
|
||||
block := cltypes.NewSignedBeaconBlock(&clparams.MainnetBeaconConfig)
|
||||
block.Block.Slot = slot
|
||||
block.Block.StateRoot = libcommon.Hash{byte(i)}
|
||||
block.Block.ParentRoot = mockParentRoot
|
||||
block.EncodingSizeSSZ()
|
||||
bodyRoot, _ := block.Block.Body.HashSSZ()
|
||||
canonical := true
|
||||
|
||||
// Populate BeaconChainDatabase
|
||||
store.WriteBlock(context.Background(), tx, block, canonical)
|
||||
|
||||
// Populate indiciesDB
|
||||
require.NoError(t, beacon_indicies.WriteBeaconBlockHeaderAndIndicies(
|
||||
context.Background(),
|
||||
tx,
|
||||
&cltypes.SignedBeaconBlockHeader{
|
||||
Signature: block.Signature,
|
||||
Header: &cltypes.BeaconBlockHeader{
|
||||
Slot: block.Block.Slot,
|
||||
ParentRoot: block.Block.ParentRoot,
|
||||
ProposerIndex: block.Block.ProposerIndex,
|
||||
Root: block.Block.StateRoot,
|
||||
BodyRoot: bodyRoot,
|
||||
},
|
||||
},
|
||||
canonical))
|
||||
blocks = append(blocks, block)
|
||||
}
|
||||
return blocks
|
||||
}
|
@ -22,6 +22,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon/cl/sentinel/handlers"
|
||||
"github.com/ledgerwatch/erigon/cl/sentinel/handshake"
|
||||
"github.com/ledgerwatch/erigon/cl/sentinel/httpreqresp"
|
||||
@ -74,7 +75,8 @@ type Sentinel struct {
|
||||
metadataV2 *cltypes.Metadata
|
||||
handshaker *handshake.HandShaker
|
||||
|
||||
db persistence.RawBeaconBlockChain
|
||||
db persistence.RawBeaconBlockChain
|
||||
indiciesDB kv.RoDB
|
||||
|
||||
discoverConfig discover.Config
|
||||
pubsub *pubsub.PubSub
|
||||
@ -166,7 +168,7 @@ func (s *Sentinel) createListener() (*discover.UDPv5, error) {
|
||||
}
|
||||
|
||||
// Start stream handlers
|
||||
handlers.NewConsensusHandlers(s.ctx, s.db, s.host, s.peers, s.cfg.BeaconConfig, s.cfg.GenesisConfig, s.metadataV2).Start()
|
||||
handlers.NewConsensusHandlers(s.ctx, s.db, s.indiciesDB, s.host, s.peers, s.cfg.BeaconConfig, s.cfg.GenesisConfig, s.metadataV2).Start()
|
||||
|
||||
net, err := discover.ListenV5(s.ctx, "any", conn, localNode, discCfg)
|
||||
if err != nil {
|
||||
@ -180,14 +182,16 @@ func New(
|
||||
ctx context.Context,
|
||||
cfg *SentinelConfig,
|
||||
db persistence.RawBeaconBlockChain,
|
||||
indiciesDB kv.RoDB,
|
||||
logger log.Logger,
|
||||
) (*Sentinel, error) {
|
||||
s := &Sentinel{
|
||||
ctx: ctx,
|
||||
cfg: cfg,
|
||||
db: db,
|
||||
metrics: true,
|
||||
logger: logger,
|
||||
ctx: ctx,
|
||||
cfg: cfg,
|
||||
db: db,
|
||||
indiciesDB: indiciesDB,
|
||||
metrics: true,
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
// Setup discovery
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/ledgerwatch/erigon-lib/direct"
|
||||
sentinelrpc "github.com/ledgerwatch/erigon-lib/gointerfaces/sentinel"
|
||||
"github.com/ledgerwatch/erigon-lib/kv"
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes"
|
||||
"github.com/ledgerwatch/erigon/cl/persistence"
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
@ -20,8 +21,8 @@ type ServerConfig struct {
|
||||
Addr string
|
||||
}
|
||||
|
||||
func createSentinel(cfg *sentinel.SentinelConfig, db persistence.RawBeaconBlockChain, logger log.Logger) (*sentinel.Sentinel, error) {
|
||||
sent, err := sentinel.New(context.Background(), cfg, db, logger)
|
||||
func createSentinel(cfg *sentinel.SentinelConfig, db persistence.RawBeaconBlockChain, indiciesDB kv.RwDB, logger log.Logger) (*sentinel.Sentinel, error) {
|
||||
sent, err := sentinel.New(context.Background(), cfg, db, indiciesDB, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -57,9 +58,9 @@ func createSentinel(cfg *sentinel.SentinelConfig, db persistence.RawBeaconBlockC
|
||||
return sent, nil
|
||||
}
|
||||
|
||||
func StartSentinelService(cfg *sentinel.SentinelConfig, db persistence.RawBeaconBlockChain, srvCfg *ServerConfig, creds credentials.TransportCredentials, initialStatus *cltypes.Status, logger log.Logger) (sentinelrpc.SentinelClient, error) {
|
||||
func StartSentinelService(cfg *sentinel.SentinelConfig, db persistence.RawBeaconBlockChain, indiciesDB kv.RwDB, srvCfg *ServerConfig, creds credentials.TransportCredentials, initialStatus *cltypes.Status, logger log.Logger) (sentinelrpc.SentinelClient, error) {
|
||||
ctx := context.Background()
|
||||
sent, err := createSentinel(cfg, db, logger)
|
||||
sent, err := createSentinel(cfg, db, indiciesDB, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -87,7 +87,7 @@ func runCaplinNode(cliCtx *cli.Context) error {
|
||||
NetworkConfig: cfg.NetworkCfg,
|
||||
BeaconConfig: cfg.BeaconCfg,
|
||||
NoDiscovery: cfg.NoDiscovery,
|
||||
}, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, nil, &cltypes.Status{
|
||||
}, nil, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, nil, &cltypes.Status{
|
||||
ForkDigest: forkDigest,
|
||||
FinalizedRoot: state.FinalizedCheckpoint().BlockRoot(),
|
||||
FinalizedEpoch: state.FinalizedCheckpoint().Epoch(),
|
||||
|
@ -54,7 +54,7 @@ func runSentinelNode(cliCtx *cli.Context) error {
|
||||
BeaconConfig: cfg.BeaconCfg,
|
||||
NoDiscovery: cfg.NoDiscovery,
|
||||
LocalDiscovery: cfg.LocalDiscovery,
|
||||
}, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, nil, nil, log.Root())
|
||||
}, nil, nil, &service.ServerConfig{Network: cfg.ServerProtocol, Addr: cfg.ServerAddr}, nil, nil, log.Root())
|
||||
if err != nil {
|
||||
log.Error("[Sentinel] Could not start sentinel", "err", err)
|
||||
return err
|
||||
|
@ -40,6 +40,7 @@ import (
|
||||
"github.com/ledgerwatch/erigon/cl/cltypes"
|
||||
"github.com/ledgerwatch/erigon/cl/fork"
|
||||
"github.com/ledgerwatch/erigon/cl/persistence"
|
||||
"github.com/ledgerwatch/erigon/cl/persistence/db_config"
|
||||
"github.com/ledgerwatch/erigon/cl/persistence/format/snapshot_format/getters"
|
||||
clcore "github.com/ledgerwatch/erigon/cl/phase1/core"
|
||||
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
|
||||
@ -846,6 +847,10 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
|
||||
}
|
||||
|
||||
rawBeaconBlockChainDb, _ := persistence.AferoRawBeaconBlockChainFromOsPath(beaconCfg, dirs.CaplinHistory)
|
||||
_, indiciesDB, err := caplin1.OpenCaplinDatabase(ctx, db_config.DefaultDatabaseConfiguration, beaconCfg, rawBeaconBlockChainDb, dirs.CaplinIndexing, engine, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client, err := service.StartSentinelService(&sentinel.SentinelConfig{
|
||||
IpAddr: config.LightClientDiscoveryAddr,
|
||||
@ -855,7 +860,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
|
||||
NetworkConfig: networkCfg,
|
||||
BeaconConfig: beaconCfg,
|
||||
TmpDir: tmpdir,
|
||||
}, rawBeaconBlockChainDb, &service.ServerConfig{Network: "tcp", Addr: fmt.Sprintf("%s:%d", config.SentinelAddr, config.SentinelPort)}, creds, &cltypes.Status{
|
||||
}, rawBeaconBlockChainDb, indiciesDB, &service.ServerConfig{Network: "tcp", Addr: fmt.Sprintf("%s:%d", config.SentinelAddr, config.SentinelPort)}, creds, &cltypes.Status{
|
||||
ForkDigest: forkDigest,
|
||||
FinalizedRoot: state.FinalizedCheckpoint().BlockRoot(),
|
||||
FinalizedEpoch: state.FinalizedCheckpoint().Epoch(),
|
||||
|
Loading…
Reference in New Issue
Block a user