Revert "save"

This reverts commit 5ad09bfef3.
This commit is contained in:
alex.sharov 2022-02-02 09:58:34 +07:00
parent 5ad09bfef3
commit 7e93a08155

View File

@ -55,20 +55,7 @@ type PeerInfo struct {
deadlines []time.Time // Request deadlines
height uint64
rw p2p.MsgReadWriter
removed chan struct{} // close this channel on remove
removeOnce sync.Once
tasks chan func()
}
func NewPeerInfo(peer *p2p.Peer, rw p2p.MsgReadWriter) *PeerInfo {
p := &PeerInfo{peer: peer, rw: rw, removed: make(chan struct{}), tasks: make(chan func(), 32)}
go func() { // each peer has own worker, then slow
for f := range p.tasks {
f()
}
}()
return p
removed bool
}
func (pi *PeerInfo) ID() enode.ID {
@ -118,36 +105,15 @@ func (pi *PeerInfo) ClearDeadlines(now time.Time, givePermit bool) int {
}
func (pi *PeerInfo) Remove() {
pi.removeOnce.Do(func() {
close(pi.removed)
close(pi.tasks)
})
}
func (pi *PeerInfo) Async(f func()) {
select {
case <-pi.removed: // noop if peer removed
case pi.tasks <- f:
fmt.Printf("check: %d, %d\n", len(pi.tasks), cap(pi.tasks))
if len(pi.tasks) == cap(pi.tasks) { // if channel full - loose old messages
for i := 0; i < cap(pi.tasks)/2; i++ {
select {
case <-pi.tasks:
default:
}
}
fmt.Printf("lost some messages\n")
}
}
pi.lock.Lock()
defer pi.lock.Unlock()
pi.removed = true
}
func (pi *PeerInfo) Removed() bool {
select {
case <-pi.removed:
return true
default:
return false
}
pi.lock.RLock()
defer pi.lock.RUnlock()
return pi.removed
}
// ConvertH256ToPeerID() ensures the return type is enode.ID rather than [32]byte
@ -545,7 +511,10 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod
}
log.Trace(fmt.Sprintf("[%s] Start with peer", peerID))
peerInfo := NewPeerInfo(peer, rw)
peerInfo := &PeerInfo{
peer: peer,
rw: rw,
}
defer ss.GoodPeers.Delete(peerID)
err := handShake(ctx, ss.GetStatus(), peerID, rw, protocol, protocol, func(bestHash common.Hash) error {
@ -652,19 +621,13 @@ func (ss *SentryServerImpl) removePeer(peerID enode.ID) {
}
}
func (ss *SentryServerImpl) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode uint64, data []byte, ttl time.Duration) {
peerInfo.Async(func() {
func (ss *SentryServerImpl) writePeer(peerInfo *PeerInfo, msgcode uint64, data []byte) error {
err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)})
if err != nil {
peerInfo.Remove()
ss.GoodPeers.Delete(peerInfo.ID())
log.Debug(logPrefix, "msgcode", msgcode, "err", err)
} else {
if ttl > 0 {
peerInfo.AddDeadline(time.Now().Add(ttl))
}
}
})
return err
}
func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, peerID enode.ID) error {
@ -747,9 +710,13 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot
if !found {
break
}
ss.writePeer("sendMessageByMinBlock", peerInfo, msgcode, inreq.Data.Data, 30*time.Second)
if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil {
lastErr = fmt.Errorf("sendMessageByMinBlock to peer %s: %w", peerInfo.ID(), err)
} else {
peerInfo.AddDeadline(time.Now().Add(30 * time.Second))
reply.Peers = []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())}
}
}
return reply, lastErr
}
@ -775,7 +742,9 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent
return reply, nil
}
ss.writePeer("sendMessageById", peerInfo, msgcode, inreq.Data.Data, 0)
if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil {
return reply, fmt.Errorf("sendMessageById to peer %s: %w", peerID, err)
}
reply.Peers = []*proto_types.H256{inreq.PeerId}
return reply, nil
}
@ -805,7 +774,10 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p
i := 0
var lastErr error
ss.rangePeers(func(peerInfo *PeerInfo) bool {
ss.writePeer("sendMessageToRandomPeers", peerInfo, msgcode, req.Data.Data, 0)
if err := ss.writePeer(peerInfo, msgcode, req.Data.Data); err != nil {
lastErr = fmt.Errorf("sendMessageToRandomPeers to peer %s: %w", peerInfo.ID(), err)
return true
}
reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID()))
i++
return i < sendToAmount
@ -825,7 +797,10 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen
var lastErr error
ss.rangePeers(func(peerInfo *PeerInfo) bool {
ss.writePeer("SendMessageToAll", peerInfo, msgcode, req.Data, 0)
if err := ss.writePeer(peerInfo, msgcode, req.Data); err != nil {
lastErr = fmt.Errorf("SendMessageToAll to peer %s: %w", peerInfo.ID(), err)
return true
}
reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID()))
return true
})