Deadline-based permits in sentry (#1915)

* Do not increase permit on announcements

* Init permits

* Deadlines

* Fix NPEs

* Fix NPE

* Fix findPeer

* Reduce max permits to 4

* 8 max permits, but 30 seconds timeout

* Back to 4

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
ledgerwatch 2021-05-11 18:20:00 +01:00 committed by GitHub
parent 0b7d185d1f
commit bf0ea48823
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 115 additions and 121 deletions

View File

@ -10,6 +10,7 @@ import (
"net"
"os"
"os/signal"
"sort"
"sync"
"syscall"
"time"
@ -42,7 +43,7 @@ const (
// handshakeTimeout is the maximum allowed time for the `eth` handshake to
complete before dropping the connection as malicious.
handshakeTimeout = 5 * time.Second
maxPermitsPerPeer = 8 // How many outstanding requests per peer we may have
maxPermitsPerPeer = 4 // How many outstanding requests per peer we may have
)
func nodeKey() *ecdsa.PrivateKey {
@ -61,6 +62,44 @@ func nodeKey() *ecdsa.PrivateKey {
return key
}
// PeerInfo collects various extra bits of information about the peer,
// for example deadlines that are used for regulating requests sent to the peer.
type PeerInfo struct {
	peer *p2p.Peer // underlying devp2p peer handle
	lock sync.RWMutex // guards deadlines and height
	deadlines []time.Time // Request deadlines, kept in chronological order (see AddDeadline)
	height uint64 // highest block height reported for this peer (updated via PeerMinBlock)
	rw p2p.MsgReadWriter // stream used to write request messages to the peer
}
// AddDeadline appends the given deadline to the peer's deadline list.
// Callers must add deadlines in chronological order, because
// ClearDeadlines relies on that ordering for its binary search.
func (pi *PeerInfo) AddDeadline(deadline time.Time) {
	pi.lock.Lock()
	pi.deadlines = append(pi.deadlines, deadline)
	pi.lock.Unlock()
}
// ClearDeadlines removes all deadlines of the peer that have passed
// as of the given time now. When givePermit is true, one additional
// (not yet passed) deadline is removed as well — this is used when a
// response has been received from the peer.
// It returns the number of deadlines that remain. Deadlines must be
// in chronological order for the binary search to be correct.
func (pi *PeerInfo) ClearDeadlines(now time.Time, givePermit bool) int {
	pi.lock.Lock()
	defer pi.lock.Unlock()
	// Index of the first deadline that is still in the future.
	keep := sort.Search(len(pi.deadlines), func(i int) bool {
		return pi.deadlines[i].After(now)
	})
	if givePermit && keep < len(pi.deadlines) {
		keep++
	}
	pi.deadlines = pi.deadlines[keep:]
	return len(pi.deadlines)
}
func makeP2PServer(
ctx context.Context,
nodeName string,
@ -68,9 +107,6 @@ func makeP2PServer(
natSetting string,
port int,
peers *sync.Map,
peerHeightMap *sync.Map,
peerPermitMap *sync.Map,
peerRwMap *sync.Map,
genesisHash common.Hash,
statusFn func() *proto_sentry.StatusData,
receiveCh chan<- StreamMsg,
@ -121,7 +157,7 @@ func makeP2PServer(
}
}
p2pConfig.Protocols = MakeProtocols(ctx, readNodeInfo, dialCandidates, peers, peerHeightMap, peerPermitMap, peerRwMap, statusFn, receiveCh, receiveUploadCh, receiveTxCh)
p2pConfig.Protocols = MakeProtocols(ctx, readNodeInfo, dialCandidates, peers, statusFn, receiveCh, receiveUploadCh, receiveTxCh)
return &p2p.Server{Config: p2pConfig}, nil
}
@ -129,9 +165,6 @@ func MakeProtocols(ctx context.Context,
readNodeInfo func() *eth.NodeInfo,
dialCandidates enode.Iterator,
peers *sync.Map,
peerHeightMap *sync.Map,
peerPermitMap *sync.Map,
peerRwMap *sync.Map,
statusFn func() *proto_sentry.StatusData,
receiveCh chan<- StreamMsg,
receiveUploadCh chan<- StreamMsg,
@ -146,17 +179,20 @@ func MakeProtocols(ctx context.Context,
Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
peerID := peer.ID().String()
log.Debug(fmt.Sprintf("[%s] Start with peer", peerID))
peers.Store(peerID, rw)
peerRwMap.Store(peerID, rw)
if err := handShake(ctx, statusFn(), peerID, rw, eth.ProtocolVersions[0], eth.ProtocolVersions[0]); err != nil {
return fmt.Errorf("handshake to peer %s: %v", peerID, err)
}
log.Debug(fmt.Sprintf("[%s] Received status message OK", peerID), "name", peer.Name())
peerInfo := &PeerInfo{
peer: peer,
rw: rw,
}
peers.Store(peerID, peerInfo)
if err := runPeer(
ctx,
peerHeightMap,
peerPermitMap,
peerRwMap,
peer,
eth.ProtocolVersions[0], // version == eth66
eth.ProtocolVersions[0], // minVersion == eth66
statusFn(),
peerID,
rw,
peerInfo,
receiveCh,
receiveUploadCh,
receiveTxCh,
@ -164,9 +200,6 @@ func MakeProtocols(ctx context.Context,
log.Debug(fmt.Sprintf("[%s] Error while running peer: %v", peerID, err))
}
peers.Delete(peerID)
peerHeightMap.Delete(peerID)
peerPermitMap.Delete(peerID)
peerRwMap.Delete(peerID)
return nil
},
NodeInfo: func() interface{} {
@ -177,7 +210,7 @@ func MakeProtocols(ctx context.Context,
if !ok {
return nil
}
return p.(*p2p.Peer).Info()
return p.(*PeerInfo).peer.Info()
},
//Attributes: []enr.Entry{eth.CurrentENREntry(chainConfig, genesisHash, headHeight)},
},
@ -272,38 +305,19 @@ func handShake(
func runPeer(
ctx context.Context,
peerHeightMap *sync.Map,
peerPermitMap *sync.Map,
peerRwMap *sync.Map,
peer *p2p.Peer,
version uint,
minVersion uint,
status *proto_sentry.StatusData,
peerID string,
rw p2p.MsgReadWriter,
peerInfo *PeerInfo,
receiveCh chan<- StreamMsg,
receiveUploadCh chan<- StreamMsg,
receiveTxCh chan<- StreamMsg,
) error {
peerID := peer.ID().String()
rwRaw, ok := peerRwMap.Load(peerID)
if !ok {
return fmt.Errorf("peer has been penalized")
}
rw, _ := rwRaw.(p2p.MsgReadWriter)
if err := handShake(ctx, status, peerID, rw, version, minVersion); err != nil {
return fmt.Errorf("handshake to peer %s: %v", peerID, err)
}
log.Debug(fmt.Sprintf("[%s] Received status message OK", peerID), "name", peer.Name())
for {
var err error
if err = common.Stopped(ctx.Done()); err != nil {
return err
}
if _, ok := peerRwMap.Load(peerID); !ok {
return fmt.Errorf("peer has been penalized")
}
msg, err := rw.ReadMsg()
if err != nil {
return fmt.Errorf("reading message: %v", err)
@ -312,7 +326,7 @@ func runPeer(
msg.Discard()
return fmt.Errorf("message is too large %d, limit %d", msg.Size, eth.ProtocolMaxMsgSize)
}
increasePermit := false
givePermit := false
switch msg.Code {
case eth.StatusMsg:
msg.Discard()
@ -325,7 +339,7 @@ func runPeer(
}
trySend(receiveUploadCh, &StreamMsg{b, peerID, "GetBlockHeadersMsg", proto_sentry.MessageId_GetBlockHeaders})
case eth.BlockHeadersMsg:
increasePermit = true
givePermit = true
b := make([]byte, msg.Size)
if _, err := io.ReadFull(msg.Payload, b); err != nil {
log.Error(fmt.Sprintf("%s: reading msg into bytes: %v", peerID, err))
@ -338,7 +352,7 @@ func runPeer(
}
trySend(receiveUploadCh, &StreamMsg{b, peerID, "GetBlockBodiesMsg", proto_sentry.MessageId_GetBlockBodies})
case eth.BlockBodiesMsg:
increasePermit = true
givePermit = true
b := make([]byte, msg.Size)
if _, err := io.ReadFull(msg.Payload, b); err != nil {
log.Error(fmt.Sprintf("%s: reading msg into bytes: %v", peerID, err))
@ -351,14 +365,12 @@ func runPeer(
case eth.ReceiptsMsg:
//log.Info(fmt.Sprintf("[%s] ReceiptsMsg", peerID))
case eth.NewBlockHashesMsg:
increasePermit = true
b := make([]byte, msg.Size)
if _, err := io.ReadFull(msg.Payload, b); err != nil {
log.Error(fmt.Sprintf("%s: reading msg into bytes: %v", peerID, err))
}
trySend(receiveCh, &StreamMsg{b, peerID, "NewBlockHashesMsg", proto_sentry.MessageId_NewBlockHashes})
case eth.NewBlockMsg:
increasePermit = true
b := make([]byte, msg.Size)
if _, err := io.ReadFull(msg.Payload, b); err != nil {
log.Error(fmt.Sprintf("%s: reading msg into bytes: %v", peerID, err))
@ -419,15 +431,7 @@ func runPeer(
log.Error(fmt.Sprintf("[%s] Unknown message code: %d", peerID, msg.Code))
}
msg.Discard()
if increasePermit {
// Peer responded or sent message - reset the "back off" timer
permitsRaw, _ := peerPermitMap.Load(peerID)
permits, _ := permitsRaw.(int)
if permits < maxPermitsPerPeer {
permits++
peerPermitMap.Store(peerID, permits)
}
}
peerInfo.ClearDeadlines(time.Now(), givePermit)
}
}
@ -530,9 +534,6 @@ func p2pServer(ctx context.Context,
natSetting,
port,
&sentryServer.Peers,
&sentryServer.PeerHeightMap,
&sentryServer.PeerPermitMap,
&sentryServer.PeerRwMap,
genesisHash,
sentryServer.GetStatus,
sentryServer.ReceiveCh,
@ -592,9 +593,6 @@ type SentryServerImpl struct {
discovery bool
netRestrict string
Peers sync.Map
PeerHeightMap sync.Map
PeerRwMap sync.Map
PeerPermitMap sync.Map // Keeps track of request permits for each peer - for request load balancing
statusData *proto_sentry.StatusData
P2pServer *p2p.Server
nodeName string
@ -610,55 +608,60 @@ type SentryServerImpl struct {
func (ss *SentryServerImpl) PenalizePeer(_ context.Context, req *proto_sentry.PenalizePeerRequest) (*empty.Empty, error) {
//log.Warn("Received penalty", "kind", req.GetPenalty().Descriptor().FullName, "from", fmt.Sprintf("%s", req.GetPeerId()))
strId := string(gointerfaces.ConvertH512ToBytes(req.PeerId))
ss.PeerRwMap.Delete(strId)
ss.PeerPermitMap.Delete(strId)
ss.PeerHeightMap.Delete(strId)
ss.Peers.Delete(strId)
return &empty.Empty{}, nil
}
func (ss *SentryServerImpl) PeerMinBlock(_ context.Context, req *proto_sentry.PeerMinBlockRequest) (*empty.Empty, error) {
peerID := string(gointerfaces.ConvertH512ToBytes(req.PeerId))
x, _ := ss.PeerHeightMap.Load(peerID)
highestBlock, _ := x.(uint64)
if req.MinBlock > highestBlock {
ss.PeerHeightMap.Store(peerID, req.MinBlock)
x, _ := ss.Peers.Load(peerID)
peerInfo, _ := x.(*PeerInfo)
if peerInfo == nil {
return &empty.Empty{}, nil
}
peerInfo.lock.Lock()
defer peerInfo.lock.Unlock()
if req.MinBlock > peerInfo.height {
peerInfo.height = req.MinBlock
}
return &empty.Empty{}, nil
}
func (ss *SentryServerImpl) findPeer(minBlock uint64) (string, bool) {
func (ss *SentryServerImpl) findPeer(minBlock uint64) (string, *PeerInfo, bool) {
// Choose a peer that we can send this request to, with maximum number of permits
var foundPeerID string
var foundPeerInfo *PeerInfo
var maxPermits int
ss.PeerHeightMap.Range(func(key, value interface{}) bool {
valUint, _ := value.(uint64)
if valUint >= minBlock {
peerID := key.(string)
permitsRaw, _ := ss.PeerPermitMap.Load(peerID)
permits, _ := permitsRaw.(int)
if permits > maxPermits {
maxPermits = permits
foundPeerID = peerID
now := time.Now()
ss.Peers.Range(func(key, value interface{}) bool {
peerID := key.(string)
x, _ := ss.Peers.Load(peerID)
peerInfo, _ := x.(*PeerInfo)
if peerInfo == nil {
return true
}
if peerInfo.height >= minBlock {
deadlines := peerInfo.ClearDeadlines(now, false /* givePermit */)
//fmt.Printf("%d deadlines for peer %s\n", deadlines, peerID)
if deadlines < maxPermitsPerPeer {
permits := maxPermitsPerPeer - deadlines
if permits > maxPermits {
maxPermits = permits
foundPeerID = peerID
foundPeerInfo = peerInfo
}
}
}
return true
})
return foundPeerID, maxPermits > 0
return foundPeerID, foundPeerInfo, maxPermits > 0
}
func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *proto_sentry.SendMessageByMinBlockRequest) (*proto_sentry.SentPeers, error) {
peerID, found := ss.findPeer(inreq.MinBlock)
peerID, peerInfo, found := ss.findPeer(inreq.MinBlock)
if !found {
return &proto_sentry.SentPeers{}, nil
}
rwRaw, _ := ss.PeerRwMap.Load(peerID)
rw, _ := rwRaw.(p2p.MsgReadWriter)
if rw == nil {
ss.PeerHeightMap.Delete(peerID)
ss.PeerPermitMap.Delete(peerID)
ss.PeerRwMap.Delete(peerID)
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageByMinBlock find rw for peer %s", peerID)
}
var msgcode uint64
switch inreq.Data.Id {
case proto_sentry.MessageId_GetBlockHeaders:
@ -668,27 +671,21 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot
default:
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageByMinBlock not implemented for message Id: %s", inreq.Data.Id)
}
if err := rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(inreq.Data.Data)), Payload: bytes.NewReader(inreq.Data.Data)}); err != nil {
ss.PeerHeightMap.Delete(peerID)
ss.PeerPermitMap.Delete(peerID)
ss.PeerRwMap.Delete(peerID)
if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(inreq.Data.Data)), Payload: bytes.NewReader(inreq.Data.Data)}); err != nil {
ss.Peers.Delete(peerID)
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageByMinBlock to peer %s: %v", peerID, err)
}
permitsRaw, _ := ss.PeerPermitMap.Load(peerID)
permits, _ := permitsRaw.(int)
if permits > 0 {
ss.PeerPermitMap.Store(peerID, permits-1)
}
peerInfo.AddDeadline(time.Now().Add(30 * time.Second))
return &proto_sentry.SentPeers{Peers: []*proto_types.H512{gointerfaces.ConvertBytesToH512([]byte(peerID))}}, nil
}
func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) {
peerID := string(gointerfaces.ConvertH512ToBytes(inreq.PeerId))
rwRaw, ok := ss.PeerRwMap.Load(peerID)
x, ok := ss.Peers.Load(peerID)
if !ok {
return &proto_sentry.SentPeers{}, fmt.Errorf("peer not found: %s", peerID)
}
rw, _ := rwRaw.(p2p.MsgReadWriter)
peerInfo := x.(*PeerInfo)
var msgcode uint64
switch inreq.Data.Id {
case proto_sentry.MessageId_GetBlockHeaders:
@ -709,10 +706,8 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageById not implemented for message Id: %s", inreq.Data.Id)
}
if err := rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(inreq.Data.Data)), Payload: bytes.NewReader(inreq.Data.Data)}); err != nil {
ss.PeerHeightMap.Delete(peerID)
ss.PeerPermitMap.Delete(peerID)
ss.PeerRwMap.Delete(peerID)
if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(inreq.Data.Data)), Payload: bytes.NewReader(inreq.Data.Data)}); err != nil {
ss.Peers.Delete(peerID)
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageById to peer %s: %v", peerID, err)
}
return &proto_sentry.SentPeers{Peers: []*proto_types.H512{inreq.PeerId}}, nil
@ -730,7 +725,7 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p
}
amount := uint64(0)
ss.PeerRwMap.Range(func(key, value interface{}) bool {
ss.Peers.Range(func(key, value interface{}) bool {
amount++
return true
})
@ -743,13 +738,14 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p
i := 0
var innerErr error
reply := &proto_sentry.SentPeers{Peers: []*proto_types.H512{}}
ss.PeerRwMap.Range(func(key, value interface{}) bool {
ss.Peers.Range(func(key, value interface{}) bool {
peerID := key.(string)
rw, _ := value.(p2p.MsgReadWriter)
if err := rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(req.Data.Data)), Payload: bytes.NewReader(req.Data.Data)}); err != nil {
ss.PeerHeightMap.Delete(peerID)
ss.PeerPermitMap.Delete(peerID)
ss.PeerRwMap.Delete(peerID)
peerInfo, _ := value.(*PeerInfo)
if peerInfo == nil {
return true
}
if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(req.Data.Data)), Payload: bytes.NewReader(req.Data.Data)}); err != nil {
ss.Peers.Delete(peerID)
innerErr = err
return false
}
@ -776,13 +772,14 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen
var innerErr error
reply := &proto_sentry.SentPeers{Peers: []*proto_types.H512{}}
ss.PeerRwMap.Range(func(key, value interface{}) bool {
ss.Peers.Range(func(key, value interface{}) bool {
peerID := key.(string)
rw, _ := value.(p2p.MsgReadWriter)
if err := rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(req.Data)), Payload: bytes.NewReader(req.Data)}); err != nil {
ss.PeerHeightMap.Delete(peerID)
ss.PeerPermitMap.Delete(peerID)
ss.PeerRwMap.Delete(peerID)
peerInfo, _ := value.(*PeerInfo)
if peerInfo == nil {
return true
}
if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(req.Data)), Payload: bytes.NewReader(req.Data)}); err != nil {
ss.Peers.Delete(peerID)
innerErr = err
return false
}

View File

@ -775,9 +775,6 @@ func (s *Ethereum) Protocols() []p2p.Protocol {
readNodeInfo,
s.ethDialCandidates,
&s.sentryServer.Peers,
&s.sentryServer.PeerHeightMap,
&s.sentryServer.PeerPermitMap,
&s.sentryServer.PeerRwMap,
s.sentryServer.GetStatus,
s.sentryServer.ReceiveCh,
s.sentryServer.ReceiveUploadCh,