txPool: propagate on peer connect (#2335)

Alex Sharov 2021-07-11 07:01:16 +00:00 committed by GitHub
parent e1c17e035a
commit 188dfb14b8
17 changed files with 501 additions and 96 deletions


@ -3,8 +3,10 @@ package download
import (
"context"
"math/big"
"strings"
proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/protocols/eth"
@ -101,6 +103,9 @@ func (cs *ControlServerImpl) BroadcastNewBlock(ctx context.Context, block *types
}
if _, err = sentry.SendMessageToRandomPeers(ctx, req65, &grpc.EmptyCallOption{}); err != nil {
if isPeerNotFoundErr(err) {
continue
}
log.Error("broadcastNewBlock", "error", err)
}
@ -115,29 +120,35 @@ func (cs *ControlServerImpl) BroadcastNewBlock(ctx context.Context, block *types
}
}
if _, err = sentry.SendMessageToRandomPeers(ctx, req66, &grpc.EmptyCallOption{}); err != nil {
if isPeerNotFoundErr(err) {
continue
}
log.Error("broadcastNewBlock", "error", err)
}
continue
}
}
}
func (cs *ControlServerImpl) BroadcastNewTxs(ctx context.Context, txs []types.Transaction) {
func (cs *ControlServerImpl) BroadcastPooledTxs(ctx context.Context, txs []common.Hash) {
if len(txs) == 0 {
return
}
cs.lock.RLock()
defer cs.lock.RUnlock()
for len(txs) > 0 {
pendingLen := maxTxPacketSize / common.HashLength
pending := make([]common.Hash, 0, pendingLen)
for i := 0; i < pendingLen && i < len(txs); i++ {
pending = append(pending, txs[i].Hash())
pending = append(pending, txs[i])
}
txs = txs[len(pending):]
data, err := rlp.EncodeToBytes(eth.NewPooledTransactionHashesPacket(pending))
if err != nil {
log.Error("broadcastNewBlock", "error", err)
log.Error("BroadcastPooledTxs", "error", err)
}
var req66, req65 *proto_sentry.SendMessageToRandomPeersRequest
for _, sentry := range cs.sentries {
@ -158,7 +169,10 @@ func (cs *ControlServerImpl) BroadcastNewTxs(ctx context.Context, txs []types.Tr
}
if _, err = sentry.SendMessageToRandomPeers(ctx, req65, &grpc.EmptyCallOption{}); err != nil {
log.Error("broadcastNewBlock", "error", err)
if isPeerNotFoundErr(err) {
continue
}
log.Error("BroadcastPooledTxs", "error", err)
}
case eth.ETH66:
@ -172,10 +186,81 @@ func (cs *ControlServerImpl) BroadcastNewTxs(ctx context.Context, txs []types.Tr
}
}
if _, err = sentry.SendMessageToRandomPeers(ctx, req66, &grpc.EmptyCallOption{}); err != nil {
log.Error("broadcastNewBlock", "error", err)
if isPeerNotFoundErr(err) {
continue
}
log.Error("BroadcastPooledTxs", "error", err)
}
continue
}
}
}
}
func (cs *ControlServerImpl) PropagatePooledTxsToPeersList(ctx context.Context, peers []*types2.H512, txs []common.Hash) {
if len(txs) == 0 {
return
}
cs.lock.RLock()
defer cs.lock.RUnlock()
for len(txs) > 0 {
pendingLen := maxTxPacketSize / common.HashLength
pending := make([]common.Hash, 0, pendingLen)
for i := 0; i < pendingLen && i < len(txs); i++ {
pending = append(pending, txs[i])
}
txs = txs[len(pending):]
data, err := rlp.EncodeToBytes(eth.NewPooledTransactionHashesPacket(pending))
if err != nil {
log.Error("PropagatePooledTxsToPeersList", "error", err)
}
for _, sentry := range cs.sentries {
if !sentry.Ready() {
continue
}
for _, peer := range peers {
switch sentry.Protocol() {
case eth.ETH65:
req65 := &proto_sentry.SendMessageByIdRequest{
PeerId: peer,
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_65,
Data: data,
},
}
if _, err = sentry.SendMessageById(ctx, req65, &grpc.EmptyCallOption{}); err != nil {
if isPeerNotFoundErr(err) {
continue
}
log.Error("broadcastNewBlock", "error", err)
}
case eth.ETH66:
req66 := &proto_sentry.SendMessageByIdRequest{
PeerId: peer,
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66,
Data: data,
},
}
if _, err = sentry.SendMessageById(ctx, req66, &grpc.EmptyCallOption{}); err != nil {
if isPeerNotFoundErr(err) {
continue
}
log.Error("PropagatePooledTxsToPeersList", "error", err)
}
}
}
}
}
}
func isPeerNotFoundErr(err error) bool {
return strings.Contains(err.Error(), "peer not found")
}
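
Aside, not part of the diff: both broadcast loops above cap a NewPooledTransactionHashesPacket at maxTxPacketSize / common.HashLength hashes before RLP-encoding it, and PropagatePooledTxsToPeersList does the same for an explicit peer list. A minimal standalone sketch of that chunking pattern; the Hash type and the maxTxPacketSize value below are stand-ins chosen for the sketch, not the real erigon constants.

package main

import "fmt"

// Hash stands in for common.Hash (32 bytes).
type Hash [32]byte

// maxTxPacketSize is an assumed cap for this sketch; erigon defines its own constant.
const maxTxPacketSize = 4096

// chunkHashes splits hashes into packets of at most maxTxPacketSize/32 entries,
// mirroring the batching loop in BroadcastPooledTxs and PropagatePooledTxsToPeersList.
func chunkHashes(hashes []Hash) [][]Hash {
    batch := maxTxPacketSize / len(Hash{})
    var packets [][]Hash
    for len(hashes) > 0 {
        n := batch
        if n > len(hashes) {
            n = len(hashes)
        }
        packets = append(packets, hashes[:n])
        hashes = hashes[n:]
    }
    return packets
}

func main() {
    packets := chunkHashes(make([]Hash, 300))
    fmt.Println(len(packets)) // 3 packets: 128 + 128 + 44 hashes
}

In the commit itself each such packet is RLP-encoded once and then sent either to random peers or, for the propagate-on-connect path, to the given peer IDs.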


@ -154,7 +154,6 @@ func RecvMessageLoop(ctx context.Context,
}
if err := RecvMessage(ctx, sentry, cs.HandleInboundMessage, wg); err != nil {
log.Error("[RecvMessage]", "err", err)
}
}
}
@ -631,6 +630,9 @@ func (cs *ControlServerImpl) getBlockHeaders66(ctx context.Context, inreq *proto
}
_, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
if err != nil {
if !isPeerNotFoundErr(err) {
return fmt.Errorf("send header response 65: %v", err)
}
return fmt.Errorf("send header response 66: %v", err)
}
//log.Info(fmt.Sprintf("[%s] GetBlockHeaderMsg{hash=%x, number=%d, amount=%d, skip=%d, reverse=%t, responseLen=%d}", string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), query.Origin.Hash, query.Origin.Number, query.Amount, query.Skip, query.Reverse, len(b)))
@ -666,7 +668,9 @@ func (cs *ControlServerImpl) getBlockHeaders65(ctx context.Context, inreq *proto
}
_, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
if err != nil {
return fmt.Errorf("send header response 65: %v", err)
if !isPeerNotFoundErr(err) {
return fmt.Errorf("send header response 65: %v", err)
}
}
//log.Info(fmt.Sprintf("[%s] GetBlockHeaderMsg{hash=%x, number=%d, amount=%d, skip=%d, reverse=%t, responseLen=%d}", string(gointerfaces.ConvertH512ToBytes(inreq.PeerId)), query.Origin.Hash, query.Origin.Number, query.Amount, query.Skip, query.Reverse, len(b)))
return nil


@ -466,8 +466,9 @@ func grpcSentryServer(ctx context.Context, sentryAddr string, ss *SentryServerIm
func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNodeInfo func() *eth.NodeInfo, cfg *p2p.Config, protocol uint) *SentryServerImpl {
ss := &SentryServerImpl{
ctx: ctx,
p2p: cfg,
ctx: ctx,
p2p: cfg,
peersStreams: NewPeersStreams(),
}
if protocol != eth.ETH65 && protocol != eth.ETH66 {
@ -481,7 +482,7 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod
DialCandidates: dialCandidates,
Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
peerID := peer.ID().String()
if _, ok := ss.Peers.Load(peerID); ok {
if _, ok := ss.GoodPeers.Load(peerID); ok {
log.Debug(fmt.Sprintf("[%s] Peer already has connection", peerID))
return nil
}
@ -492,15 +493,15 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod
rw: rw,
}
defer ss.Peers.Delete(peerID)
defer ss.GoodPeers.Delete(peerID)
err := handShake(ctx, ss.GetStatus(), peerID, rw, protocol, protocol, func(bestHash common.Hash) error {
ss.Peers.Store(peerID, peerInfo)
ss.GoodPeers.Store(peerID, peerInfo)
ss.sendNewPeerToClients(gointerfaces.ConvertBytesToH512([]byte(peerID)))
return ss.startSync(ctx, bestHash, peerID)
})
if err != nil {
return fmt.Errorf("handshake to peer %s: %v", peerID, err)
}
ss.Peers.Store(peerID, peerInfo) // TODO: This means potentially setting this twice, first time few lines above
log.Debug(fmt.Sprintf("[%s] Received status message OK", peerID), "name", peer.Name())
if err := runPeer(
@ -520,7 +521,7 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod
return readNodeInfo()
},
PeerInfo: func(id enode.ID) interface{} {
p, ok := ss.Peers.Load(id.String())
p, ok := ss.GoodPeers.Load(id.String())
if !ok {
return nil
}
@ -552,17 +553,18 @@ func Sentry(datadir string, sentryAddr string, discoveryDNS []string, cfg *p2p.C
type SentryServerImpl struct {
proto_sentry.UnimplementedSentryServer
ctx context.Context
Protocol p2p.Protocol
discoveryDNS []string
Peers sync.Map
statusData *proto_sentry.StatusData
P2pServer *p2p.Server
TxSubscribed uint32 // Set to non-zero if downloader is subscribed to transaction messages
lock sync.RWMutex
streams map[proto_sentry.MessageId]*StreamsList
streamsLock sync.RWMutex
p2p *p2p.Config
ctx context.Context
Protocol p2p.Protocol
discoveryDNS []string
GoodPeers sync.Map
statusData *proto_sentry.StatusData
P2pServer *p2p.Server
TxSubscribed uint32 // Set to non-zero if downloader is subscribed to transaction messages
lock sync.RWMutex
messageStreams map[proto_sentry.MessageId]*MessageStreams
messageStreamsLock sync.RWMutex
peersStreams *PeersStreams
p2p *p2p.Config
}
func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, peerID string) error {
@ -610,7 +612,6 @@ func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash,
}); err != nil {
return err
}
}
return nil
}
@ -618,19 +619,19 @@ func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash,
func (ss *SentryServerImpl) PenalizePeer(_ context.Context, req *proto_sentry.PenalizePeerRequest) (*empty.Empty, error) {
//log.Warn("Received penalty", "kind", req.GetPenalty().Descriptor().FullName, "from", fmt.Sprintf("%s", req.GetPeerId()))
strId := string(gointerfaces.ConvertH512ToBytes(req.PeerId))
if x, ok := ss.Peers.Load(strId); ok {
if x, ok := ss.GoodPeers.Load(strId); ok {
peerInfo := x.(*PeerInfo)
if peerInfo != nil {
peerInfo.Remove()
}
}
ss.Peers.Delete(strId)
ss.GoodPeers.Delete(strId)
return &empty.Empty{}, nil
}
func (ss *SentryServerImpl) PeerMinBlock(_ context.Context, req *proto_sentry.PeerMinBlockRequest) (*empty.Empty, error) {
peerID := string(gointerfaces.ConvertH512ToBytes(req.PeerId))
x, _ := ss.Peers.Load(peerID)
x, _ := ss.GoodPeers.Load(peerID)
peerInfo, _ := x.(*PeerInfo)
if peerInfo == nil {
return &empty.Empty{}, nil
@ -649,9 +650,9 @@ func (ss *SentryServerImpl) findPeer(minBlock uint64) (string, *PeerInfo, bool)
var foundPeerInfo *PeerInfo
var maxPermits int
now := time.Now()
ss.Peers.Range(func(key, value interface{}) bool {
ss.GoodPeers.Range(func(key, value interface{}) bool {
peerID := key.(string)
x, _ := ss.Peers.Load(peerID)
x, _ := ss.GoodPeers.Load(peerID)
peerInfo, _ := x.(*PeerInfo)
if peerInfo == nil {
return true
@ -685,13 +686,13 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageByMinBlock not implemented for message Id: %s", inreq.Data.Id)
}
if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(inreq.Data.Data)), Payload: bytes.NewReader(inreq.Data.Data)}); err != nil {
if x, ok := ss.Peers.Load(peerID); ok {
if x, ok := ss.GoodPeers.Load(peerID); ok {
peerInfo := x.(*PeerInfo)
if peerInfo != nil {
peerInfo.Remove()
}
}
ss.Peers.Delete(peerID)
ss.GoodPeers.Delete(peerID)
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageByMinBlock to peer %s: %v", peerID, err)
}
peerInfo.AddDeadline(time.Now().Add(30 * time.Second))
@ -700,7 +701,7 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot
func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) {
peerID := string(gointerfaces.ConvertH512ToBytes(inreq.PeerId))
x, ok := ss.Peers.Load(peerID)
x, ok := ss.GoodPeers.Load(peerID)
if !ok {
return &proto_sentry.SentPeers{}, fmt.Errorf("peer not found: %s", peerID)
}
@ -711,19 +712,20 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent
msgcode != eth.BlockBodiesMsg &&
msgcode != eth.GetReceiptsMsg &&
msgcode != eth.ReceiptsMsg &&
msgcode != eth.NewPooledTransactionHashesMsg &&
msgcode != eth.PooledTransactionsMsg &&
msgcode != eth.GetPooledTransactionsMsg {
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageById not implemented for message Id: %s", inreq.Data.Id)
}
if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(inreq.Data.Data)), Payload: bytes.NewReader(inreq.Data.Data)}); err != nil {
if x, ok := ss.Peers.Load(peerID); ok {
if x, ok := ss.GoodPeers.Load(peerID); ok {
peerInfo := x.(*PeerInfo)
if peerInfo != nil {
peerInfo.Remove()
}
}
ss.Peers.Delete(peerID)
ss.GoodPeers.Delete(peerID)
return &proto_sentry.SentPeers{}, fmt.Errorf("sendMessageById to peer %s: %v", peerID, err)
}
return &proto_sentry.SentPeers{Peers: []*proto_types.H512{inreq.PeerId}}, nil
@ -738,7 +740,7 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p
}
amount := uint64(0)
ss.Peers.Range(func(key, value interface{}) bool {
ss.GoodPeers.Range(func(key, value interface{}) bool {
amount++
return true
})
@ -751,7 +753,7 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p
i := 0
var innerErr error
reply := &proto_sentry.SentPeers{Peers: []*proto_types.H512{}}
ss.Peers.Range(func(key, value interface{}) bool {
ss.GoodPeers.Range(func(key, value interface{}) bool {
peerID := key.(string)
peerInfo, _ := value.(*PeerInfo)
if peerInfo == nil {
@ -759,7 +761,7 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p
}
if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(req.Data.Data)), Payload: bytes.NewReader(req.Data.Data)}); err != nil {
peerInfo.Remove()
ss.Peers.Delete(peerID)
ss.GoodPeers.Delete(peerID)
innerErr = err
return true
}
@ -781,7 +783,7 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen
var innerErr error
reply := &proto_sentry.SentPeers{Peers: []*proto_types.H512{}}
ss.Peers.Range(func(key, value interface{}) bool {
ss.GoodPeers.Range(func(key, value interface{}) bool {
peerID := key.(string)
peerInfo, _ := value.(*PeerInfo)
if peerInfo == nil {
@ -789,7 +791,7 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen
}
if err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(req.Data)), Payload: bytes.NewReader(req.Data)}); err != nil {
peerInfo.Remove()
ss.Peers.Delete(peerID)
ss.GoodPeers.Delete(peerID)
innerErr = err
return true
}
@ -844,9 +846,9 @@ func (ss *SentryServerImpl) SetStatus(_ context.Context, statusData *proto_sentr
}
func (ss *SentryServerImpl) SimplePeerCount() (pc int) {
ss.Peers.Range(func(key, value interface{}) bool {
ss.GoodPeers.Range(func(key, value interface{}) bool {
peerID := key.(string)
x, _ := ss.Peers.Load(peerID)
x, _ := ss.GoodPeers.Load(peerID)
peerInfo, _ := x.(*PeerInfo)
if peerInfo == nil {
return true
@ -878,9 +880,9 @@ func (ss *SentryServerImpl) GetStatus() *proto_sentry.StatusData {
}
func (ss *SentryServerImpl) send(msgID proto_sentry.MessageId, peerID string, b []byte) {
ss.streamsLock.RLock()
defer ss.streamsLock.RUnlock()
errs := ss.streams[msgID].Broadcast(&proto_sentry.InboundMessage{
ss.messageStreamsLock.RLock()
defer ss.messageStreamsLock.RUnlock()
errs := ss.messageStreams[msgID].Broadcast(&proto_sentry.InboundMessage{
PeerId: gointerfaces.ConvertBytesToH512([]byte(peerID)),
Id: msgID,
Data: b,
@ -891,25 +893,25 @@ func (ss *SentryServerImpl) send(msgID proto_sentry.MessageId, peerID string, b
}
func (ss *SentryServerImpl) hasSubscribers(msgID proto_sentry.MessageId) bool {
ss.streamsLock.RLock()
defer ss.streamsLock.RUnlock()
return ss.streams[msgID] != nil && ss.streams[msgID].Len() > 0
ss.messageStreamsLock.RLock()
defer ss.messageStreamsLock.RUnlock()
return ss.messageStreams[msgID] != nil && ss.messageStreams[msgID].Len() > 0
// log.Error("Sending msg to core P2P failed", "msg", proto_sentry.MessageId_name[int32(streamMsg.msgId)], "error", err)
}
func (ss *SentryServerImpl) addStream(ids []proto_sentry.MessageId, server proto_sentry.Sentry_MessagesServer) func() {
ss.streamsLock.Lock()
defer ss.streamsLock.Unlock()
if ss.streams == nil {
ss.streams = map[proto_sentry.MessageId]*StreamsList{}
func (ss *SentryServerImpl) addMessagesStream(ids []proto_sentry.MessageId, server proto_sentry.Sentry_MessagesServer) func() {
ss.messageStreamsLock.Lock()
defer ss.messageStreamsLock.Unlock()
if ss.messageStreams == nil {
ss.messageStreams = map[proto_sentry.MessageId]*MessageStreams{}
}
cleanStack := make([]func(), len(ids))
for i, id := range ids {
m, ok := ss.streams[id]
m, ok := ss.messageStreams[id]
if !ok {
m = NewStreamsList()
ss.streams[id] = m
ss.messageStreams[id] = m
}
cleanStack[i] = m.Add(server)
@ -923,7 +925,7 @@ func (ss *SentryServerImpl) addStream(ids []proto_sentry.MessageId, server proto
func (ss *SentryServerImpl) Messages(req *proto_sentry.MessagesRequest, server proto_sentry.Sentry_MessagesServer) error {
log.Debug(fmt.Sprintf("[Messages] new subscriber to: %s", req.Ids))
clean := ss.addStream(req.Ids, server)
clean := ss.addMessagesStream(req.Ids, server)
defer clean()
select {
case <-ss.ctx.Done():
@ -933,18 +935,18 @@ func (ss *SentryServerImpl) Messages(req *proto_sentry.MessagesRequest, server p
}
}
// StreamsList - it's safe to use this class as non-pointer
type StreamsList struct {
// MessageStreams - it's safe to use this class as non-pointer
type MessageStreams struct {
sync.RWMutex
id uint
streams map[uint]proto_sentry.Sentry_MessagesServer
}
func NewStreamsList() *StreamsList {
return &StreamsList{}
func NewStreamsList() *MessageStreams {
return &MessageStreams{}
}
func (s *StreamsList) Add(stream proto_sentry.Sentry_MessagesServer) (remove func()) {
func (s *MessageStreams) Add(stream proto_sentry.Sentry_MessagesServer) (remove func()) {
s.Lock()
defer s.Unlock()
if s.streams == nil {
@ -956,7 +958,7 @@ func (s *StreamsList) Add(stream proto_sentry.Sentry_MessagesServer) (remove fun
return func() { s.remove(id) }
}
func (s *StreamsList) doBroadcast(reply *proto_sentry.InboundMessage) (ids []uint, errs []error) {
func (s *MessageStreams) doBroadcast(reply *proto_sentry.InboundMessage) (ids []uint, errs []error) {
s.RLock()
defer s.RUnlock()
for id, stream := range s.streams {
@ -973,7 +975,7 @@ func (s *StreamsList) doBroadcast(reply *proto_sentry.InboundMessage) (ids []uin
return
}
func (s *StreamsList) Broadcast(reply *proto_sentry.InboundMessage) (errs []error) {
func (s *MessageStreams) Broadcast(reply *proto_sentry.InboundMessage) (errs []error) {
var ids []uint
ids, errs = s.doBroadcast(reply)
if len(ids) > 0 {
@ -986,13 +988,99 @@ func (s *StreamsList) Broadcast(reply *proto_sentry.InboundMessage) (errs []erro
return errs
}
func (s *StreamsList) Len() int {
func (s *MessageStreams) Len() int {
s.RLock()
defer s.RUnlock()
return len(s.streams)
}
func (s *StreamsList) remove(id uint) {
func (s *MessageStreams) remove(id uint) {
s.Lock()
defer s.Unlock()
_, ok := s.streams[id]
if !ok { // double-unsubscribe support
return
}
delete(s.streams, id)
}
func (ss *SentryServerImpl) sendNewPeerToClients(peerID *proto_types.H512) {
if err := ss.peersStreams.Broadcast(&proto_sentry.PeersReply{PeerId: peerID, Event: proto_sentry.PeersReply_Connect}); err != nil {
log.Error("Sending new peer notice to core P2P failed", "error", err)
}
}
func (ss *SentryServerImpl) Peers(req *proto_sentry.PeersRequest, server proto_sentry.Sentry_PeersServer) error {
clean := ss.peersStreams.Add(server)
defer clean()
select {
case <-ss.ctx.Done():
return nil
case <-server.Context().Done():
return nil
}
}
// PeersStreams - it's safe to use this class as non-pointer
type PeersStreams struct {
sync.RWMutex
id uint
streams map[uint]proto_sentry.Sentry_PeersServer
}
func NewPeersStreams() *PeersStreams {
return &PeersStreams{}
}
func (s *PeersStreams) Add(stream proto_sentry.Sentry_PeersServer) (remove func()) {
s.Lock()
defer s.Unlock()
if s.streams == nil {
s.streams = make(map[uint]proto_sentry.Sentry_PeersServer)
}
s.id++
id := s.id
s.streams[id] = stream
return func() { s.remove(id) }
}
func (s *PeersStreams) doBroadcast(reply *proto_sentry.PeersReply) (ids []uint, errs []error) {
s.RLock()
defer s.RUnlock()
for id, stream := range s.streams {
err := stream.Send(reply)
if err != nil {
select {
case <-stream.Context().Done():
ids = append(ids, id)
default:
}
errs = append(errs, err)
}
}
return
}
func (s *PeersStreams) Broadcast(reply *proto_sentry.PeersReply) (errs []error) {
var ids []uint
ids, errs = s.doBroadcast(reply)
if len(ids) > 0 {
s.Lock()
defer s.Unlock()
}
for _, id := range ids {
delete(s.streams, id)
}
return errs
}
func (s *PeersStreams) Len() int {
s.RLock()
defer s.RUnlock()
return len(s.streams)
}
func (s *PeersStreams) remove(id uint) {
s.Lock()
defer s.Unlock()
_, ok := s.streams[id]
if !ok { // double-unsubscribe support
return
}
delete(s.streams, id)
}
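
Aside, not part of the diff: PeersStreams mirrors the MessageStreams pattern above. Add returns an unsubscribe func that is safe to call twice, and Broadcast only prunes subscribers whose stream context is already done. A simplified, self-contained sketch of that subscribe/broadcast shape; the subscriber interface below stands in for the generated Sentry_PeersServer / Sentry_MessagesServer streams and is an assumption of the sketch.

package main

import (
    "context"
    "fmt"
    "sync"
)

// subscriber abstracts a gRPC server stream: Send plus a Context whose
// cancellation means the client is gone.
type subscriber interface {
    Send(msg string) error
    Context() context.Context
}

// streams mirrors the MessageStreams/PeersStreams shape; the zero value is usable.
type streams struct {
    sync.RWMutex
    id   uint
    subs map[uint]subscriber
}

// Add registers a subscriber and returns an idempotent unsubscribe func.
func (s *streams) Add(sub subscriber) (remove func()) {
    s.Lock()
    defer s.Unlock()
    if s.subs == nil {
        s.subs = map[uint]subscriber{}
    }
    s.id++
    id := s.id
    s.subs[id] = sub
    return func() { s.remove(id) }
}

func (s *streams) remove(id uint) {
    s.Lock()
    defer s.Unlock()
    delete(s.subs, id) // no-op if already removed, so double-unsubscribe is safe
}

// Broadcast sends msg to every subscriber and prunes only those whose context is done.
func (s *streams) Broadcast(msg string) (errs []error) {
    var dead []uint
    s.RLock()
    for id, sub := range s.subs {
        if err := sub.Send(msg); err != nil {
            select {
            case <-sub.Context().Done():
                dead = append(dead, id)
            default:
            }
            errs = append(errs, err)
        }
    }
    s.RUnlock()
    for _, id := range dead {
        s.remove(id)
    }
    return errs
}

// chanSub is a toy subscriber backed by a buffered channel, for the usage example.
type chanSub struct {
    ch  chan string
    ctx context.Context
}

func (c *chanSub) Send(msg string) error    { c.ch <- msg; return nil }
func (c *chanSub) Context() context.Context { return c.ctx }

func main() {
    var s streams
    sub := &chanSub{ch: make(chan string, 1), ctx: context.Background()}
    unsubscribe := s.Add(sub)
    s.Broadcast("peer connected")
    fmt.Println(<-sub.ch)
    unsubscribe()
    unsubscribe() // safe to call twice
}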


@ -235,6 +235,14 @@ func (m *txSortedMap) Flatten() types.Transactions {
return txs
}
// AppendHashes to given buffer and return it
func (m *txSortedMap) AppendHashes(buf []common.Hash) []common.Hash {
for _, tx := range m.items {
buf = append(buf, tx.Hash())
}
return buf
}
// LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce
func (m *txSortedMap) LastElement() types.Transaction {
@ -399,6 +407,11 @@ func (l *txList) Flatten() types.Transactions {
return l.txs.Flatten()
}
// AppendHashes to given buffer and return it
func (l *txList) AppendHashes(buf []common.Hash) []common.Hash {
return l.txs.AppendHashes(buf)
}
// LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce
func (l *txList) LastElement() types.Transaction {


@ -526,6 +526,32 @@ func (pool *TxPool) Pending() (types.TransactionsGroupedBySender, error) {
return pending, nil
}
// AppendHashes to given buffer and return it
func (pool *TxPool) AppendHashes(buf []common.Hash) []common.Hash {
if !pool.IsStarted() {
return buf
}
pool.mu.Lock()
defer pool.mu.Unlock()
for _, list := range pool.pending {
buf = list.AppendHashes(buf)
}
return buf
}
// AppendLocalHashes to given buffer and return it
func (pool *TxPool) AppendLocalHashes(buf []common.Hash) []common.Hash {
if !pool.IsStarted() {
return buf
}
pool.mu.Lock()
defer pool.mu.Unlock()
for txHash := range pool.all.locals {
buf = append(buf, txHash)
}
return buf
}
// Locals retrieves the accounts currently considered local by the pool.
func (pool *TxPool) Locals() []common.Address {
pool.mu.Lock()

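Aside, not part of the diff: AppendHashes and AppendLocalHashes append into a caller-owned buffer and return it, which lets the broadcast loop reuse one allocation across ticker iterations by passing pooledTxHashes[:0]. A small self-contained sketch of that append-into-buffer idiom, with a toy pool standing in for TxPool.

package main

import (
    "fmt"
    "sync"
)

type Hash [32]byte

// pool is a toy stand-in for TxPool: a locked set of hashes.
type pool struct {
    mu  sync.Mutex
    all map[Hash]struct{}
}

// AppendHashes appends every known hash to buf and returns it; it never
// allocates when buf already has enough capacity.
func (p *pool) AppendHashes(buf []Hash) []Hash {
    p.mu.Lock()
    defer p.mu.Unlock()
    for h := range p.all {
        buf = append(buf, h)
    }
    return buf
}

func main() {
    p := &pool{all: map[Hash]struct{}{{1}: {}, {2}: {}}}
    buf := make([]Hash, 0, 128) // reused across iterations
    for i := 0; i < 3; i++ {
        buf = p.AppendHashes(buf[:0]) // truncate, keep capacity
        fmt.Println(len(buf), cap(buf))
    }
}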

@ -389,7 +389,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
backend.sentryServers = append(backend.sentryServers, server65)
backend.sentries = append(backend.sentries, remote.NewSentryClientDirect(eth.ETH65, server65))
go func() {
logEvery := time.NewTicker(60 * time.Second)
logEvery := time.NewTicker(120 * time.Second)
defer logEvery.Stop()
var logItems []interface{}
@ -403,7 +403,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
for _, srv := range backend.sentryServers {
logItems = append(logItems, eth.ProtocolToString[srv.Protocol.Version], strconv.Itoa(srv.SimplePeerCount()))
}
log.Info("[p2p] Peers", logItems...)
log.Info("[p2p] GoodPeers", logItems...)
}
}
}()
@ -440,7 +440,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}
go txpropagate.BroadcastNewTxsToNetworks(backend.downloadCtx, backend.txPool, backend.downloadServer)
go txpropagate.BroadcastPendingTxsToNetwork(backend.downloadCtx, backend.txPool, backend.txPoolP2PServer.RecentPeers, backend.downloadServer)
go func() {
defer debug.LogPanic()


@ -81,7 +81,6 @@ func SpawnTxPool(s *StageState, tx ethdb.RwTx, cfg TxPoolCfg, quitCh <-chan stru
}
}
return nil
}
func incrementalTxPoolUpdate(logPrefix string, from, to uint64, pool *core.TxPool, tx ethdb.RwTx, quitCh <-chan struct{}) error {


@ -724,7 +724,7 @@ type nodeStats struct {
Syncing bool `json:"syncing"`
Mining bool `json:"mining"`
Hashrate int `json:"hashrate"`
Peers int `json:"peers"`
GoodPeers int `json:"peers"`
GasPrice int `json:"gasPrice"`
Uptime int `json:"uptime"`
}
@ -760,7 +760,7 @@ func (s *Service) reportStats(conn *connWrapper) error {
Active: true,
Mining: mining,
Hashrate: hashrate,
Peers: s.server.PeerCount(),
GoodPeers: s.server.PeerCount(),
GasPrice: gasprice,
Syncing: syncing,
Uptime: 100,

go.mod

@ -40,7 +40,7 @@ require (
github.com/json-iterator/go v1.1.11
github.com/julienschmidt/httprouter v1.3.0
github.com/kevinburke/go-bindata v3.21.0+incompatible
github.com/ledgerwatch/erigon-lib v0.0.0-20210701162843-a58e7da26f29
github.com/ledgerwatch/erigon-lib v0.0.0-20210709133046-4df3c6b79da0
github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d
github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/mattn/go-colorable v0.1.7

go.sum

@ -634,6 +634,8 @@ github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20210701162843-a58e7da26f29 h1:qITgajvqFhBFsqmcE31u/90/ZKe0aFCJON2VgOrm96k=
github.com/ledgerwatch/erigon-lib v0.0.0-20210701162843-a58e7da26f29/go.mod h1:y8vIF0uAO6raqzgKZ5AILLXC+7gfr8nyRLbacmZrZ8Y=
github.com/ledgerwatch/erigon-lib v0.0.0-20210709133046-4df3c6b79da0 h1:ccfmrKMAnxPE3p6z/h1qlvos8wjMrOcB95ThfR37RjQ=
github.com/ledgerwatch/erigon-lib v0.0.0-20210709133046-4df3c6b79da0/go.mod h1:y8vIF0uAO6raqzgKZ5AILLXC+7gfr8nyRLbacmZrZ8Y=
github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d h1:/IKMrJdfRsoYNc36PXqP4xMH3vhW/8IQyBKGQbKZUno=
github.com/ledgerwatch/secp256k1 v0.0.0-20210626115225-cd5cd00ed72d/go.mod h1:SPmqJFciiF/Q0mPt2jVs2dTr/1TZBTIA+kPMmKgBAak=
github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw=


@ -767,7 +767,7 @@ running:
}
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
// This channel is used by GoodPeers and PeerCount.
op(peers)
srv.peerOpDone <- struct{}{}
@ -1080,7 +1080,7 @@ func (srv *Server) runPeer(p *Peer) {
// Broadcast peer drop to external subscribers. This needs to be
// after the send to delpeer so subscribers have a consistent view of
// the peer set (i.e. Server.Peers() doesn't include the peer when the
// the peer set (i.e. Server.GoodPeers() doesn't include the peer when the
// event is received.
srv.peerFeed.Send(&PeerEvent{
Type: PeerEventTypeDrop,


@ -116,7 +116,7 @@ func TestServerListen(t *testing.T) {
}
peers := srv.Peers()
if !reflect.DeepEqual(peers, []*Peer{peer}) {
t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
t.Errorf("GoodPeers mismatch: got %v, want %v", peers, []*Peer{peer})
}
case <-time.After(1 * time.Second):
t.Error("server did not accept within one second")
@ -169,7 +169,7 @@ func TestServerDial(t *testing.T) {
}
peers := srv.Peers()
if !reflect.DeepEqual(peers, []*Peer{peer}) {
t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
t.Errorf("GoodPeers mismatch: got %v, want %v", peers, []*Peer{peer})
}
// Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags


@ -163,6 +163,35 @@ func (c *SentryReceiveClientDirect) Context() context.Context {
return c.ctx
}
// implements proto_sentry.Sentry_ReceivePeersServer
type SentryReceivePeersServerDirect struct {
ch chan *proto_sentry.PeersReply
ctx context.Context
grpc.ServerStream
}
func (s *SentryReceivePeersServerDirect) Send(m *proto_sentry.PeersReply) error {
s.ch <- m
return nil
}
func (s *SentryReceivePeersServerDirect) Context() context.Context {
return s.ctx
}
type SentryReceivePeersClientDirect struct {
ch chan *proto_sentry.PeersReply
ctx context.Context
grpc.ClientStream
}
func (c *SentryReceivePeersClientDirect) Recv() (*proto_sentry.PeersReply, error) {
m := <-c.ch
return m, nil
}
func (c *SentryReceivePeersClientDirect) Context() context.Context {
return c.ctx
}
func (c *SentryClientDirect) Messages(ctx context.Context, in *proto_sentry.MessagesRequest, opts ...grpc.CallOption) (proto_sentry.Sentry_MessagesClient, error) {
in.Ids = filterIds(in.Ids, c.Protocol())
messageCh := make(chan *proto_sentry.InboundMessage, 16384)
@ -176,6 +205,18 @@ func (c *SentryClientDirect) Messages(ctx context.Context, in *proto_sentry.Mess
return &SentryReceiveClientDirect{messageCh: messageCh, ctx: ctx}, nil
}
func (c *SentryClientDirect) Peers(ctx context.Context, in *proto_sentry.PeersRequest, opts ...grpc.CallOption) (proto_sentry.Sentry_PeersClient, error) {
messageCh := make(chan *proto_sentry.PeersReply, 16384)
streamServer := &SentryReceivePeersServerDirect{ch: messageCh, ctx: ctx}
go func() {
if err := c.server.Peers(in, streamServer); err != nil {
log.Error("ReceiveMessages returned", "error", err)
}
close(messageCh)
}()
return &SentryReceivePeersClientDirect{ch: messageCh, ctx: ctx}, nil
}
func filterIds(in []proto_sentry.MessageId, protocol uint) (filtered []proto_sentry.MessageId) {
for _, id := range in {
if _, ok := eth.FromProto[protocol][id]; ok {


@ -290,7 +290,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
stagedsync.StageTxPoolCfg(mock.DB, txPool, func() {
mock.StreamWg.Add(1)
go txpool.RecvTxMessageLoop(mock.Ctx, mock.SentryClient, mock.downloader, mock.TxPoolP2PServer.HandleInboundMessage, &mock.ReceiveWg)
go txpropagate.BroadcastNewTxsToNetworks(mock.Ctx, txPool, mock.downloader)
go txpropagate.BroadcastPendingTxsToNetwork(mock.Ctx, txPool, mock.TxPoolP2PServer.RecentPeers, mock.downloader)
mock.StreamWg.Wait()
mock.TxPoolP2PServer.TxFetcher.Start()
}),


@ -322,6 +322,9 @@ func NewStagedSync2(
go func(i int) {
txpool.RecvTxMessageLoop(ctx, txPoolServer.Sentries[i], controlServer, txPoolServer.HandleInboundMessage, nil)
}(i)
go func(i int) {
txpool.RecvPeersLoop(ctx, txPoolServer.Sentries[i], controlServer, txPoolServer.RecentPeers, nil)
}(i)
}
txPoolServer.TxFetcher.Start()
}),


@ -2,29 +2,82 @@ package txpropagate
import (
"context"
"sync"
"time"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon/cmd/sentry/download"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/debug"
"github.com/ledgerwatch/erigon/core"
)
const txChanSize int = 4096
func BroadcastNewTxsToNetworks(ctx context.Context, txPool *core.TxPool, s *download.ControlServerImpl) {
// BroadcastPendingTxsToNetwork - does send to p2p:
// - new txs
// - all pooled txs to recently connected peers
// - all local pooled txs to random peers periodically
func BroadcastPendingTxsToNetwork(ctx context.Context, txPool *core.TxPool, recentPeers *RecentlyConnectedPeers, s *download.ControlServerImpl) {
defer debug.LogPanic()
txsCh := make(chan core.NewTxsEvent, txChanSize)
txsSub := txPool.SubscribeNewTxsEvent(txsCh)
defer txsSub.Unsubscribe()
syncToNewPeersEvery := time.NewTicker(2 * time.Minute)
defer syncToNewPeersEvery.Stop()
broadcastLocalTransactionsEvery := time.NewTicker(5 * time.Minute)
defer broadcastLocalTransactionsEvery.Stop()
pooledTxHashes := make([]common.Hash, 128)
for {
select {
case e := <-txsCh:
s.BroadcastNewTxs(context.Background(), e.Txs)
case <-txsSub.Err():
return
case <-ctx.Done():
return
case e := <-txsCh: // new txs
pooledTxHashes = pooledTxHashes[:0]
for i := range e.Txs {
pooledTxHashes = append(pooledTxHashes, e.Txs[i].Hash())
}
s.BroadcastPooledTxs(ctx, pooledTxHashes)
case <-syncToNewPeersEvery.C: // new peer
newPeers := recentPeers.GetAndClean()
if len(newPeers) == 0 {
continue
}
pooledTxHashes = txPool.AppendHashes(pooledTxHashes[:0])
s.PropagatePooledTxsToPeersList(ctx, newPeers, pooledTxHashes)
case <-broadcastLocalTransactionsEvery.C: // periodically broadcast local txs to random peers
pooledTxHashes = txPool.AppendLocalHashes(pooledTxHashes[:0])
s.BroadcastPooledTxs(ctx, pooledTxHashes)
}
}
}
type RecentlyConnectedPeers struct {
lock sync.RWMutex
peers []*types.H512
}
func (l *RecentlyConnectedPeers) Len() int {
l.lock.RLock()
defer l.lock.RUnlock()
return len(l.peers)
}
func (l *RecentlyConnectedPeers) AddPeer(p *types.H512) {
l.lock.Lock()
defer l.lock.Unlock()
l.peers = append(l.peers, p)
}
func (l *RecentlyConnectedPeers) GetAndClean() []*types.H512 {
l.lock.Lock()
defer l.lock.Unlock()
peers := l.peers
l.peers = nil
return peers
}
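
Aside, not part of the diff: RecentlyConnectedPeers is filled from peer-connect events (see RecvPeers below) and drained on the syncToNewPeersEvery ticker, so each newly connected peer receives the pooled-tx hashes at most once per tick. A runnable sketch of that producer/consumer wiring, with string peer IDs standing in for *types.H512.

package main

import (
    "fmt"
    "sync"
    "time"
)

// recentPeers is a trimmed copy of RecentlyConnectedPeers for this sketch.
type recentPeers struct {
    mu    sync.Mutex
    peers []string
}

func (l *recentPeers) AddPeer(p string) {
    l.mu.Lock()
    defer l.mu.Unlock()
    l.peers = append(l.peers, p)
}

// GetAndClean hands the accumulated batch to the caller and starts a new one.
func (l *recentPeers) GetAndClean() []string {
    l.mu.Lock()
    defer l.mu.Unlock()
    peers := l.peers
    l.peers = nil
    return peers
}

func main() {
    rp := &recentPeers{}

    // Producer: stands in for RecvPeers feeding PeersReply_Connect events.
    go func() {
        for i := 0; i < 5; i++ {
            rp.AddPeer(fmt.Sprintf("peer-%d", i))
            time.Sleep(10 * time.Millisecond)
        }
    }()

    // Consumer: stands in for the syncToNewPeersEvery ticker in
    // BroadcastPendingTxsToNetwork, propagating pooled tx hashes to new peers.
    tick := time.NewTicker(30 * time.Millisecond)
    defer tick.Stop()
    for i := 0; i < 3; i++ {
        <-tick.C
        if newPeers := rp.GetAndClean(); len(newPeers) > 0 {
            fmt.Println("would propagate pooled tx hashes to", newPeers)
        }
    }
}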


@ -7,6 +7,7 @@ import (
"fmt"
"io"
"math/rand"
"strings"
"sync"
"time"
@ -21,6 +22,7 @@ import (
"github.com/ledgerwatch/erigon/log"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/remote"
"github.com/ledgerwatch/erigon/turbo/stages/txpropagate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -28,17 +30,19 @@ import (
// P2PServer - receiving and sending messages to Sentries
type P2PServer struct {
ctx context.Context
Sentries []remote.SentryClient
TxPool *core.TxPool
TxFetcher *fetcher.TxFetcher
ctx context.Context
Sentries []remote.SentryClient
TxPool *core.TxPool
TxFetcher *fetcher.TxFetcher
RecentPeers *txpropagate.RecentlyConnectedPeers
}
func NewP2PServer(ctx context.Context, sentries []remote.SentryClient, txPool *core.TxPool) (*P2PServer, error) {
cs := &P2PServer{
ctx: ctx,
Sentries: sentries,
TxPool: txPool,
ctx: ctx,
Sentries: sentries,
TxPool: txPool,
RecentPeers: &txpropagate.RecentlyConnectedPeers{},
}
return cs, nil
@ -148,7 +152,6 @@ func (tp *P2PServer) getPooledTransactions65(ctx context.Context, inreq *proto_s
func (tp *P2PServer) SendTxsRequest(ctx context.Context, peerID string, hashes []common.Hash) []byte {
var outreq65, outreq66 *proto_sentry.SendMessageByIdRequest
var lastErr error
// if sentry not found peers to send such message, try next one. stop if found.
for i, ok, next := tp.randSentryIndex(); ok; i, ok = next() {
@ -172,7 +175,10 @@ func (tp *P2PServer) SendTxsRequest(ctx context.Context, peerID string, hashes [
}
if sentPeers, err1 := tp.Sentries[i].SendMessageById(ctx, outreq65, &grpc.EmptyCallOption{}); err1 != nil {
lastErr = err1
if isPeerNotFoundErr(err1) {
continue
}
log.Error("[SendTxsRequest]", "err", err1)
} else if sentPeers != nil && len(sentPeers.Peers) != 0 {
return gointerfaces.ConvertH512ToBytes(sentPeers.Peers[0])
}
@ -194,15 +200,16 @@ func (tp *P2PServer) SendTxsRequest(ctx context.Context, peerID string, hashes [
}
if sentPeers, err1 := tp.Sentries[i].SendMessageById(ctx, outreq66, &grpc.EmptyCallOption{}); err1 != nil {
lastErr = err1
if isPeerNotFoundErr(err1) {
continue
}
log.Error("[SendTxsRequest]", "err", err1)
} else if sentPeers != nil && len(sentPeers.Peers) != 0 {
return gointerfaces.ConvertH512ToBytes(sentPeers.Peers[0])
}
}
}
if lastErr != nil {
log.Error("Could not sent get pooled txs request to any sentry", "error", lastErr)
}
return nil
}
@ -333,3 +340,87 @@ func RecvTxMessage(ctx context.Context,
}
}
}
func RecvPeersLoop(ctx context.Context,
sentry remote.SentryClient,
cs *download.ControlServerImpl,
recentPeers *txpropagate.RecentlyConnectedPeers,
wg *sync.WaitGroup,
) {
for {
select {
case <-ctx.Done():
return
default:
}
if err := download.SentryHandshake(ctx, sentry, cs); err != nil {
log.Error("[RecvPeers] sentry not ready yet", "err", err)
time.Sleep(time.Second)
continue
}
if err := RecvPeers(ctx, sentry, recentPeers, wg); err != nil {
log.Error("[RecvPeers]", "err", err)
}
}
}
// RecvPeers
// wg is used only in tests to avoid time.Sleep. For non-test code wg == nil
func RecvPeers(ctx context.Context,
sentry remote.SentryClient,
recentPeers *txpropagate.RecentlyConnectedPeers,
wg *sync.WaitGroup,
) (err error) {
defer func() { err = debug.ReportPanicAndRecover() }() // avoid crash because Erigon's core does many things
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := sentry.Peers(streamCtx, &proto_sentry.PeersRequest{})
if err != nil {
select {
case <-ctx.Done():
return
default:
}
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return
}
if errors.Is(err, io.EOF) {
return
}
return err
}
var req *proto_sentry.PeersReply
for req, err = stream.Recv(); ; req, err = stream.Recv() {
if err != nil {
select {
case <-ctx.Done():
return
default:
}
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return
}
if errors.Is(err, io.EOF) {
return
}
return err
}
if req == nil {
return
}
switch req.Event {
case proto_sentry.PeersReply_Connect:
recentPeers.AddPeer(req.PeerId)
}
if wg != nil {
wg.Done()
}
}
}
func isPeerNotFoundErr(err error) bool {
return strings.Contains(err.Error(), "peer not found")
}