From e1861bdb31b81acaa4bade40364bf9be442695c6 Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Wed, 18 Sep 2019 13:02:34 -0700 Subject: [PATCH] Clean hello tracker on peer disconnection (#3522) * Clean hello tracker on peer disconnection * Clean hello tracker on peer disconnection --- beacon-chain/p2p/handshake.go | 17 ++++--- beacon-chain/p2p/interfaces.go | 1 + beacon-chain/p2p/service.go | 1 - beacon-chain/p2p/testing/p2p.go | 12 +++++ beacon-chain/sync/rpc_hello.go | 7 +++ beacon-chain/sync/rpc_hello_test.go | 72 ++++++++++++++++++++++++++++- beacon-chain/sync/service.go | 1 + 7 files changed, 102 insertions(+), 9 deletions(-) diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index 96cabe0df..bf7769e94 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -55,18 +55,23 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer }) } -// addDisconnectionHandler ensures that previously disconnected peers aren't dialed again. Due -// to either their ports being closed, nodes are no longer active,etc. -func (s *Service) addDisconnectionHandler() { +// AddDisconnectionHandler ensures that previously disconnected peers aren't dialed again. Due +// to either their ports being closed, nodes are no longer active,etc. This also calls the handler +// responsible for maintaining other parts of the sync or p2p system. +func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id peer.ID) error) { s.host.Network().Notify(&network.NotifyBundle{ DisconnectedF: func(net network.Network, conn network.Conn) { // Must be handled in a goroutine as this callback cannot be blocking. go func() { s.exclusionList.Set(conn.RemotePeer().String(), true, ttl) - log.WithField("peer", conn.RemotePeer()).Debug( - "Peer is added to exclusion list", - ) + log := log.WithField("peer", conn.RemotePeer()) + log.Debug("Peer is added to exclusion list") + ctx := context.Background() + if err := handler(ctx, conn.RemotePeer()); err != nil { + log.WithError(err).Error("Failed to handle disconnecting peer") + } }() + }, }) } diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index 314e00086..bd0aeeb4f 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -36,6 +36,7 @@ type SetStreamHandler interface { // ConnectionHandler configures p2p to handle connections with a peer. type ConnectionHandler interface { AddConnectionHandler(f func(ctx context.Context, id peer.ID) error) + AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error) } // EncodingProvider provides p2p network encoding. diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 9ebb91b93..31e4cd3f3 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -92,7 +92,6 @@ func (s *Service) Start() { log.Error("Attempted to start p2p service when it was already started") return } - s.addDisconnectionHandler() if s.cfg.BootstrapNodeAddr != "" && !s.cfg.NoDiscovery { ipAddr := ipAddr(s.cfg) diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 9f25f4c7d..fb3caf744 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -174,6 +174,18 @@ func (p *TestP2P) AddConnectionHandler(f func(ctx context.Context, id peer.ID) e }) } +// AddDisconnectionHandler -- +func (p *TestP2P) AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error) { + p.Host.Network().Notify(&network.NotifyBundle{ + DisconnectedF: func(net network.Network, conn network.Conn) { + // Must be handled in a goroutine as this callback cannot be blocking. + go func() { + f(context.Background(), conn.RemotePeer()) + }() + }, + }) +} + // Send a message to a specific peer. func (p *TestP2P) Send(ctx context.Context, msg interface{}, pid peer.ID) (network.Stream, error) { protocol := TopicMappings[reflect.TypeOf(msg)] diff --git a/beacon-chain/sync/rpc_hello.go b/beacon-chain/sync/rpc_hello.go index ad6821636..cab92ebad 100644 --- a/beacon-chain/sync/rpc_hello.go +++ b/beacon-chain/sync/rpc_hello.go @@ -59,6 +59,13 @@ func (r *RegularSync) sendRPCHelloRequest(ctx context.Context, id peer.ID) error return r.validateHelloMessage(msg, stream) } +func (r *RegularSync) removeDisconnectedPeerStatus(ctx context.Context, pid peer.ID) error { + r.helloTrackerLock.Lock() + delete(r.helloTracker, pid) + r.helloTrackerLock.Unlock() + return nil +} + // helloRPCHandler reads the incoming Hello RPC from the peer and responds with our version of a hello message. // This handler will disconnect any peer that does not match our fork version. func (r *RegularSync) helloRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { diff --git a/beacon-chain/sync/rpc_hello_test.go b/beacon-chain/sync/rpc_hello_test.go index cdeb89683..f1ed9afd5 100644 --- a/beacon-chain/sync/rpc_hello_test.go +++ b/beacon-chain/sync/rpc_hello_test.go @@ -18,8 +18,13 @@ import ( ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/testutil" + "github.com/sirupsen/logrus" ) +func init() { + logrus.SetLevel(logrus.DebugLevel) +} + func TestHelloRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) { // TODO(3441): Fix ssz string length issue. t.Skip("3441: SSZ is decoding a string with an unexpected length") @@ -144,8 +149,71 @@ func TestHelloRPCHandler_ReturnsHelloMessage(t *testing.T) { } } -func TestRegularSync_HelloRPCHandler_AddsHandshake(t *testing.T) { - t.Skip("TODO(3147): Add a test to ensure the handshake was added.") +func TestHandshakeHandlers_Roundtrip(t *testing.T) { + // Scenario is that p1 and p2 connect, exchange handshakes. + // p2 disconnects and p1 should forget the handshake status. + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + + r := &RegularSync{ + p2p: p1, + chain: &mock.ChainService{ + State: &pb.BeaconState{Slot: 5}, + FinalizedCheckPoint: ðpb.Checkpoint{}, + }, + helloTracker: make(map[peer.ID]*pb.Hello), + ctx: context.Background(), + } + + r.Start() + + // Setup streams + pcl := protocol.ID("/eth2/beacon_chain/req/hello/1/ssz") + var wg sync.WaitGroup + wg.Add(1) + p2.Host.SetStreamHandler(pcl, func(stream network.Stream) { + defer wg.Done() + out := &pb.Hello{} + if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + t.Fatal(err) + } + + resp := &pb.Hello{HeadSlot: 100, ForkVersion: params.BeaconConfig().GenesisForkVersion} + + if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { + t.Fatal(err) + } + _, err := r.p2p.Encoding().EncodeWithLength(stream, resp) + if err != nil { + t.Fatal(err) + } + stream.Close() + }) + + p1.Connect(p2) + + if testutil.WaitTimeout(&wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } + + // Wait for stream buffer to be read. + time.Sleep(200 * time.Millisecond) + + if len(r.helloTracker) != 1 { + t.Errorf("Expected 1 status in the tracker, got %d", len(r.helloTracker)) + } + + if err := p2.Disconnect(p1.PeerID()); err != nil { + t.Fatal(err) + } + + // Wait for disconnect event to trigger. + time.Sleep(200 * time.Millisecond) + + if len(r.helloTracker) != 0 { + t.Errorf("Expected 0 status in the tracker, got %d", len(r.helloTracker)) + } + } func TestHelloRPCRequest_RequestSent(t *testing.T) { diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index f3b75d8a0..e2bb9b548 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -65,6 +65,7 @@ type RegularSync struct { // Start the regular sync service. func (r *RegularSync) Start() { r.p2p.AddConnectionHandler(r.sendRPCHelloRequest) + r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus) } // Stop the regular sync service.