From feb79883ac17f03d39e7055e4667f4f839388c36 Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Thu, 16 Dec 2021 22:32:05 +0000 Subject: [PATCH] Txpool: broadcast transaction along with announcements (#3135) * Txpool: broadcast transaction along with announcements * Fix panic * Add TransactionMsg * Change terminology in the logs * Fixes from erigon-lib * Rebroadcast txs promoted to pending subpool * Deduplicate promoted hashes, fix basefee promotion * Make sending more resilient, fix promoted * Split Broadcast and Announce * Downgrade to Debug messages * Limit number of retries in SendMessageByMinBlock Co-authored-by: Alexey Sharp --- cmd/sentry/sentry/downloader.go | 8 ++--- cmd/sentry/sentry/sentry.go | 61 ++++++++++++++++----------------- go.mod | 2 +- go.sum | 6 ++-- 4 files changed, 39 insertions(+), 38 deletions(-) diff --git a/cmd/sentry/sentry/downloader.go b/cmd/sentry/sentry/downloader.go index 9d1411079..26035189d 100644 --- a/cmd/sentry/sentry/downloader.go +++ b/cmd/sentry/sentry/downloader.go @@ -80,7 +80,7 @@ func RecvUploadMessageLoop(ctx context.Context, time.Sleep(time.Second) continue } - log.Warn("[RecvUploadMessage]", "err", err) + log.Debug("[RecvUploadMessage]", "err", err) continue } } @@ -450,7 +450,7 @@ func (cs *ControlServerImpl) blockHeaders66(ctx context.Context, in *proto_sentr func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPacket, rlpStream *rlp.Stream, peerID *proto_types.H256, sentry direct.SentryClient) error { // Stream is at the BlockHeadersPacket, which is list of headers if _, err := rlpStream.List(); err != nil { - return fmt.Errorf("decode 2 BlockHeadersPacket65: %w", err) + return fmt.Errorf("decode 2 BlockHeadersPacket66: %w", err) } // Extract headers from the block var highestBlock uint64 @@ -458,7 +458,7 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHead for _, header := range pkt { headerRaw, err := rlpStream.Raw() if err != nil { - return fmt.Errorf("decode 3 BlockHeadersPacket65: %w", err) + return fmt.Errorf("decode 3 BlockHeadersPacket66: %w", err) } number := header.Number.Uint64() if number > highestBlock { @@ -630,7 +630,7 @@ func (cs *ControlServerImpl) getBlockHeaders66(ctx context.Context, inreq *proto _, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{}) if err != nil { if !isPeerNotFoundErr(err) { - return fmt.Errorf("send header response 65: %w", err) + return fmt.Errorf("send header response 66: %w", err) } return fmt.Errorf("send header response 66: %w", err) } diff --git a/cmd/sentry/sentry/sentry.go b/cmd/sentry/sentry/sentry.go index f70a839d4..e938d5986 100644 --- a/cmd/sentry/sentry/sentry.go +++ b/cmd/sentry/sentry/sentry.go @@ -698,35 +698,31 @@ func (ss *SentryServerImpl) findPeer(minBlock uint64) (*PeerInfo, bool) { func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *proto_sentry.SendMessageByMinBlockRequest) (*proto_sentry.SentPeers, error) { reply := &proto_sentry.SentPeers{} - - peerInfo, found := ss.findPeer(inreq.MinBlock) - if !found { - return reply, nil - } msgcode := eth.FromProto[ss.Protocol.Version][inreq.Data.Id] if msgcode != eth.GetBlockHeadersMsg && msgcode != eth.GetBlockBodiesMsg && msgcode != eth.GetPooledTransactionsMsg { return reply, fmt.Errorf("sendMessageByMinBlock not implemented for message Id: %s", inreq.Data.Id) } - if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil { - return reply, fmt.Errorf("sendMessageByMinBlock to peer %s: %w", peerInfo.ID(), err) + + var lastErr error + for retry := 0; retry < 16 && len(reply.Peers) == 0; retry++ { // limit number of retries + peerInfo, found := ss.findPeer(inreq.MinBlock) + if !found { + break + } + if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil { + lastErr = fmt.Errorf("sendMessageByMinBlock to peer %s: %w", peerInfo.ID(), err) + } else { + peerInfo.AddDeadline(time.Now().Add(30 * time.Second)) + reply.Peers = []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())} + } } - peerInfo.AddDeadline(time.Now().Add(30 * time.Second)) - reply.Peers = []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())} - return reply, nil + return reply, lastErr } func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) { reply := &proto_sentry.SentPeers{} - - peerID := ConvertH256ToPeerID(inreq.PeerId) - peerInfo := ss.getPeer(peerID) - if peerInfo == nil { - //TODO: enable after support peer to sentry mapping - //return reply, fmt.Errorf("peer not found: %s", peerID) - return reply, nil - } msgcode := eth.FromProto[ss.Protocol.Version][inreq.Data.Id] if msgcode != eth.GetBlockHeadersMsg && msgcode != eth.BlockHeadersMsg && @@ -739,6 +735,14 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent return reply, fmt.Errorf("sendMessageById not implemented for message Id: %s", inreq.Data.Id) } + peerID := ConvertH256ToPeerID(inreq.PeerId) + peerInfo := ss.getPeer(peerID) + if peerInfo == nil { + //TODO: enable after support peer to sentry mapping + //return reply, fmt.Errorf("peer not found: %s", peerID) + return reply, nil + } + if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil { return reply, fmt.Errorf("sendMessageById to peer %s: %w", peerID, err) } @@ -752,7 +756,8 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p msgcode := eth.FromProto[ss.Protocol.Version][req.Data.Id] if msgcode != eth.NewBlockMsg && msgcode != eth.NewBlockHashesMsg && - msgcode != eth.NewPooledTransactionHashesMsg { + msgcode != eth.NewPooledTransactionHashesMsg && + msgcode != eth.TransactionsMsg { return reply, fmt.Errorf("sendMessageToRandomPeers not implemented for message Id: %s", req.Data.Id) } @@ -768,20 +773,17 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p // Send the block to a subset of our peers sendToAmount := int(math.Sqrt(float64(amount))) i := 0 - var innerErr error + var lastErr error ss.rangePeers(func(peerInfo *PeerInfo) bool { if err := ss.writePeer(peerInfo, msgcode, req.Data.Data); err != nil { - innerErr = err + lastErr = fmt.Errorf("sendMessageToRandomPeers to peer %s: %w", peerInfo.ID(), err) return true } reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID())) i++ return i < sendToAmount }) - if innerErr != nil { - return reply, fmt.Errorf("sendMessageToRandomPeers to peer %w", innerErr) - } - return reply, nil + return reply, lastErr } func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sentry.OutboundMessageData) (*proto_sentry.SentPeers, error) { @@ -794,19 +796,16 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen return reply, fmt.Errorf("sendMessageToAll not implemented for message Id: %s", req.Id) } - var innerErr error + var lastErr error ss.rangePeers(func(peerInfo *PeerInfo) bool { if err := ss.writePeer(peerInfo, msgcode, req.Data); err != nil { - innerErr = err + lastErr = fmt.Errorf("SendMessageToAll to peer %s: %w", peerInfo.ID(), err) return true } reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID())) return true }) - if innerErr != nil { - return reply, fmt.Errorf("sendMessageToRandomPeers to peer %w", innerErr) - } - return reply, nil + return reply, lastErr } func (ss *SentryServerImpl) HandShake(context.Context, *emptypb.Empty) (*proto_sentry.HandShakeReply, error) { diff --git a/go.mod b/go.mod index d9022f8f5..5871f7944 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/json-iterator/go v1.1.12 github.com/julienschmidt/httprouter v1.3.0 github.com/kevinburke/go-bindata v3.21.0+incompatible - github.com/ledgerwatch/erigon-lib v0.0.0-20211214133332-bb6dfef7c845 + github.com/ledgerwatch/erigon-lib v0.0.0-20211216213333-d79d43f9085d github.com/ledgerwatch/log/v3 v3.4.0 github.com/ledgerwatch/secp256k1 v1.0.0 github.com/logrusorgru/aurora/v3 v3.0.0 diff --git a/go.sum b/go.sum index 36144c821..5042effd3 100644 --- a/go.sum +++ b/go.sum @@ -617,8 +617,10 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20211214133332-bb6dfef7c845 h1:K7Zg42nG8GGnrkALkHECx+RQgpJ2neluY5PrcwpDZfQ= -github.com/ledgerwatch/erigon-lib v0.0.0-20211214133332-bb6dfef7c845/go.mod h1:lyGP3i0x4CeabdKZ4beycD5xZfHWZwJsAX+70OfGj4Y= +github.com/ledgerwatch/erigon-lib v0.0.0-20211216211604-7f6eb71c4c77 h1:zOn25ZPWoHEjTxYaKGzWMvBZHCdWYmWNP1xGSE0aFVI= +github.com/ledgerwatch/erigon-lib v0.0.0-20211216211604-7f6eb71c4c77/go.mod h1:lyGP3i0x4CeabdKZ4beycD5xZfHWZwJsAX+70OfGj4Y= +github.com/ledgerwatch/erigon-lib v0.0.0-20211216213333-d79d43f9085d h1:yPC/BYbfbuantLNUHc6x9f16Yap5EiyzyFBCY/eOJdE= +github.com/ledgerwatch/erigon-lib v0.0.0-20211216213333-d79d43f9085d/go.mod h1:lyGP3i0x4CeabdKZ4beycD5xZfHWZwJsAX+70OfGj4Y= github.com/ledgerwatch/log/v3 v3.4.0 h1:SEIOcv5a2zkG3PmoT5jeTU9m/0nEUv0BJS5bzsjwKCI= github.com/ledgerwatch/log/v3 v3.4.0/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY= github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=