mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
317 lines
8.9 KiB
Go
317 lines
8.9 KiB
Go
package sentinel
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"io"
|
|
"testing"
|
|
|
|
"github.com/golang/snappy"
|
|
"github.com/ledgerwatch/erigon-lib/common/datadir"
|
|
"github.com/ledgerwatch/erigon-lib/kv"
|
|
"github.com/ledgerwatch/erigon-lib/kv/memdb"
|
|
"github.com/ledgerwatch/erigon/cl/antiquary"
|
|
"github.com/ledgerwatch/erigon/cl/antiquary/tests"
|
|
"github.com/ledgerwatch/erigon/cl/clparams"
|
|
"github.com/ledgerwatch/erigon/cl/cltypes"
|
|
"github.com/ledgerwatch/erigon/cl/cltypes/solid"
|
|
"github.com/ledgerwatch/erigon/cl/fork"
|
|
"github.com/ledgerwatch/erigon/cl/persistence"
|
|
state_accessors "github.com/ledgerwatch/erigon/cl/persistence/state"
|
|
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
|
|
"github.com/ledgerwatch/erigon/cl/sentinel/communication"
|
|
"github.com/ledgerwatch/erigon/cl/sentinel/communication/ssz_snappy"
|
|
"github.com/ledgerwatch/erigon/cl/utils"
|
|
"github.com/ledgerwatch/log/v3"
|
|
"github.com/libp2p/go-libp2p"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/core/protocol"
|
|
"github.com/spf13/afero"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func loadChain(t *testing.T) (db kv.RwDB, blocks []*cltypes.SignedBeaconBlock, f afero.Fs, preState, postState *state.CachingBeaconState) {
|
|
blocks, preState, postState = tests.GetPhase0Random()
|
|
db = memdb.NewTestDB(t)
|
|
var reader *tests.MockBlockReader
|
|
reader, f = tests.LoadChain(blocks, postState, db, t)
|
|
|
|
ctx := context.Background()
|
|
vt := state_accessors.NewStaticValidatorTable()
|
|
a := antiquary.NewAntiquary(ctx, preState, vt, &clparams.MainnetBeaconConfig, datadir.New("/tmp"), nil, db, nil, reader, nil, log.New(), true, true, f)
|
|
require.NoError(t, a.IncrementBeaconState(ctx, blocks[len(blocks)-1].Block.Slot+33))
|
|
return
|
|
}
|
|
|
|
func TestSentinelBlocksByRange(t *testing.T) {
|
|
listenAddrHost := "127.0.0.1"
|
|
|
|
ctx := context.Background()
|
|
db, blocks, f, _, _ := loadChain(t)
|
|
raw := persistence.NewAferoRawBlockSaver(f, &clparams.MainnetBeaconConfig)
|
|
genesisConfig, networkConfig, beaconConfig := clparams.GetConfigsByNetwork(clparams.MainnetNetwork)
|
|
sentinel, err := New(ctx, &SentinelConfig{
|
|
NetworkConfig: networkConfig,
|
|
BeaconConfig: beaconConfig,
|
|
GenesisConfig: genesisConfig,
|
|
IpAddr: listenAddrHost,
|
|
Port: 7070,
|
|
EnableBlocks: true,
|
|
}, raw, db, log.New())
|
|
require.NoError(t, err)
|
|
defer sentinel.Stop()
|
|
|
|
require.NoError(t, sentinel.Start())
|
|
h := sentinel.host
|
|
|
|
listenAddrHost1 := "/ip4/127.0.0.1/tcp/3202"
|
|
host1, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost1))
|
|
require.NoError(t, err)
|
|
|
|
err = h.Connect(ctx, peer.AddrInfo{
|
|
ID: host1.ID(),
|
|
Addrs: host1.Addrs(),
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
stream, err := host1.NewStream(ctx, h.ID(), protocol.ID(communication.BeaconBlocksByRangeProtocolV2))
|
|
require.NoError(t, err)
|
|
|
|
req := &cltypes.BeaconBlocksByRangeRequest{
|
|
StartSlot: blocks[0].Block.Slot,
|
|
Count: 6,
|
|
}
|
|
|
|
if err := ssz_snappy.EncodeAndWrite(stream, req); err != nil {
|
|
return
|
|
}
|
|
|
|
code := make([]byte, 1)
|
|
_, err = stream.Read(code)
|
|
require.NoError(t, err)
|
|
require.Equal(t, code[0], uint8(0))
|
|
|
|
var w bytes.Buffer
|
|
_, err = io.Copy(&w, stream)
|
|
require.NoError(t, err)
|
|
|
|
responsePacket := make([]*cltypes.SignedBeaconBlock, 0)
|
|
|
|
r := bytes.NewReader(w.Bytes())
|
|
for i := 0; i < len(blocks); i++ {
|
|
forkDigest := make([]byte, 4)
|
|
if _, err := r.Read(forkDigest); err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// Read varint for length of message.
|
|
encodedLn, _, err := ssz_snappy.ReadUvarint(r)
|
|
require.NoError(t, err)
|
|
|
|
// Read bytes using snappy into a new raw buffer of side encodedLn.
|
|
raw := make([]byte, encodedLn)
|
|
sr := snappy.NewReader(r)
|
|
bytesRead := 0
|
|
for bytesRead < int(encodedLn) {
|
|
n, err := sr.Read(raw[bytesRead:])
|
|
require.NoError(t, err)
|
|
bytesRead += n
|
|
}
|
|
// Fork digests
|
|
respForkDigest := binary.BigEndian.Uint32(forkDigest)
|
|
require.NoError(t, err)
|
|
|
|
version, err := fork.ForkDigestVersion(utils.Uint32ToBytes4(respForkDigest), beaconConfig, genesisConfig.GenesisValidatorRoot)
|
|
require.NoError(t, err)
|
|
|
|
responseChunk := cltypes.NewSignedBeaconBlock(beaconConfig)
|
|
|
|
require.NoError(t, responseChunk.DecodeSSZ(raw, int(version)))
|
|
|
|
responsePacket = append(responsePacket, responseChunk)
|
|
// TODO(issues/5884): figure out why there is this extra byte.
|
|
r.ReadByte()
|
|
}
|
|
require.Equal(t, len(responsePacket), len(blocks))
|
|
for i := 0; i < len(blocks); i++ {
|
|
root1, err := responsePacket[i].HashSSZ()
|
|
require.NoError(t, err)
|
|
|
|
root2, err := blocks[i].HashSSZ()
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, root1, root2)
|
|
}
|
|
|
|
}
|
|
|
|
func TestSentinelBlocksByRoots(t *testing.T) {
|
|
listenAddrHost := "127.0.0.1"
|
|
|
|
ctx := context.Background()
|
|
db, blocks, f, _, _ := loadChain(t)
|
|
raw := persistence.NewAferoRawBlockSaver(f, &clparams.MainnetBeaconConfig)
|
|
genesisConfig, networkConfig, beaconConfig := clparams.GetConfigsByNetwork(clparams.MainnetNetwork)
|
|
sentinel, err := New(ctx, &SentinelConfig{
|
|
NetworkConfig: networkConfig,
|
|
BeaconConfig: beaconConfig,
|
|
GenesisConfig: genesisConfig,
|
|
IpAddr: listenAddrHost,
|
|
Port: 7070,
|
|
EnableBlocks: true,
|
|
}, raw, db, log.New())
|
|
require.NoError(t, err)
|
|
defer sentinel.Stop()
|
|
|
|
require.NoError(t, sentinel.Start())
|
|
h := sentinel.host
|
|
|
|
listenAddrHost1 := "/ip4/127.0.0.1/tcp/5021"
|
|
host1, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost1))
|
|
require.NoError(t, err)
|
|
|
|
err = h.Connect(ctx, peer.AddrInfo{
|
|
ID: host1.ID(),
|
|
Addrs: host1.Addrs(),
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
stream, err := host1.NewStream(ctx, h.ID(), protocol.ID(communication.BeaconBlocksByRootProtocolV2))
|
|
require.NoError(t, err)
|
|
|
|
req := solid.NewHashList(1232)
|
|
rt, err := blocks[0].Block.HashSSZ()
|
|
require.NoError(t, err)
|
|
|
|
req.Append(rt)
|
|
rt, err = blocks[1].Block.HashSSZ()
|
|
require.NoError(t, err)
|
|
req.Append(rt)
|
|
|
|
if err := ssz_snappy.EncodeAndWrite(stream, req); err != nil {
|
|
return
|
|
}
|
|
|
|
code := make([]byte, 1)
|
|
_, err = stream.Read(code)
|
|
require.NoError(t, err)
|
|
require.Equal(t, code[0], uint8(0))
|
|
|
|
var w bytes.Buffer
|
|
_, err = io.Copy(&w, stream)
|
|
require.NoError(t, err)
|
|
|
|
responsePacket := make([]*cltypes.SignedBeaconBlock, 0)
|
|
|
|
r := bytes.NewReader(w.Bytes())
|
|
for i := 0; i < len(blocks); i++ {
|
|
forkDigest := make([]byte, 4)
|
|
if _, err := r.Read(forkDigest); err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// Read varint for length of message.
|
|
encodedLn, _, err := ssz_snappy.ReadUvarint(r)
|
|
require.NoError(t, err)
|
|
|
|
// Read bytes using snappy into a new raw buffer of side encodedLn.
|
|
raw := make([]byte, encodedLn)
|
|
sr := snappy.NewReader(r)
|
|
bytesRead := 0
|
|
for bytesRead < int(encodedLn) {
|
|
n, err := sr.Read(raw[bytesRead:])
|
|
require.NoError(t, err)
|
|
bytesRead += n
|
|
}
|
|
// Fork digests
|
|
respForkDigest := binary.BigEndian.Uint32(forkDigest)
|
|
require.NoError(t, err)
|
|
|
|
version, err := fork.ForkDigestVersion(utils.Uint32ToBytes4(respForkDigest), beaconConfig, genesisConfig.GenesisValidatorRoot)
|
|
require.NoError(t, err)
|
|
|
|
responseChunk := cltypes.NewSignedBeaconBlock(beaconConfig)
|
|
|
|
require.NoError(t, responseChunk.DecodeSSZ(raw, int(version)))
|
|
|
|
responsePacket = append(responsePacket, responseChunk)
|
|
// TODO(issues/5884): figure out why there is this extra byte.
|
|
r.ReadByte()
|
|
}
|
|
|
|
require.Equal(t, len(responsePacket), len(blocks))
|
|
for i := 0; i < len(responsePacket); i++ {
|
|
root1, err := responsePacket[i].HashSSZ()
|
|
require.NoError(t, err)
|
|
|
|
root2, err := blocks[i].HashSSZ()
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, root1, root2)
|
|
}
|
|
}
|
|
|
|
func TestSentinelStatusRequest(t *testing.T) {
|
|
t.Skip("TODO: fix me")
|
|
listenAddrHost := "127.0.0.1"
|
|
|
|
ctx := context.Background()
|
|
db, blocks, f, _, _ := loadChain(t)
|
|
raw := persistence.NewAferoRawBlockSaver(f, &clparams.MainnetBeaconConfig)
|
|
genesisConfig, networkConfig, beaconConfig := clparams.GetConfigsByNetwork(clparams.MainnetNetwork)
|
|
sentinel, err := New(ctx, &SentinelConfig{
|
|
NetworkConfig: networkConfig,
|
|
BeaconConfig: beaconConfig,
|
|
GenesisConfig: genesisConfig,
|
|
IpAddr: listenAddrHost,
|
|
Port: 7070,
|
|
EnableBlocks: true,
|
|
}, raw, db, log.New())
|
|
require.NoError(t, err)
|
|
defer sentinel.Stop()
|
|
|
|
require.NoError(t, sentinel.Start())
|
|
h := sentinel.host
|
|
|
|
listenAddrHost1 := "/ip4/127.0.0.1/tcp/5001"
|
|
host1, err := libp2p.New(libp2p.ListenAddrStrings(listenAddrHost1))
|
|
require.NoError(t, err)
|
|
|
|
err = h.Connect(ctx, peer.AddrInfo{
|
|
ID: host1.ID(),
|
|
Addrs: host1.Addrs(),
|
|
})
|
|
require.NoError(t, err)
|
|
req := &cltypes.Status{
|
|
HeadRoot: blocks[0].Block.ParentRoot,
|
|
HeadSlot: 1234,
|
|
}
|
|
sentinel.SetStatus(req)
|
|
stream, err := host1.NewStream(ctx, h.ID(), protocol.ID(communication.StatusProtocolV1))
|
|
require.NoError(t, err)
|
|
|
|
if err := ssz_snappy.EncodeAndWrite(stream, req); err != nil {
|
|
return
|
|
}
|
|
|
|
code := make([]byte, 1)
|
|
_, err = stream.Read(code)
|
|
require.NoError(t, err)
|
|
require.Equal(t, code[0], uint8(0))
|
|
|
|
resp := &cltypes.Status{}
|
|
if err := ssz_snappy.DecodeAndReadNoForkDigest(stream, resp, 0); err != nil {
|
|
return
|
|
}
|
|
require.NoError(t, err)
|
|
|
|
require.Equal(t, resp, req)
|
|
}
|