Txpool: broadcast transaction along with announcements (#3135)

* Txpool: broadcast transaction along with announcements

* Fix panic

* Add TransactionMsg

* Change terminology in the logs

* Fixes from erigon-lib

* Rebroadcast txs promoted to pending subpool

* Deduplicate promoted hashes, fix basefee promotion

* Make sending more resilient, fix promoted

* Split Broadcast and Announce

* Downgrade to Debug messages

* Limit number of retries in SendMessageByMinBlock

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
This commit is contained in:
ledgerwatch 2021-12-16 22:32:05 +00:00 committed by GitHub
parent c596bb4306
commit feb79883ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 39 additions and 38 deletions

View File

@ -80,7 +80,7 @@ func RecvUploadMessageLoop(ctx context.Context,
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
} }
log.Warn("[RecvUploadMessage]", "err", err) log.Debug("[RecvUploadMessage]", "err", err)
continue continue
} }
} }
@ -450,7 +450,7 @@ func (cs *ControlServerImpl) blockHeaders66(ctx context.Context, in *proto_sentr
func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPacket, rlpStream *rlp.Stream, peerID *proto_types.H256, sentry direct.SentryClient) error { func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPacket, rlpStream *rlp.Stream, peerID *proto_types.H256, sentry direct.SentryClient) error {
// Stream is at the BlockHeadersPacket, which is list of headers // Stream is at the BlockHeadersPacket, which is list of headers
if _, err := rlpStream.List(); err != nil { if _, err := rlpStream.List(); err != nil {
return fmt.Errorf("decode 2 BlockHeadersPacket65: %w", err) return fmt.Errorf("decode 2 BlockHeadersPacket66: %w", err)
} }
// Extract headers from the block // Extract headers from the block
var highestBlock uint64 var highestBlock uint64
@ -458,7 +458,7 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHead
for _, header := range pkt { for _, header := range pkt {
headerRaw, err := rlpStream.Raw() headerRaw, err := rlpStream.Raw()
if err != nil { if err != nil {
return fmt.Errorf("decode 3 BlockHeadersPacket65: %w", err) return fmt.Errorf("decode 3 BlockHeadersPacket66: %w", err)
} }
number := header.Number.Uint64() number := header.Number.Uint64()
if number > highestBlock { if number > highestBlock {
@ -630,7 +630,7 @@ func (cs *ControlServerImpl) getBlockHeaders66(ctx context.Context, inreq *proto
_, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{}) _, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
if err != nil { if err != nil {
if !isPeerNotFoundErr(err) { if !isPeerNotFoundErr(err) {
return fmt.Errorf("send header response 65: %w", err) return fmt.Errorf("send header response 66: %w", err)
} }
return fmt.Errorf("send header response 66: %w", err) return fmt.Errorf("send header response 66: %w", err)
} }

View File

@ -698,35 +698,31 @@ func (ss *SentryServerImpl) findPeer(minBlock uint64) (*PeerInfo, bool) {
func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *proto_sentry.SendMessageByMinBlockRequest) (*proto_sentry.SentPeers, error) { func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *proto_sentry.SendMessageByMinBlockRequest) (*proto_sentry.SentPeers, error) {
reply := &proto_sentry.SentPeers{} reply := &proto_sentry.SentPeers{}
peerInfo, found := ss.findPeer(inreq.MinBlock)
if !found {
return reply, nil
}
msgcode := eth.FromProto[ss.Protocol.Version][inreq.Data.Id] msgcode := eth.FromProto[ss.Protocol.Version][inreq.Data.Id]
if msgcode != eth.GetBlockHeadersMsg && if msgcode != eth.GetBlockHeadersMsg &&
msgcode != eth.GetBlockBodiesMsg && msgcode != eth.GetBlockBodiesMsg &&
msgcode != eth.GetPooledTransactionsMsg { msgcode != eth.GetPooledTransactionsMsg {
return reply, fmt.Errorf("sendMessageByMinBlock not implemented for message Id: %s", inreq.Data.Id) return reply, fmt.Errorf("sendMessageByMinBlock not implemented for message Id: %s", inreq.Data.Id)
} }
if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil {
return reply, fmt.Errorf("sendMessageByMinBlock to peer %s: %w", peerInfo.ID(), err) var lastErr error
for retry := 0; retry < 16 && len(reply.Peers) == 0; retry++ { // limit number of retries
peerInfo, found := ss.findPeer(inreq.MinBlock)
if !found {
break
} }
if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil {
lastErr = fmt.Errorf("sendMessageByMinBlock to peer %s: %w", peerInfo.ID(), err)
} else {
peerInfo.AddDeadline(time.Now().Add(30 * time.Second)) peerInfo.AddDeadline(time.Now().Add(30 * time.Second))
reply.Peers = []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())} reply.Peers = []*proto_types.H256{gointerfaces.ConvertHashToH256(peerInfo.ID())}
return reply, nil }
}
return reply, lastErr
} }
func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) { func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) {
reply := &proto_sentry.SentPeers{} reply := &proto_sentry.SentPeers{}
peerID := ConvertH256ToPeerID(inreq.PeerId)
peerInfo := ss.getPeer(peerID)
if peerInfo == nil {
//TODO: enable after support peer to sentry mapping
//return reply, fmt.Errorf("peer not found: %s", peerID)
return reply, nil
}
msgcode := eth.FromProto[ss.Protocol.Version][inreq.Data.Id] msgcode := eth.FromProto[ss.Protocol.Version][inreq.Data.Id]
if msgcode != eth.GetBlockHeadersMsg && if msgcode != eth.GetBlockHeadersMsg &&
msgcode != eth.BlockHeadersMsg && msgcode != eth.BlockHeadersMsg &&
@ -739,6 +735,14 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent
return reply, fmt.Errorf("sendMessageById not implemented for message Id: %s", inreq.Data.Id) return reply, fmt.Errorf("sendMessageById not implemented for message Id: %s", inreq.Data.Id)
} }
peerID := ConvertH256ToPeerID(inreq.PeerId)
peerInfo := ss.getPeer(peerID)
if peerInfo == nil {
//TODO: enable after support peer to sentry mapping
//return reply, fmt.Errorf("peer not found: %s", peerID)
return reply, nil
}
if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil { if err := ss.writePeer(peerInfo, msgcode, inreq.Data.Data); err != nil {
return reply, fmt.Errorf("sendMessageById to peer %s: %w", peerID, err) return reply, fmt.Errorf("sendMessageById to peer %s: %w", peerID, err)
} }
@ -752,7 +756,8 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p
msgcode := eth.FromProto[ss.Protocol.Version][req.Data.Id] msgcode := eth.FromProto[ss.Protocol.Version][req.Data.Id]
if msgcode != eth.NewBlockMsg && if msgcode != eth.NewBlockMsg &&
msgcode != eth.NewBlockHashesMsg && msgcode != eth.NewBlockHashesMsg &&
msgcode != eth.NewPooledTransactionHashesMsg { msgcode != eth.NewPooledTransactionHashesMsg &&
msgcode != eth.TransactionsMsg {
return reply, fmt.Errorf("sendMessageToRandomPeers not implemented for message Id: %s", req.Data.Id) return reply, fmt.Errorf("sendMessageToRandomPeers not implemented for message Id: %s", req.Data.Id)
} }
@ -768,20 +773,17 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p
// Send the block to a subset of our peers // Send the block to a subset of our peers
sendToAmount := int(math.Sqrt(float64(amount))) sendToAmount := int(math.Sqrt(float64(amount)))
i := 0 i := 0
var innerErr error var lastErr error
ss.rangePeers(func(peerInfo *PeerInfo) bool { ss.rangePeers(func(peerInfo *PeerInfo) bool {
if err := ss.writePeer(peerInfo, msgcode, req.Data.Data); err != nil { if err := ss.writePeer(peerInfo, msgcode, req.Data.Data); err != nil {
innerErr = err lastErr = fmt.Errorf("sendMessageToRandomPeers to peer %s: %w", peerInfo.ID(), err)
return true return true
} }
reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID())) reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID()))
i++ i++
return i < sendToAmount return i < sendToAmount
}) })
if innerErr != nil { return reply, lastErr
return reply, fmt.Errorf("sendMessageToRandomPeers to peer %w", innerErr)
}
return reply, nil
} }
func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sentry.OutboundMessageData) (*proto_sentry.SentPeers, error) { func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sentry.OutboundMessageData) (*proto_sentry.SentPeers, error) {
@ -794,19 +796,16 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen
return reply, fmt.Errorf("sendMessageToAll not implemented for message Id: %s", req.Id) return reply, fmt.Errorf("sendMessageToAll not implemented for message Id: %s", req.Id)
} }
var innerErr error var lastErr error
ss.rangePeers(func(peerInfo *PeerInfo) bool { ss.rangePeers(func(peerInfo *PeerInfo) bool {
if err := ss.writePeer(peerInfo, msgcode, req.Data); err != nil { if err := ss.writePeer(peerInfo, msgcode, req.Data); err != nil {
innerErr = err lastErr = fmt.Errorf("SendMessageToAll to peer %s: %w", peerInfo.ID(), err)
return true return true
} }
reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID())) reply.Peers = append(reply.Peers, gointerfaces.ConvertHashToH256(peerInfo.ID()))
return true return true
}) })
if innerErr != nil { return reply, lastErr
return reply, fmt.Errorf("sendMessageToRandomPeers to peer %w", innerErr)
}
return reply, nil
} }
func (ss *SentryServerImpl) HandShake(context.Context, *emptypb.Empty) (*proto_sentry.HandShakeReply, error) { func (ss *SentryServerImpl) HandShake(context.Context, *emptypb.Empty) (*proto_sentry.HandShakeReply, error) {

2
go.mod
View File

@ -37,7 +37,7 @@ require (
github.com/json-iterator/go v1.1.12 github.com/json-iterator/go v1.1.12
github.com/julienschmidt/httprouter v1.3.0 github.com/julienschmidt/httprouter v1.3.0
github.com/kevinburke/go-bindata v3.21.0+incompatible github.com/kevinburke/go-bindata v3.21.0+incompatible
github.com/ledgerwatch/erigon-lib v0.0.0-20211214133332-bb6dfef7c845 github.com/ledgerwatch/erigon-lib v0.0.0-20211216213333-d79d43f9085d
github.com/ledgerwatch/log/v3 v3.4.0 github.com/ledgerwatch/log/v3 v3.4.0
github.com/ledgerwatch/secp256k1 v1.0.0 github.com/ledgerwatch/secp256k1 v1.0.0
github.com/logrusorgru/aurora/v3 v3.0.0 github.com/logrusorgru/aurora/v3 v3.0.0

6
go.sum
View File

@ -617,8 +617,10 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20211214133332-bb6dfef7c845 h1:K7Zg42nG8GGnrkALkHECx+RQgpJ2neluY5PrcwpDZfQ= github.com/ledgerwatch/erigon-lib v0.0.0-20211216211604-7f6eb71c4c77 h1:zOn25ZPWoHEjTxYaKGzWMvBZHCdWYmWNP1xGSE0aFVI=
github.com/ledgerwatch/erigon-lib v0.0.0-20211214133332-bb6dfef7c845/go.mod h1:lyGP3i0x4CeabdKZ4beycD5xZfHWZwJsAX+70OfGj4Y= github.com/ledgerwatch/erigon-lib v0.0.0-20211216211604-7f6eb71c4c77/go.mod h1:lyGP3i0x4CeabdKZ4beycD5xZfHWZwJsAX+70OfGj4Y=
github.com/ledgerwatch/erigon-lib v0.0.0-20211216213333-d79d43f9085d h1:yPC/BYbfbuantLNUHc6x9f16Yap5EiyzyFBCY/eOJdE=
github.com/ledgerwatch/erigon-lib v0.0.0-20211216213333-d79d43f9085d/go.mod h1:lyGP3i0x4CeabdKZ4beycD5xZfHWZwJsAX+70OfGj4Y=
github.com/ledgerwatch/log/v3 v3.4.0 h1:SEIOcv5a2zkG3PmoT5jeTU9m/0nEUv0BJS5bzsjwKCI= github.com/ledgerwatch/log/v3 v3.4.0 h1:SEIOcv5a2zkG3PmoT5jeTU9m/0nEUv0BJS5bzsjwKCI=
github.com/ledgerwatch/log/v3 v3.4.0/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY= github.com/ledgerwatch/log/v3 v3.4.0/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ= github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=