erigon-pulse/cmd/sentry/download/broadcast.go
TBC Dev e1c44cd19b
Change sentry peer_id from H512 pubkey to H256 keccak256(pubkey) (#3013)
* Rename protoHandshake.ID to protoHandshake.Pubkey

* Fix enode.ID comment descriptions

* Change sentry peer_id from H512 pubkey to H256 keccak256(pubkey)

* Simplify PeerInfo helpers
2021-11-22 05:39:31 +00:00

273 lines
7.1 KiB
Go

package download
import (
"context"
"errors"
"math/big"
"strings"
"syscall"
proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/protocols/eth"
"github.com/ledgerwatch/erigon/p2p"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
"github.com/ledgerwatch/log/v3"
"google.golang.org/grpc"
)
// Methods of sentry called by Core
const (
// This is the target size for the packs of transactions or announcements. A
// pack can get larger than this if a single transactions exceeds this size.
maxTxPacketSize = 100 * 1024
)
func (cs *ControlServerImpl) PropagateNewBlockHashes(ctx context.Context, announces []headerdownload.Announce) {
cs.lock.RLock()
defer cs.lock.RUnlock()
typedRequest := make(eth.NewBlockHashesPacket, len(announces))
for i := range announces {
typedRequest[i].Hash = announces[i].Hash
typedRequest[i].Number = announces[i].Number
}
data, err := rlp.EncodeToBytes(&typedRequest)
if err != nil {
log.Error("propagateNewBlockHashes", "error", err)
return
}
var req66 *proto_sentry.OutboundMessageData
for _, sentry := range cs.sentries {
if !sentry.Ready() {
continue
}
switch sentry.Protocol() {
case eth.ETH66:
if req66 == nil {
req66 = &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_NEW_BLOCK_HASHES_66,
Data: data,
}
_, err = sentry.SendMessageToAll(ctx, req66, &grpc.EmptyCallOption{})
if err != nil {
log.Error("propagateNewBlockHashes", "error", err)
}
}
default:
//??
}
}
}
func (cs *ControlServerImpl) BroadcastNewBlock(ctx context.Context, block *types.Block, td *big.Int) {
cs.lock.RLock()
defer cs.lock.RUnlock()
data, err := rlp.EncodeToBytes(&eth.NewBlockPacket{
Block: block,
TD: td,
})
if err != nil {
log.Error("broadcastNewBlock", "error", err)
}
var req66 *proto_sentry.SendMessageToRandomPeersRequest
for _, sentry := range cs.sentries {
if !sentry.Ready() {
continue
}
switch sentry.Protocol() {
case eth.ETH66:
if req66 == nil {
req66 = &proto_sentry.SendMessageToRandomPeersRequest{
MaxPeers: 1024,
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_NEW_BLOCK_66,
Data: data,
},
}
}
if _, err = sentry.SendMessageToRandomPeers(ctx, req66, &grpc.EmptyCallOption{}); err != nil {
if isPeerNotFoundErr(err) || networkTemporaryErr(err) {
log.Debug("broadcastNewBlock", "error", err)
continue
}
log.Error("broadcastNewBlock", "error", err)
}
}
}
}
func (cs *ControlServerImpl) BroadcastLocalPooledTxs(ctx context.Context, txs []common.Hash) {
if len(txs) == 0 {
return
}
cs.lock.RLock()
defer cs.lock.RUnlock()
initialAmount := len(txs)
avgPeersPerSent65 := 0
avgPeersPerSent66 := 0
initialTxs := txs
for len(txs) > 0 {
pendingLen := maxTxPacketSize / common.HashLength
pending := make([]common.Hash, 0, pendingLen)
for i := 0; i < pendingLen && i < len(txs); i++ {
pending = append(pending, txs[i])
}
txs = txs[len(pending):]
data, err := rlp.EncodeToBytes(eth.NewPooledTransactionHashesPacket(pending))
if err != nil {
log.Error("BroadcastLocalPooledTxs", "error", err)
}
var req66 *proto_sentry.OutboundMessageData
for _, sentry := range cs.sentries {
if !sentry.Ready() {
continue
}
switch sentry.Protocol() {
case eth.ETH66:
if req66 == nil {
req66 = &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66,
Data: data,
}
}
peers, err := sentry.SendMessageToAll(ctx, req66, &grpc.EmptyCallOption{})
if err != nil {
if isPeerNotFoundErr(err) || networkTemporaryErr(err) {
log.Debug("BroadcastLocalPooledTxs", "error", err)
continue
}
log.Error("BroadcastLocalPooledTxs", "error", err)
}
avgPeersPerSent66 += len(peers.GetPeers())
}
}
}
if initialAmount == 1 {
log.Info("local tx propagated", "to_peers_amount", avgPeersPerSent65+avgPeersPerSent66, "tx_hash", initialTxs[0].String())
} else {
log.Info("local txs propagated", "to_peers_amount", avgPeersPerSent65+avgPeersPerSent66, "txs_amount", initialAmount)
}
}
func (cs *ControlServerImpl) BroadcastRemotePooledTxs(ctx context.Context, txs []common.Hash) {
if len(txs) == 0 {
return
}
cs.lock.RLock()
defer cs.lock.RUnlock()
for len(txs) > 0 {
pendingLen := maxTxPacketSize / common.HashLength
pending := make([]common.Hash, 0, pendingLen)
for i := 0; i < pendingLen && i < len(txs); i++ {
pending = append(pending, txs[i])
}
txs = txs[len(pending):]
data, err := rlp.EncodeToBytes(eth.NewPooledTransactionHashesPacket(pending))
if err != nil {
log.Error("BroadcastRemotePooledTxs", "error", err)
}
var req66 *proto_sentry.SendMessageToRandomPeersRequest
for _, sentry := range cs.sentries {
if !sentry.Ready() {
continue
}
switch sentry.Protocol() {
case eth.ETH66:
if req66 == nil {
req66 = &proto_sentry.SendMessageToRandomPeersRequest{
MaxPeers: 1024,
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66,
Data: data,
},
}
}
if _, err = sentry.SendMessageToRandomPeers(ctx, req66, &grpc.EmptyCallOption{}); err != nil {
if isPeerNotFoundErr(err) || networkTemporaryErr(err) {
log.Debug("BroadcastRemotePooledTxs", "error", err)
continue
}
log.Error("BroadcastRemotePooledTxs", "error", err)
}
}
}
}
}
func (cs *ControlServerImpl) PropagatePooledTxsToPeersList(ctx context.Context, peers []*types2.H256, txs []common.Hash) {
if len(txs) == 0 {
return
}
cs.lock.RLock()
defer cs.lock.RUnlock()
for len(txs) > 0 {
pendingLen := maxTxPacketSize / common.HashLength
pending := make([]common.Hash, 0, pendingLen)
for i := 0; i < pendingLen && i < len(txs); i++ {
pending = append(pending, txs[i])
}
txs = txs[len(pending):]
data, err := rlp.EncodeToBytes(eth.NewPooledTransactionHashesPacket(pending))
if err != nil {
log.Error("PropagatePooledTxsToPeersList", "error", err)
}
for _, sentry := range cs.sentries {
if !sentry.Ready() {
continue
}
for _, peer := range peers {
switch sentry.Protocol() {
case eth.ETH66:
req66 := &proto_sentry.SendMessageByIdRequest{
PeerId: peer,
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66,
Data: data,
},
}
if _, err = sentry.SendMessageById(ctx, req66, &grpc.EmptyCallOption{}); err != nil {
if isPeerNotFoundErr(err) || networkTemporaryErr(err) {
log.Debug("PropagatePooledTxsToPeersList", "error", err)
continue
}
log.Error("PropagatePooledTxsToPeersList", "error", err)
}
}
}
}
}
}
func networkTemporaryErr(err error) bool {
return errors.Is(err, syscall.EPIPE) || errors.Is(err, p2p.ErrShuttingDown)
}
func isPeerNotFoundErr(err error) bool {
return strings.Contains(err.Error(), "peer not found")
}