Sentry: 1 goroutine per peer (for outbound requests) (#3403)

This commit is contained in:
Alex Sharov 2022-02-02 14:59:00 +07:00 committed by GitHub
parent fc6e5d7e63
commit 68f1b52b1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -55,7 +55,23 @@ type PeerInfo struct {
deadlines []time.Time // Request deadlines
height uint64
rw p2p.MsgReadWriter
removed bool
removed chan struct{} // close this channel on remove
removeOnce sync.Once
// each peer has own worker (goroutine) - all funcs from this queue will execute on this worker
// if this queue is full (means peer is slow) - old messages will be dropped
// channel closed on peer remove
tasks chan func()
}
func NewPeerInfo(peer *p2p.Peer, rw p2p.MsgReadWriter) *PeerInfo {
p := &PeerInfo{peer: peer, rw: rw, removed: make(chan struct{}), tasks: make(chan func(), 16)}
go func() { // each peer has own worker, then slow
for f := range p.tasks {
f()
}
}()
return p
}
func (pi *PeerInfo) ID() enode.ID {
@ -105,15 +121,35 @@ func (pi *PeerInfo) ClearDeadlines(now time.Time, givePermit bool) int {
}
func (pi *PeerInfo) Remove() {
pi.lock.Lock()
defer pi.lock.Unlock()
pi.removed = true
pi.removeOnce.Do(func() {
close(pi.removed)
close(pi.tasks)
})
}
func (pi *PeerInfo) Async(f func()) {
select {
case <-pi.removed: // noop if peer removed
case pi.tasks <- f:
if len(pi.tasks) == cap(pi.tasks) { // if channel full - loose old messages
for i := 0; i < cap(pi.tasks)/2; i++ {
select {
case <-pi.tasks:
default:
}
}
log.Debug("slow peer or too many requests, dropping its old requests", "name", pi.peer.Name())
}
}
}
func (pi *PeerInfo) Removed() bool {
pi.lock.RLock()
defer pi.lock.RUnlock()
return pi.removed
select {
case <-pi.removed:
return true
default:
return false
}
}
// ConvertH256ToPeerID() ensures the return type is enode.ID rather than [32]byte
@ -511,10 +547,7 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod
}
log.Trace(fmt.Sprintf("[%s] Start with peer", peerID))
peerInfo := &PeerInfo{
peer: peer,
rw: rw,
}
peerInfo := NewPeerInfo(peer, rw)
defer ss.GoodPeers.Delete(peerID)
err := handShake(ctx, ss.GetStatus(), peerID, rw, protocol, protocol, func(bestHash common.Hash) error {
@ -621,13 +654,19 @@ func (ss *SentryServerImpl) removePeer(peerID enode.ID) {
}
}
func (ss *SentryServerImpl) writePeer(peerInfo *PeerInfo, msgcode uint64, data []byte) error {
err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)})
if err != nil {
peerInfo.Remove()
ss.GoodPeers.Delete(peerInfo.ID())
}
return err
func (ss *SentryServerImpl) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode uint64, data []byte, ttl time.Duration) {
peerInfo.Async(func() {
err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)})
if err != nil {
peerInfo.Remove()
ss.GoodPeers.Delete(peerInfo.ID())
log.Debug(logPrefix, "msgcode", msgcode, "err", err)
} else {
if ttl > 0 {
peerInfo.AddDeadline(time.Now().Add(ttl))
}
}
})
}
func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, peerID enode.ID) error {
@ -710,12 +749,8 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot
if !found {
break
}
if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil {
lastErr = fmt.Errorf("sendMessageByMinBlock to peer %s: %w", peerInfo.ID(), err)
} else {
peerInfo.AddDeadline(time.Now().Add(30 * time.Second))
reply.Peers = []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())}
}
ss.writePeer("sendMessageByMinBlock", peerInfo, msgcode, inreq.Data.Data, 30*time.Second)
reply.Peers = []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())}
}
return reply, lastErr
}
@ -742,9 +777,7 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent
return reply, nil
}
if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil {
return reply, fmt.Errorf("sendMessageById to peer %s: %w", peerID, err)
}
ss.writePeer("sendMessageById", peerInfo, msgcode, inreq.Data.Data, 0)
reply.Peers = []*proto_types.H256{inreq.PeerId}
return reply, nil
}
@ -774,10 +807,7 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p
i := 0
var lastErr error
ss.rangePeers(func(peerInfo *PeerInfo) bool {
if err := ss.writePeer(peerInfo, msgcode, req.Data.Data); err != nil {
lastErr = fmt.Errorf("sendMessageToRandomPeers to peer %s: %w", peerInfo.ID(), err)
return true
}
ss.writePeer("sendMessageToRandomPeers", peerInfo, msgcode, req.Data.Data, 0)
reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID()))
i++
return i < sendToAmount
@ -797,10 +827,7 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen
var lastErr error
ss.rangePeers(func(peerInfo *PeerInfo) bool {
if err := ss.writePeer(peerInfo, msgcode, req.Data); err != nil {
lastErr = fmt.Errorf("SendMessageToAll to peer %s: %w", peerInfo.ID(), err)
return true
}
ss.writePeer("SendMessageToAll", peerInfo, msgcode, req.Data, 0)
reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID()))
return true
})