Fix for removed peers (#1917)

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
ledgerwatch 2021-05-11 20:35:59 +01:00 committed by GitHub
parent bf0ea48823
commit 7f51f13804
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -70,6 +70,7 @@ type PeerInfo struct {
deadlines []time.Time // Request deadlines deadlines []time.Time // Request deadlines
height uint64 height uint64
rw p2p.MsgReadWriter rw p2p.MsgReadWriter
removed bool
} }
// AddDeadline adds given deadline to the list of deadlines // AddDeadline adds given deadline to the list of deadlines
@ -100,6 +101,18 @@ func (pi *PeerInfo) ClearDeadlines(now time.Time, givePermit bool) int {
return len(pi.deadlines) return len(pi.deadlines)
} }
func (pi *PeerInfo) Remove() {
pi.lock.Lock()
defer pi.lock.Unlock()
pi.removed = true
}
func (pi *PeerInfo) Removed() bool {
pi.lock.RLock()
defer pi.lock.RUnlock()
return pi.removed
}
func makeP2PServer( func makeP2PServer(
ctx context.Context, ctx context.Context,
nodeName string, nodeName string,
@ -317,7 +330,9 @@ func runPeer(
if err = common.Stopped(ctx.Done()); err != nil { if err = common.Stopped(ctx.Done()); err != nil {
return err return err
} }
if peerInfo.Removed() {
return fmt.Errorf("peer removed")
}
msg, err := rw.ReadMsg() msg, err := rw.ReadMsg()
if err != nil { if err != nil {
return fmt.Errorf("reading message: %v", err) return fmt.Errorf("reading message: %v", err)
@ -608,6 +623,12 @@ type SentryServerImpl struct {
func (ss *SentryServerImpl) PenalizePeer(_ context.Context, req *proto_sentry.PenalizePeerRequest) (*empty.Empty, error) { func (ss *SentryServerImpl) PenalizePeer(_ context.Context, req *proto_sentry.PenalizePeerRequest) (*empty.Empty, error) {
//log.Warn("Received penalty", "kind", req.GetPenalty().Descriptor().FullName, "from", fmt.Sprintf("%s", req.GetPeerId())) //log.Warn("Received penalty", "kind", req.GetPenalty().Descriptor().FullName, "from", fmt.Sprintf("%s", req.GetPeerId()))
strId := string(gointerfaces.ConvertH512ToBytes(req.PeerId)) strId := string(gointerfaces.ConvertH512ToBytes(req.PeerId))
if x, ok := ss.Peers.Load(strId); ok {
peerInfo := x.(*PeerInfo)
if peerInfo != nil {
peerInfo.Remove()
}
}
ss.Peers.Delete(strId) ss.Peers.Delete(strId)
return &empty.Empty{}, nil return &empty.Empty{}, nil
} }
@ -672,6 +693,12 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageByMinBlock not implemented for message Id: %s", inreq.Data.Id) return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageByMinBlock not implemented for message Id: %s", inreq.Data.Id)
} }
if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(inreq.Data.Data)), Payload: bytes.NewReader(inreq.Data.Data)}); err != nil { if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(inreq.Data.Data)), Payload: bytes.NewReader(inreq.Data.Data)}); err != nil {
if x, ok := ss.Peers.Load(peerID); ok {
peerInfo := x.(*PeerInfo)
if peerInfo != nil {
peerInfo.Remove()
}
}
ss.Peers.Delete(peerID) ss.Peers.Delete(peerID)
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageByMinBlock to peer %s: %v", peerID, err) return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageByMinBlock to peer %s: %v", peerID, err)
} }
@ -707,6 +734,12 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent
} }
if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(inreq.Data.Data)), Payload: bytes.NewReader(inreq.Data.Data)}); err != nil { if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(inreq.Data.Data)), Payload: bytes.NewReader(inreq.Data.Data)}); err != nil {
if x, ok := ss.Peers.Load(peerID); ok {
peerInfo := x.(*PeerInfo)
if peerInfo != nil {
peerInfo.Remove()
}
}
ss.Peers.Delete(peerID) ss.Peers.Delete(peerID)
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageById to peer %s: %v", peerID, err) return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageById to peer %s: %v", peerID, err)
} }
@ -745,6 +778,12 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p
return true return true
} }
if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(req.Data.Data)), Payload: bytes.NewReader(req.Data.Data)}); err != nil { if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(req.Data.Data)), Payload: bytes.NewReader(req.Data.Data)}); err != nil {
if x, ok := ss.Peers.Load(peerID); ok {
peerInfo := x.(*PeerInfo)
if peerInfo != nil {
peerInfo.Remove()
}
}
ss.Peers.Delete(peerID) ss.Peers.Delete(peerID)
innerErr = err innerErr = err
return false return false
@ -779,6 +818,12 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen
return true return true
} }
if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(req.Data)), Payload: bytes.NewReader(req.Data)}); err != nil { if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(req.Data)), Payload: bytes.NewReader(req.Data)}); err != nil {
if x, ok := ss.Peers.Load(peerID); ok {
peerInfo := x.(*PeerInfo)
if peerInfo != nil {
peerInfo.Remove()
}
}
ss.Peers.Delete(peerID) ss.Peers.Delete(peerID)
innerErr = err innerErr = err
return false return false