mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2024-12-24 20:47:16 +00:00
save
This commit is contained in:
parent
12e91b39fa
commit
5ad09bfef3
@ -55,7 +55,20 @@ type PeerInfo struct {
|
||||
deadlines []time.Time // Request deadlines
|
||||
height uint64
|
||||
rw p2p.MsgReadWriter
|
||||
removed bool
|
||||
|
||||
removed chan struct{} // close this channel on remove
|
||||
removeOnce sync.Once
|
||||
tasks chan func()
|
||||
}
|
||||
|
||||
func NewPeerInfo(peer *p2p.Peer, rw p2p.MsgReadWriter) *PeerInfo {
|
||||
p := &PeerInfo{peer: peer, rw: rw, removed: make(chan struct{}), tasks: make(chan func(), 32)}
|
||||
go func() { // each peer has own worker, then slow
|
||||
for f := range p.tasks {
|
||||
f()
|
||||
}
|
||||
}()
|
||||
return p
|
||||
}
|
||||
|
||||
func (pi *PeerInfo) ID() enode.ID {
|
||||
@ -105,15 +118,36 @@ func (pi *PeerInfo) ClearDeadlines(now time.Time, givePermit bool) int {
|
||||
}
|
||||
|
||||
func (pi *PeerInfo) Remove() {
|
||||
pi.lock.Lock()
|
||||
defer pi.lock.Unlock()
|
||||
pi.removed = true
|
||||
pi.removeOnce.Do(func() {
|
||||
close(pi.removed)
|
||||
close(pi.tasks)
|
||||
})
|
||||
}
|
||||
|
||||
func (pi *PeerInfo) Async(f func()) {
|
||||
select {
|
||||
case <-pi.removed: // noop if peer removed
|
||||
case pi.tasks <- f:
|
||||
fmt.Printf("check: %d, %d\n", len(pi.tasks), cap(pi.tasks))
|
||||
if len(pi.tasks) == cap(pi.tasks) { // if channel full - loose old messages
|
||||
for i := 0; i < cap(pi.tasks)/2; i++ {
|
||||
select {
|
||||
case <-pi.tasks:
|
||||
default:
|
||||
}
|
||||
}
|
||||
fmt.Printf("lost some messages\n")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (pi *PeerInfo) Removed() bool {
|
||||
pi.lock.RLock()
|
||||
defer pi.lock.RUnlock()
|
||||
return pi.removed
|
||||
select {
|
||||
case <-pi.removed:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// ConvertH256ToPeerID() ensures the return type is enode.ID rather than [32]byte
|
||||
@ -511,10 +545,7 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod
|
||||
}
|
||||
log.Trace(fmt.Sprintf("[%s] Start with peer", peerID))
|
||||
|
||||
peerInfo := &PeerInfo{
|
||||
peer: peer,
|
||||
rw: rw,
|
||||
}
|
||||
peerInfo := NewPeerInfo(peer, rw)
|
||||
|
||||
defer ss.GoodPeers.Delete(peerID)
|
||||
err := handShake(ctx, ss.GetStatus(), peerID, rw, protocol, protocol, func(bestHash common.Hash) error {
|
||||
@ -621,13 +652,19 @@ func (ss *SentryServerImpl) removePeer(peerID enode.ID) {
|
||||
}
|
||||
}
|
||||
|
||||
func (ss *SentryServerImpl) writePeer(peerInfo *PeerInfo, msgcode uint64, data []byte) error {
|
||||
err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)})
|
||||
if err != nil {
|
||||
peerInfo.Remove()
|
||||
ss.GoodPeers.Delete(peerInfo.ID())
|
||||
}
|
||||
return err
|
||||
func (ss *SentryServerImpl) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode uint64, data []byte, ttl time.Duration) {
|
||||
peerInfo.Async(func() {
|
||||
err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)})
|
||||
if err != nil {
|
||||
peerInfo.Remove()
|
||||
ss.GoodPeers.Delete(peerInfo.ID())
|
||||
log.Debug(logPrefix, "msgcode", msgcode, "err", err)
|
||||
} else {
|
||||
if ttl > 0 {
|
||||
peerInfo.AddDeadline(time.Now().Add(ttl))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, peerID enode.ID) error {
|
||||
@ -710,12 +747,8 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot
|
||||
if !found {
|
||||
break
|
||||
}
|
||||
if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil {
|
||||
lastErr = fmt.Errorf("sendMessageByMinBlock to peer %s: %w", peerInfo.ID(), err)
|
||||
} else {
|
||||
peerInfo.AddDeadline(time.Now().Add(30 * time.Second))
|
||||
reply.Peers = []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())}
|
||||
}
|
||||
ss.writePeer("sendMessageByMinBlock", peerInfo, msgcode, inreq.Data.Data, 30*time.Second)
|
||||
reply.Peers = []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())}
|
||||
}
|
||||
return reply, lastErr
|
||||
}
|
||||
@ -742,9 +775,7 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil {
|
||||
return reply, fmt.Errorf("sendMessageById to peer %s: %w", peerID, err)
|
||||
}
|
||||
ss.writePeer("sendMessageById", peerInfo, msgcode, inreq.Data.Data, 0)
|
||||
reply.Peers = []*proto_types.H256{inreq.PeerId}
|
||||
return reply, nil
|
||||
}
|
||||
@ -774,10 +805,7 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p
|
||||
i := 0
|
||||
var lastErr error
|
||||
ss.rangePeers(func(peerInfo *PeerInfo) bool {
|
||||
if err := ss.writePeer(peerInfo, msgcode, req.Data.Data); err != nil {
|
||||
lastErr = fmt.Errorf("sendMessageToRandomPeers to peer %s: %w", peerInfo.ID(), err)
|
||||
return true
|
||||
}
|
||||
ss.writePeer("sendMessageToRandomPeers", peerInfo, msgcode, req.Data.Data, 0)
|
||||
reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID()))
|
||||
i++
|
||||
return i < sendToAmount
|
||||
@ -797,10 +825,7 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen
|
||||
|
||||
var lastErr error
|
||||
ss.rangePeers(func(peerInfo *PeerInfo) bool {
|
||||
if err := ss.writePeer(peerInfo, msgcode, req.Data); err != nil {
|
||||
lastErr = fmt.Errorf("SendMessageToAll to peer %s: %w", peerInfo.ID(), err)
|
||||
return true
|
||||
}
|
||||
ss.writePeer("SendMessageToAll", peerInfo, msgcode, req.Data, 0)
|
||||
reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID()))
|
||||
return true
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user