diff --git a/swarm/network/simulation/events.go b/swarm/network/simulation/events.go index 594d36225..d73c3af4e 100644 --- a/swarm/network/simulation/events.go +++ b/swarm/network/simulation/events.go @@ -20,16 +20,18 @@ import ( "context" "sync" - "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/simulations" ) // PeerEvent is the type of the channel returned by Simulation.PeerEvents. type PeerEvent struct { // NodeID is the ID of node that the event is caught on. NodeID enode.ID + // PeerID is the ID of the peer node that the event is caught on. + PeerID enode.ID // Event is the event that is caught. - Event *p2p.PeerEvent + Event *simulations.Event // Error is the error that may have happened during event watching. Error error } @@ -37,9 +39,13 @@ type PeerEvent struct { // PeerEventsFilter defines a filter on PeerEvents to exclude messages with // defined properties. Use PeerEventsFilter methods to set required options. type PeerEventsFilter struct { - t *p2p.PeerEventType - protocol *string - msgCode *uint64 + eventType simulations.EventType + + connUp *bool + + msgReceive *bool + protocol *string + msgCode *uint64 } // NewPeerEventsFilter returns a new PeerEventsFilter instance. @@ -47,20 +53,48 @@ func NewPeerEventsFilter() *PeerEventsFilter { return &PeerEventsFilter{} } -// Type sets the filter to only one peer event type. -func (f *PeerEventsFilter) Type(t p2p.PeerEventType) *PeerEventsFilter { - f.t = &t +// Connect sets the filter to events when two nodes connect. +func (f *PeerEventsFilter) Connect() *PeerEventsFilter { + f.eventType = simulations.EventTypeConn + b := true + f.connUp = &b + return f +} + +// Drop sets the filter to events when two nodes disconnect. +func (f *PeerEventsFilter) Drop() *PeerEventsFilter { + f.eventType = simulations.EventTypeConn + b := false + f.connUp = &b + return f +} + +// ReceivedMessages sets the filter to only messages that are received. +func (f *PeerEventsFilter) ReceivedMessages() *PeerEventsFilter { + f.eventType = simulations.EventTypeMsg + b := true + f.msgReceive = &b + return f +} + +// SentMessages sets the filter to only messages that are sent. +func (f *PeerEventsFilter) SentMessages() *PeerEventsFilter { + f.eventType = simulations.EventTypeMsg + b := false + f.msgReceive = &b return f } // Protocol sets the filter to only one message protocol. func (f *PeerEventsFilter) Protocol(p string) *PeerEventsFilter { + f.eventType = simulations.EventTypeMsg f.protocol = &p return f } // MsgCode sets the filter to only one msg code. func (f *PeerEventsFilter) MsgCode(c uint64) *PeerEventsFilter { + f.eventType = simulations.EventTypeMsg f.msgCode = &c return f } @@ -80,19 +114,8 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []enode.ID, filters ... go func(id enode.ID) { defer s.shutdownWG.Done() - client, err := s.Net.GetNode(id).Client() - if err != nil { - subsWG.Done() - eventC <- PeerEvent{NodeID: id, Error: err} - return - } - events := make(chan *p2p.PeerEvent) - sub, err := client.Subscribe(ctx, "admin", events, "peerEvents") - if err != nil { - subsWG.Done() - eventC <- PeerEvent{NodeID: id, Error: err} - return - } + events := make(chan *simulations.Event) + sub := s.Net.Events().Subscribe(events) defer sub.Unsubscribe() subsWG.Done() @@ -110,28 +133,55 @@ func (s *Simulation) PeerEvents(ctx context.Context, ids []enode.ID, filters ... case <-s.Done(): return case e := <-events: + // ignore control events + if e.Control { + continue + } match := len(filters) == 0 // if there are no filters match all events for _, f := range filters { - if f.t != nil && *f.t != e.Type { - continue + if f.eventType == simulations.EventTypeConn && e.Conn != nil { + if *f.connUp != e.Conn.Up { + continue + } + // all connection filter parameters matched, break the loop + match = true + break } - if f.protocol != nil && *f.protocol != e.Protocol { - continue + if f.eventType == simulations.EventTypeMsg && e.Msg != nil { + if f.msgReceive != nil && *f.msgReceive != e.Msg.Received { + continue + } + if f.protocol != nil && *f.protocol != e.Msg.Protocol { + continue + } + if f.msgCode != nil && *f.msgCode != e.Msg.Code { + continue + } + // all message filter parameters matched, break the loop + match = true + break } - if f.msgCode != nil && e.MsgCode != nil && *f.msgCode != *e.MsgCode { - continue + } + var peerID enode.ID + switch e.Type { + case simulations.EventTypeConn: + peerID = e.Conn.One + if peerID == id { + peerID = e.Conn.Other + } + case simulations.EventTypeMsg: + peerID = e.Msg.One + if peerID == id { + peerID = e.Msg.Other } - // all filter parameters matched, break the loop - match = true - break } if match { select { - case eventC <- PeerEvent{NodeID: id, Event: e}: + case eventC <- PeerEvent{NodeID: id, PeerID: peerID, Event: e}: case <-ctx.Done(): if err := ctx.Err(); err != nil { select { - case eventC <- PeerEvent{NodeID: id, Error: err}: + case eventC <- PeerEvent{NodeID: id, PeerID: peerID, Error: err}: case <-s.Done(): } } diff --git a/swarm/network/simulation/example_test.go b/swarm/network/simulation/example_test.go index 84b0634b4..bacc64d53 100644 --- a/swarm/network/simulation/example_test.go +++ b/swarm/network/simulation/example_test.go @@ -24,7 +24,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" @@ -87,7 +86,7 @@ func ExampleSimulation_PeerEvents() { log.Error("peer event", "err", e.Error) continue } - log.Info("peer event", "node", e.NodeID, "peer", e.Event.Peer, "msgcode", e.Event.MsgCode) + log.Info("peer event", "node", e.NodeID, "peer", e.PeerID, "type", e.Event.Type) } }() } @@ -100,7 +99,7 @@ func ExampleSimulation_PeerEvents_disconnections() { disconnections := sim.PeerEvents( context.Background(), sim.NodeIDs(), - simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + simulation.NewPeerEventsFilter().Drop(), ) go func() { @@ -109,7 +108,7 @@ func ExampleSimulation_PeerEvents_disconnections() { log.Error("peer drop", "err", d.Error) continue } - log.Warn("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + log.Warn("peer drop", "node", d.NodeID, "peer", d.PeerID) } }() } @@ -124,8 +123,8 @@ func ExampleSimulation_PeerEvents_multipleFilters() { context.Background(), sim.NodeIDs(), // Watch when bzz messages 1 and 4 are received. - simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("bzz").MsgCode(1), - simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("bzz").MsgCode(4), + simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("bzz").MsgCode(1), + simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("bzz").MsgCode(4), ) go func() { @@ -134,7 +133,7 @@ func ExampleSimulation_PeerEvents_multipleFilters() { log.Error("bzz message", "err", m.Error) continue } - log.Info("bzz message", "node", m.NodeID, "peer", m.Event.Peer) + log.Info("bzz message", "node", m.NodeID, "peer", m.PeerID) } }() } diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index c9a530115..6b6025115 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -565,13 +565,13 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck disconnections := sim.PeerEvents( context.Background(), sim.NodeIDs(), - simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + simulation.NewPeerEventsFilter().Drop(), ) go func() { for d := range disconnections { if d.Error != nil { - log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) t.Fatal(d.Error) } } @@ -697,13 +697,13 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip disconnections := sim.PeerEvents( context.Background(), sim.NodeIDs(), - simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + simulation.NewPeerEventsFilter().Drop(), ) go func() { for d := range disconnections { if d.Error != nil { - log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) b.Fatal(d.Error) } } diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index 037984f22..b9525d4a4 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/swarm/network" @@ -154,7 +153,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { disconnections := sim.PeerEvents( context.Background(), sim.NodeIDs(), - simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + simulation.NewPeerEventsFilter().Drop(), ) err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top) @@ -165,7 +164,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { go func() { for d := range disconnections { if d.Error != nil { - log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) t.Fatal(d.Error) } } diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 4bd7f38f5..96c37bddc 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" @@ -210,12 +209,12 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) { disconnections := sim.PeerEvents( context.Background(), sim.NodeIDs(), - simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + simulation.NewPeerEventsFilter().Drop(), ) go func() { for d := range disconnections { - log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) t.Fatal("unexpected disconnect") cancelSimRun() } @@ -402,12 +401,12 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) disconnections := sim.PeerEvents( context.Background(), sim.NodeIDs(), - simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + simulation.NewPeerEventsFilter().Drop(), ) go func() { for d := range disconnections { - log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) t.Fatal("unexpected disconnect") cancelSimRun() } @@ -428,7 +427,7 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) var subscriptionCount int - filter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(4) + filter := simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(4) eventC := sim.PeerEvents(ctx, nodeIDs, filter) for j, node := range nodeIDs { diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index a543cae05..f4e055451 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/swarm/log" @@ -151,13 +150,13 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck disconnections := sim.PeerEvents( context.Background(), sim.NodeIDs(), - simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop), + simulation.NewPeerEventsFilter().Drop(), ) go func() { for d := range disconnections { if d.Error != nil { - log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer) + log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID) t.Fatal(d.Error) } }