Clean hello tracker on peer disconnection (#3522)

* Clean hello tracker on peer disconnection

* Clean hello tracker on peer disconnection
This commit is contained in:
Preston Van Loon 2019-09-18 13:02:34 -07:00 committed by Raul Jordan
parent f69195f211
commit e1861bdb31
7 changed files with 102 additions and 9 deletions

View File

@ -55,18 +55,23 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
})
}
// addDisconnectionHandler ensures that previously disconnected peers aren't dialed again. Due
// to either their ports being closed, nodes are no longer active,etc.
func (s *Service) addDisconnectionHandler() {
// AddDisconnectionHandler ensures that previously disconnected peers aren't dialed again. Due
// to either their ports being closed, nodes are no longer active,etc. This also calls the handler
// responsible for maintaining other parts of the sync or p2p system.
func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id peer.ID) error) {
s.host.Network().Notify(&network.NotifyBundle{
DisconnectedF: func(net network.Network, conn network.Conn) {
// Must be handled in a goroutine as this callback cannot be blocking.
go func() {
s.exclusionList.Set(conn.RemotePeer().String(), true, ttl)
log.WithField("peer", conn.RemotePeer()).Debug(
"Peer is added to exclusion list",
)
log := log.WithField("peer", conn.RemotePeer())
log.Debug("Peer is added to exclusion list")
ctx := context.Background()
if err := handler(ctx, conn.RemotePeer()); err != nil {
log.WithError(err).Error("Failed to handle disconnecting peer")
}
}()
},
})
}

View File

@ -36,6 +36,7 @@ type SetStreamHandler interface {
// ConnectionHandler configures p2p to handle connections with a peer.
type ConnectionHandler interface {
AddConnectionHandler(f func(ctx context.Context, id peer.ID) error)
AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error)
}
// EncodingProvider provides p2p network encoding.

View File

@ -92,7 +92,6 @@ func (s *Service) Start() {
log.Error("Attempted to start p2p service when it was already started")
return
}
s.addDisconnectionHandler()
if s.cfg.BootstrapNodeAddr != "" && !s.cfg.NoDiscovery {
ipAddr := ipAddr(s.cfg)

View File

@ -174,6 +174,18 @@ func (p *TestP2P) AddConnectionHandler(f func(ctx context.Context, id peer.ID) e
})
}
// AddDisconnectionHandler --
func (p *TestP2P) AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error) {
p.Host.Network().Notify(&network.NotifyBundle{
DisconnectedF: func(net network.Network, conn network.Conn) {
// Must be handled in a goroutine as this callback cannot be blocking.
go func() {
f(context.Background(), conn.RemotePeer())
}()
},
})
}
// Send a message to a specific peer.
func (p *TestP2P) Send(ctx context.Context, msg interface{}, pid peer.ID) (network.Stream, error) {
protocol := TopicMappings[reflect.TypeOf(msg)]

View File

@ -59,6 +59,13 @@ func (r *RegularSync) sendRPCHelloRequest(ctx context.Context, id peer.ID) error
return r.validateHelloMessage(msg, stream)
}
func (r *RegularSync) removeDisconnectedPeerStatus(ctx context.Context, pid peer.ID) error {
r.helloTrackerLock.Lock()
delete(r.helloTracker, pid)
r.helloTrackerLock.Unlock()
return nil
}
// helloRPCHandler reads the incoming Hello RPC from the peer and responds with our version of a hello message.
// This handler will disconnect any peer that does not match our fork version.
func (r *RegularSync) helloRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error {

View File

@ -18,8 +18,13 @@ import (
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/sirupsen/logrus"
)
func init() {
logrus.SetLevel(logrus.DebugLevel)
}
func TestHelloRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) {
// TODO(3441): Fix ssz string length issue.
t.Skip("3441: SSZ is decoding a string with an unexpected length")
@ -144,8 +149,71 @@ func TestHelloRPCHandler_ReturnsHelloMessage(t *testing.T) {
}
}
func TestRegularSync_HelloRPCHandler_AddsHandshake(t *testing.T) {
t.Skip("TODO(3147): Add a test to ensure the handshake was added.")
func TestHandshakeHandlers_Roundtrip(t *testing.T) {
// Scenario is that p1 and p2 connect, exchange handshakes.
// p2 disconnects and p1 should forget the handshake status.
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
r := &RegularSync{
p2p: p1,
chain: &mock.ChainService{
State: &pb.BeaconState{Slot: 5},
FinalizedCheckPoint: &ethpb.Checkpoint{},
},
helloTracker: make(map[peer.ID]*pb.Hello),
ctx: context.Background(),
}
r.Start()
// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/hello/1/ssz")
var wg sync.WaitGroup
wg.Add(1)
p2.Host.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
out := &pb.Hello{}
if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil {
t.Fatal(err)
}
resp := &pb.Hello{HeadSlot: 100, ForkVersion: params.BeaconConfig().GenesisForkVersion}
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
t.Fatal(err)
}
_, err := r.p2p.Encoding().EncodeWithLength(stream, resp)
if err != nil {
t.Fatal(err)
}
stream.Close()
})
p1.Connect(p2)
if testutil.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}
// Wait for stream buffer to be read.
time.Sleep(200 * time.Millisecond)
if len(r.helloTracker) != 1 {
t.Errorf("Expected 1 status in the tracker, got %d", len(r.helloTracker))
}
if err := p2.Disconnect(p1.PeerID()); err != nil {
t.Fatal(err)
}
// Wait for disconnect event to trigger.
time.Sleep(200 * time.Millisecond)
if len(r.helloTracker) != 0 {
t.Errorf("Expected 0 status in the tracker, got %d", len(r.helloTracker))
}
}
func TestHelloRPCRequest_RequestSent(t *testing.T) {

View File

@ -65,6 +65,7 @@ type RegularSync struct {
// Start the regular sync service.
func (r *RegularSync) Start() {
r.p2p.AddConnectionHandler(r.sendRPCHelloRequest)
r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus)
}
// Stop the regular sync service.