From 7e93a08155a8be29042ea432b4536be09200ef11 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Wed, 2 Feb 2022 09:58:34 +0700 Subject: [PATCH] Revert "save" This reverts commit 5ad09bfef393971fd28b33a7af2ed3f98edcfc96. --- cmd/sentry/sentry/sentry.go | 95 ++++++++++++++----------------------- 1 file changed, 35 insertions(+), 60 deletions(-) diff --git a/cmd/sentry/sentry/sentry.go b/cmd/sentry/sentry/sentry.go index 89e477da3..e3feb2c4c 100644 --- a/cmd/sentry/sentry/sentry.go +++ b/cmd/sentry/sentry/sentry.go @@ -55,20 +55,7 @@ type PeerInfo struct { deadlines []time.Time // Request deadlines height uint64 rw p2p.MsgReadWriter - - removed chan struct{} // close this channel on remove - removeOnce sync.Once - tasks chan func() -} - -func NewPeerInfo(peer *p2p.Peer, rw p2p.MsgReadWriter) *PeerInfo { - p := &PeerInfo{peer: peer, rw: rw, removed: make(chan struct{}), tasks: make(chan func(), 32)} - go func() { // each peer has own worker, then slow - for f := range p.tasks { - f() - } - }() - return p + removed bool } func (pi *PeerInfo) ID() enode.ID { @@ -118,36 +105,15 @@ func (pi *PeerInfo) ClearDeadlines(now time.Time, givePermit bool) int { } func (pi *PeerInfo) Remove() { - pi.removeOnce.Do(func() { - close(pi.removed) - close(pi.tasks) - }) -} - -func (pi *PeerInfo) Async(f func()) { - select { - case <-pi.removed: // noop if peer removed - case pi.tasks <- f: - fmt.Printf("check: %d, %d\n", len(pi.tasks), cap(pi.tasks)) - if len(pi.tasks) == cap(pi.tasks) { // if channel full - loose old messages - for i := 0; i < cap(pi.tasks)/2; i++ { - select { - case <-pi.tasks: - default: - } - } - fmt.Printf("lost some messages\n") - } - } + pi.lock.Lock() + defer pi.lock.Unlock() + pi.removed = true } func (pi *PeerInfo) Removed() bool { - select { - case <-pi.removed: - return true - default: - return false - } + pi.lock.RLock() + defer pi.lock.RUnlock() + return pi.removed } // ConvertH256ToPeerID() ensures the return type is enode.ID rather than [32]byte @@ -545,7 +511,10 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod } log.Trace(fmt.Sprintf("[%s] Start with peer", peerID)) - peerInfo := NewPeerInfo(peer, rw) + peerInfo := &PeerInfo{ + peer: peer, + rw: rw, + } defer ss.GoodPeers.Delete(peerID) err := handShake(ctx, ss.GetStatus(), peerID, rw, protocol, protocol, func(bestHash common.Hash) error { @@ -652,19 +621,13 @@ func (ss *SentryServerImpl) removePeer(peerID enode.ID) { } } -func (ss *SentryServerImpl) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode uint64, data []byte, ttl time.Duration) { - peerInfo.Async(func() { - err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)}) - if err != nil { - peerInfo.Remove() - ss.GoodPeers.Delete(peerInfo.ID()) - log.Debug(logPrefix, "msgcode", msgcode, "err", err) - } else { - if ttl > 0 { - peerInfo.AddDeadline(time.Now().Add(ttl)) - } - } - }) +func (ss *SentryServerImpl) writePeer(peerInfo *PeerInfo, msgcode uint64, data []byte) error { + err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)}) + if err != nil { + peerInfo.Remove() + ss.GoodPeers.Delete(peerInfo.ID()) + } + return err } func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, peerID enode.ID) error { @@ -747,8 +710,12 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot if !found { break } - ss.writePeer("sendMessageByMinBlock", peerInfo, msgcode, inreq.Data.Data, 30*time.Second) - reply.Peers = []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())} + if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil { + lastErr = fmt.Errorf("sendMessageByMinBlock to peer %s: %w", peerInfo.ID(), err) + } else { + peerInfo.AddDeadline(time.Now().Add(30 * time.Second)) + reply.Peers = []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())} + } } return reply, lastErr } @@ -775,7 +742,9 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent return reply, nil } - ss.writePeer("sendMessageById", peerInfo, msgcode, inreq.Data.Data, 0) + if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil { + return reply, fmt.Errorf("sendMessageById to peer %s: %w", peerID, err) + } reply.Peers = []*proto_types.H256{inreq.PeerId} return reply, nil } @@ -805,7 +774,10 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p i := 0 var lastErr error ss.rangePeers(func(peerInfo *PeerInfo) bool { - ss.writePeer("sendMessageToRandomPeers", peerInfo, msgcode, req.Data.Data, 0) + if err := ss.writePeer(peerInfo, msgcode, req.Data.Data); err != nil { + lastErr = fmt.Errorf("sendMessageToRandomPeers to peer %s: %w", peerInfo.ID(), err) + return true + } reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID())) i++ return i < sendToAmount @@ -825,7 +797,10 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen var lastErr error ss.rangePeers(func(peerInfo *PeerInfo) bool { - ss.writePeer("SendMessageToAll", peerInfo, msgcode, req.Data, 0) + if err := ss.writePeer(peerInfo, msgcode, req.Data); err != nil { + lastErr = fmt.Errorf("SendMessageToAll to peer %s: %w", peerInfo.ID(), err) + return true + } reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID())) return true })