From 5ad09bfef393971fd28b33a7af2ed3f98edcfc96 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Wed, 2 Feb 2022 09:58:16 +0700 Subject: [PATCH] save --- cmd/sentry/sentry/sentry.go | 95 +++++++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 35 deletions(-) diff --git a/cmd/sentry/sentry/sentry.go b/cmd/sentry/sentry/sentry.go index e3feb2c4c..89e477da3 100644 --- a/cmd/sentry/sentry/sentry.go +++ b/cmd/sentry/sentry/sentry.go @@ -55,7 +55,20 @@ type PeerInfo struct { deadlines []time.Time // Request deadlines height uint64 rw p2p.MsgReadWriter - removed bool + + removed chan struct{} // close this channel on remove + removeOnce sync.Once + tasks chan func() +} + +func NewPeerInfo(peer *p2p.Peer, rw p2p.MsgReadWriter) *PeerInfo { + p := &PeerInfo{peer: peer, rw: rw, removed: make(chan struct{}), tasks: make(chan func(), 32)} + go func() { // each peer has own worker, then slow + for f := range p.tasks { + f() + } + }() + return p } func (pi *PeerInfo) ID() enode.ID { @@ -105,15 +118,36 @@ func (pi *PeerInfo) ClearDeadlines(now time.Time, givePermit bool) int { } func (pi *PeerInfo) Remove() { - pi.lock.Lock() - defer pi.lock.Unlock() - pi.removed = true + pi.removeOnce.Do(func() { + close(pi.removed) + close(pi.tasks) + }) +} + +func (pi *PeerInfo) Async(f func()) { + select { + case <-pi.removed: // noop if peer removed + case pi.tasks <- f: + fmt.Printf("check: %d, %d\n", len(pi.tasks), cap(pi.tasks)) + if len(pi.tasks) == cap(pi.tasks) { // if channel full - loose old messages + for i := 0; i < cap(pi.tasks)/2; i++ { + select { + case <-pi.tasks: + default: + } + } + fmt.Printf("lost some messages\n") + } + } } func (pi *PeerInfo) Removed() bool { - pi.lock.RLock() - defer pi.lock.RUnlock() - return pi.removed + select { + case <-pi.removed: + return true + default: + return false + } } // ConvertH256ToPeerID() ensures the return type is enode.ID rather than [32]byte @@ -511,10 +545,7 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod } log.Trace(fmt.Sprintf("[%s] Start with peer", peerID)) - peerInfo := &PeerInfo{ - peer: peer, - rw: rw, - } + peerInfo := NewPeerInfo(peer, rw) defer ss.GoodPeers.Delete(peerID) err := handShake(ctx, ss.GetStatus(), peerID, rw, protocol, protocol, func(bestHash common.Hash) error { @@ -621,13 +652,19 @@ func (ss *SentryServerImpl) removePeer(peerID enode.ID) { } } -func (ss *SentryServerImpl) writePeer(peerInfo *PeerInfo, msgcode uint64, data []byte) error { - err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)}) - if err != nil { - peerInfo.Remove() - ss.GoodPeers.Delete(peerInfo.ID()) - } - return err +func (ss *SentryServerImpl) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode uint64, data []byte, ttl time.Duration) { + peerInfo.Async(func() { + err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)}) + if err != nil { + peerInfo.Remove() + ss.GoodPeers.Delete(peerInfo.ID()) + log.Debug(logPrefix, "msgcode", msgcode, "err", err) + } else { + if ttl > 0 { + peerInfo.AddDeadline(time.Now().Add(ttl)) + } + } + }) } func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, peerID enode.ID) error { @@ -710,12 +747,8 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot if !found { break } - if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil { - lastErr = fmt.Errorf("sendMessageByMinBlock to peer %s: %w", peerInfo.ID(), err) - } else { - peerInfo.AddDeadline(time.Now().Add(30 * time.Second)) - reply.Peers = []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())} - } + ss.writePeer("sendMessageByMinBlock", peerInfo, msgcode, inreq.Data.Data, 30*time.Second) + reply.Peers = []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())} } return reply, lastErr } @@ -742,9 +775,7 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent return reply, nil } - if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil { - return reply, fmt.Errorf("sendMessageById to peer %s: %w", peerID, err) - } + ss.writePeer("sendMessageById", peerInfo, msgcode, inreq.Data.Data, 0) reply.Peers = []*proto_types.H256{inreq.PeerId} return reply, nil } @@ -774,10 +805,7 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p i := 0 var lastErr error ss.rangePeers(func(peerInfo *PeerInfo) bool { - if err := ss.writePeer(peerInfo, msgcode, req.Data.Data); err != nil { - lastErr = fmt.Errorf("sendMessageToRandomPeers to peer %s: %w", peerInfo.ID(), err) - return true - } + ss.writePeer("sendMessageToRandomPeers", peerInfo, msgcode, req.Data.Data, 0) reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID())) i++ return i < sendToAmount @@ -797,10 +825,7 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen var lastErr error ss.rangePeers(func(peerInfo *PeerInfo) bool { - if err := ss.writePeer(peerInfo, msgcode, req.Data); err != nil { - lastErr = fmt.Errorf("SendMessageToAll to peer %s: %w", peerInfo.ID(), err) - return true - } + ss.writePeer("SendMessageToAll", peerInfo, msgcode, req.Data, 0) reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID())) return true })