mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-22 03:30:37 +00:00
req/resp heartbeat handler modifications (#5859)
addressing some aspects of the req/resp portion of the CL spec: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#reqresp-interaction. this PR does a few things: 1. moves `blocksByRangeHandler` and `beaconBlocksByRootHandler` handlers to a different file in the `handlers` package. these are going to be the more complicated handlers so they will be better in their own files. 2. makes `pingHandler` a method on the `*ConsensusHandlers` receiver and starts returning the sequence number instead of the request. (see https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#ping `Peers request and respond with their local metadata sequence number`). 3. adds a `goodbyeHandler` to respond with a status message of 1. (see https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#goodbye). 4. makes `statusHandler` a method on the `*ConsensusHandlers` receiver. (the rest of this handler is still not implemented. 5. refactored the `heartbeats_test` into a table driven test. this makes the test much more readable: https://dave.cheney.net/2019/05/07/prefer-table-driven-tests.
This commit is contained in:
parent
b71ba5326f
commit
29ff8daa92
27
cmd/lightclient/sentinel/handlers/blocks.go
Normal file
27
cmd/lightclient/sentinel/handlers/blocks.go
Normal file
@ -0,0 +1,27 @@
|
||||
/*
|
||||
Copyright 2022 Erigon-Lightclient contributors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
)
|
||||
|
||||
func (c *ConsensusHandlers) blocksByRangeHandler(stream network.Stream) {
|
||||
log.Info("Got block by range handler call")
|
||||
}
|
||||
|
||||
func (c *ConsensusHandlers) beaconBlocksByRootHandler(stream network.Stream) {
|
||||
log.Info("Got beacon block by root handler call")
|
||||
}
|
@ -18,7 +18,6 @@ import (
|
||||
"github.com/ledgerwatch/erigon/cmd/lightclient/sentinel/communication/ssz_snappy"
|
||||
"github.com/ledgerwatch/erigon/cmd/lightclient/sentinel/peers"
|
||||
|
||||
"github.com/ledgerwatch/log/v3"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
@ -31,7 +30,7 @@ type ConsensusHandlers struct {
|
||||
metadata *cltypes.MetadataV2
|
||||
}
|
||||
|
||||
const SuccessfullResponsePrefix = 0x00
|
||||
const SuccessfulResponsePrefix = 0x00
|
||||
|
||||
var NoRequestHandlers = map[string]bool{
|
||||
MetadataProtocolV1: true,
|
||||
@ -46,9 +45,9 @@ func NewConsensusHandlers(host host.Host, peers *peers.Peers, metadata *cltypes.
|
||||
metadata: metadata,
|
||||
}
|
||||
c.handlers = map[protocol.ID]network.StreamHandler{
|
||||
protocol.ID(PingProtocolV1): curryStreamHandler(ssz_snappy.NewStreamCodec, pingHandler),
|
||||
protocol.ID(GoodbyeProtocolV1): curryStreamHandler(ssz_snappy.NewStreamCodec, pingHandler),
|
||||
protocol.ID(StatusProtocolV1): curryStreamHandler(ssz_snappy.NewStreamCodec, statusHandler),
|
||||
protocol.ID(PingProtocolV1): curryStreamHandler(ssz_snappy.NewStreamCodec, c.pingHandler),
|
||||
protocol.ID(GoodbyeProtocolV1): curryStreamHandler(ssz_snappy.NewStreamCodec, c.goodbyeHandler),
|
||||
protocol.ID(StatusProtocolV1): curryStreamHandler(ssz_snappy.NewStreamCodec, c.statusHandler),
|
||||
protocol.ID(MetadataProtocolV1): curryStreamHandler(ssz_snappy.NewStreamCodec, c.metadataV1Handler),
|
||||
protocol.ID(MetadataProtocolV2): curryStreamHandler(ssz_snappy.NewStreamCodec, c.metadataV2Handler),
|
||||
protocol.ID(BeaconBlockByRangeProtocolV1): c.blocksByRangeHandler,
|
||||
@ -57,14 +56,6 @@ func NewConsensusHandlers(host host.Host, peers *peers.Peers, metadata *cltypes.
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *ConsensusHandlers) blocksByRangeHandler(stream network.Stream) {
|
||||
log.Info("Got block by range handler call")
|
||||
}
|
||||
|
||||
func (c *ConsensusHandlers) beaconBlocksByRootHandler(stream network.Stream) {
|
||||
log.Info("Got beacon block by root handler call")
|
||||
}
|
||||
|
||||
func (c *ConsensusHandlers) Start() {
|
||||
for id, handler := range c.handlers {
|
||||
c.host.SetStreamHandler(id, handler)
|
||||
|
@ -18,32 +18,38 @@ import (
|
||||
"github.com/ledgerwatch/erigon/cmd/lightclient/sentinel/communication"
|
||||
)
|
||||
|
||||
// type safe handlers which all have access to the original stream & decompressed data
|
||||
// ping handler
|
||||
func pingHandler(ctx *communication.StreamContext, dat *cltypes.Ping) error {
|
||||
// since packets are just structs, they can be resent with no issue
|
||||
return ctx.Codec.WritePacket(dat, SuccessfullResponsePrefix)
|
||||
// Type safe handlers which all have access to the original stream & decompressed data.
|
||||
// Since packets are just structs, they can be resent with no issue
|
||||
|
||||
func (c *ConsensusHandlers) pingHandler(ctx *communication.StreamContext, _ *communication.EmptyPacket) error {
|
||||
return ctx.Codec.WritePacket(&cltypes.Ping{
|
||||
Id: c.metadata.SeqNumber,
|
||||
}, SuccessfulResponsePrefix)
|
||||
}
|
||||
|
||||
func (c *ConsensusHandlers) goodbyeHandler(ctx *communication.StreamContext, _ *communication.EmptyPacket) error {
|
||||
// From the spec, these are the valid goodbye numbers. Start with just
|
||||
// sending 1, but we should think about when the others need to be sent.
|
||||
// 1: Client shut down.
|
||||
// 2: Irrelevant network.
|
||||
// 3: Fault/error.
|
||||
return ctx.Codec.WritePacket(&cltypes.Ping{
|
||||
Id: 1,
|
||||
}, SuccessfulResponsePrefix)
|
||||
}
|
||||
|
||||
func (c *ConsensusHandlers) metadataV1Handler(ctx *communication.StreamContext, _ *communication.EmptyPacket) error {
|
||||
// since packets are just structs, they can be resent with no issue
|
||||
return ctx.Codec.WritePacket(&cltypes.MetadataV1{
|
||||
SeqNumber: c.metadata.SeqNumber,
|
||||
Attnets: c.metadata.Attnets,
|
||||
}, SuccessfullResponsePrefix)
|
||||
}, SuccessfulResponsePrefix)
|
||||
}
|
||||
|
||||
func (c *ConsensusHandlers) metadataV2Handler(ctx *communication.StreamContext, _ *communication.EmptyPacket) error {
|
||||
// since packets are just structs, they can be resent with no issue
|
||||
return ctx.Codec.WritePacket(c.metadata, SuccessfullResponsePrefix)
|
||||
}
|
||||
|
||||
// does nothing
|
||||
func nilHandler(ctx *communication.StreamContext, dat *communication.EmptyPacket) error {
|
||||
return nil
|
||||
return ctx.Codec.WritePacket(c.metadata, SuccessfulResponsePrefix)
|
||||
}
|
||||
|
||||
// TODO: Actually respond with proper status
|
||||
func statusHandler(ctx *communication.StreamContext, dat *cltypes.Status) error {
|
||||
return ctx.Codec.WritePacket(dat, SuccessfullResponsePrefix)
|
||||
func (c *ConsensusHandlers) statusHandler(ctx *communication.StreamContext, dat *cltypes.Status) error {
|
||||
return ctx.Codec.WritePacket(dat, SuccessfulResponsePrefix)
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ledgerwatch/erigon/cmd/lightclient/cltypes"
|
||||
"github.com/ledgerwatch/erigon/cmd/lightclient/sentinel/communication"
|
||||
"github.com/ledgerwatch/erigon/cmd/lightclient/sentinel/communication/ssz_snappy"
|
||||
"github.com/ledgerwatch/erigon/cmd/lightclient/sentinel/peers"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
@ -29,6 +30,18 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
testLocalMetadataV2 = &cltypes.MetadataV2{
|
||||
SeqNumber: 42,
|
||||
Attnets: 43,
|
||||
Syncnets: 44,
|
||||
}
|
||||
testLocalMetadataV1 = &cltypes.MetadataV1{
|
||||
SeqNumber: testLocalMetadataV2.SeqNumber,
|
||||
Attnets: testLocalMetadataV2.Attnets,
|
||||
}
|
||||
)
|
||||
|
||||
func initializeNetwork(t *testing.T, ctx context.Context) (*ConsensusHandlers, host.Host, host.Host) {
|
||||
h1, err := basichost.NewHost(swarmt.GenSwarm(t), nil)
|
||||
require.NoError(t, err)
|
||||
@ -38,64 +51,74 @@ func initializeNetwork(t *testing.T, ctx context.Context) (*ConsensusHandlers, h
|
||||
h2pi := h2.Peerstore().PeerInfo(h2.ID())
|
||||
require.NoError(t, h1.Connect(ctx, h2pi))
|
||||
|
||||
return NewConsensusHandlers(h2, &peers.Peers{}, &cltypes.MetadataV2{}), h1, h2
|
||||
return NewConsensusHandlers(h2, &peers.Peers{}, testLocalMetadataV2), h1, h2
|
||||
}
|
||||
|
||||
func TestPingHandler(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
|
||||
handlers, h1, h2 := initializeNetwork(t, ctx)
|
||||
defer h1.Close()
|
||||
defer h2.Close()
|
||||
handlers.Start()
|
||||
|
||||
stream, err := h1.NewStream(ctx, h2.ID(), protocol.ID(PingProtocolV1))
|
||||
require.NoError(t, err)
|
||||
packet := &cltypes.Ping{
|
||||
Id: 32,
|
||||
func TestHeartbeatHandlers(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
protocol string
|
||||
writePacket communication.Packet
|
||||
gotPacket communication.Packet
|
||||
expectedPacket communication.Packet
|
||||
}{
|
||||
"ping": {
|
||||
protocol: PingProtocolV1,
|
||||
writePacket: &cltypes.Ping{Id: 32},
|
||||
gotPacket: &cltypes.Ping{},
|
||||
expectedPacket: &cltypes.Ping{Id: testLocalMetadataV2.SeqNumber},
|
||||
},
|
||||
"goodbye": {
|
||||
protocol: GoodbyeProtocolV1,
|
||||
writePacket: &cltypes.Ping{Id: 1},
|
||||
gotPacket: &cltypes.Ping{},
|
||||
expectedPacket: &cltypes.Ping{Id: 1},
|
||||
},
|
||||
"status": {
|
||||
protocol: StatusProtocolV1,
|
||||
writePacket: &cltypes.Status{HeadSlot: 666999},
|
||||
gotPacket: &cltypes.Status{},
|
||||
expectedPacket: &cltypes.Status{HeadSlot: 666999},
|
||||
},
|
||||
"metadatav1": {
|
||||
protocol: MetadataProtocolV1,
|
||||
writePacket: nil,
|
||||
gotPacket: &cltypes.MetadataV1{},
|
||||
expectedPacket: testLocalMetadataV1,
|
||||
},
|
||||
"metadatav2": {
|
||||
protocol: MetadataProtocolV2,
|
||||
writePacket: nil,
|
||||
gotPacket: &cltypes.MetadataV2{},
|
||||
expectedPacket: testLocalMetadataV2,
|
||||
},
|
||||
}
|
||||
codec := ssz_snappy.NewStreamCodec(stream)
|
||||
err = codec.WritePacket(packet)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, codec.CloseWriter())
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
r := &cltypes.Ping{}
|
||||
|
||||
code := make([]byte, 1)
|
||||
stream.Read(code)
|
||||
assert.Equal(t, code, []byte{SuccessfullResponsePrefix})
|
||||
|
||||
_, err = codec.Decode(r)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, r, packet)
|
||||
}
|
||||
|
||||
func TestStatusHandler(t *testing.T) {
|
||||
ctx := context.TODO()
|
||||
for name, tc := range tests {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
handlers, h1, h2 := initializeNetwork(t, ctx)
|
||||
defer h1.Close()
|
||||
defer h2.Close()
|
||||
handlers.Start()
|
||||
|
||||
handlers, h1, h2 := initializeNetwork(t, ctx)
|
||||
defer h1.Close()
|
||||
defer h2.Close()
|
||||
handlers.Start()
|
||||
stream, err := h1.NewStream(ctx, h2.ID(), protocol.ID(tc.protocol))
|
||||
require.NoError(t, err)
|
||||
codec := ssz_snappy.NewStreamCodec(stream)
|
||||
|
||||
stream, err := h1.NewStream(ctx, h2.ID(), protocol.ID(StatusProtocolV1))
|
||||
require.NoError(t, err)
|
||||
packet := &cltypes.Status{
|
||||
HeadSlot: 666999,
|
||||
// Write packet.
|
||||
err = codec.WritePacket(tc.writePacket)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, codec.CloseWriter())
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Read packet.
|
||||
code := make([]byte, 1)
|
||||
stream.Read(code)
|
||||
assert.Equal(t, code, []byte{SuccessfulResponsePrefix})
|
||||
_, err = codec.Decode(tc.gotPacket)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Assert on expected packet.
|
||||
assert.Equal(t, tc.gotPacket, tc.expectedPacket)
|
||||
})
|
||||
}
|
||||
codec := ssz_snappy.NewStreamCodec(stream)
|
||||
require.NoError(t, codec.WritePacket(packet))
|
||||
require.NoError(t, codec.CloseWriter())
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
r := &cltypes.Status{}
|
||||
|
||||
code := make([]byte, 1)
|
||||
stream.Read(code)
|
||||
assert.Equal(t, code, []byte{SuccessfullResponsePrefix})
|
||||
|
||||
_, err = codec.Decode(r)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, r, packet)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user