Rename sentry.ControlServerImpl to sentry.MultyClient and sentry.SentryServerImpl to sentry.GrpcServer #444

Alex Sharov 2022-05-10 12:17:44 +07:00 committed by GitHub
parent 3e4fb5dd49
commit 2c26583f6f
12 changed files with 141 additions and 137 deletions


@ -1186,7 +1186,7 @@ func newSync(ctx context.Context, db kv.RwDB, miningConfig *params.MiningConfig)
br := getBlockReader(chainConfig)
blockDownloaderWindow := 65536
sentryControlServer, err := sentry.NewControlServer(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, br)
sentryControlServer, err := sentry.NewMultyClient(db, "", chainConfig, genesisBlock.Hash(), engine, 1, nil, blockDownloaderWindow, br)
if err != nil {
panic(err)
}


@ -98,7 +98,7 @@ func TestSendRawTransaction(t *testing.T) {
//TODO: make propagation easy to test - now race
//time.Sleep(time.Second)
//sent := m.SentMessage(0)
//require.Equal(eth.ToProto[m.SentryClient.Protocol()][eth.NewPooledTransactionHashesMsg], sent.Id)
//require.Equal(eth.ToProto[m.MultyClient.Protocol()][eth.NewPooledTransactionHashesMsg], sent.Id)
}
func transaction(nonce uint64, gaslimit uint64, key *ecdsa.PrivateKey) types.Transaction {


@ -28,7 +28,7 @@ const (
maxTxPacketSize = 100 * 1024
)
func (cs *ControlServerImpl) PropagateNewBlockHashes(ctx context.Context, announces []headerdownload.Announce) {
func (cs *MultyClient) PropagateNewBlockHashes(ctx context.Context, announces []headerdownload.Announce) {
cs.lock.RLock()
defer cs.lock.RUnlock()
typedRequest := make(eth.NewBlockHashesPacket, len(announces))
@ -72,7 +72,7 @@ func (cs *ControlServerImpl) PropagateNewBlockHashes(ctx context.Context, announ
}
}
func (cs *ControlServerImpl) BroadcastNewBlock(ctx context.Context, block *types.Block, td *big.Int) {
func (cs *MultyClient) BroadcastNewBlock(ctx context.Context, block *types.Block, td *big.Int) {
cs.lock.RLock()
defer cs.lock.RUnlock()
data, err := rlp.EncodeToBytes(&eth.NewBlockPacket{
@ -116,7 +116,7 @@ func (cs *ControlServerImpl) BroadcastNewBlock(ctx context.Context, block *types
}
}
func (cs *ControlServerImpl) BroadcastLocalPooledTxs(ctx context.Context, txs []common.Hash) {
func (cs *MultyClient) BroadcastLocalPooledTxs(ctx context.Context, txs []common.Hash) {
if len(txs) == 0 {
return
}
@ -179,7 +179,7 @@ func (cs *ControlServerImpl) BroadcastLocalPooledTxs(ctx context.Context, txs []
}
}
func (cs *ControlServerImpl) BroadcastRemotePooledTxs(ctx context.Context, txs []common.Hash) {
func (cs *MultyClient) BroadcastRemotePooledTxs(ctx context.Context, txs []common.Hash) {
if len(txs) == 0 {
return
}
@ -235,7 +235,7 @@ func (cs *ControlServerImpl) BroadcastRemotePooledTxs(ctx context.Context, txs [
}
}
func (cs *ControlServerImpl) PropagatePooledTxsToPeersList(ctx context.Context, peers []*types2.H512, txs []common.Hash) {
func (cs *MultyClient) PropagatePooledTxsToPeersList(ctx context.Context, peers []*types2.H512, txs []common.Hash) {
if len(txs) == 0 {
return
}
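
All of the renamed Broadcast*/Propagate* methods in this file follow the same shape: take the client's read lock, RLP-encode the packet once, then push the same bytes through every attached sentry, logging per-sentry failures and moving on. A minimal sketch of that fan-out; the rlp import (go-ethereum's, standing in for erigon's fork) and the plain function values (standing in for the SentryClient send RPCs) are assumptions:

```go
// Sketch of the fan-out shared by the Broadcast*/Propagate* methods above:
// encode once under the read lock, then send the same bytes via every sentry.
// Assumptions: go-ethereum's rlp stands in for erigon's fork, and the send
// funcs stand in for the per-sentry gRPC calls.
package main

import (
	"fmt"
	"sync"

	"github.com/ethereum/go-ethereum/rlp"
)

type multiClient struct {
	lock     sync.RWMutex
	sentries []func(data []byte) error // stand-ins for SentryClient send RPCs
}

func (m *multiClient) broadcast(payload interface{}) {
	m.lock.RLock()
	defer m.lock.RUnlock()
	data, err := rlp.EncodeToBytes(payload) // encode once, reuse for all sentries
	if err != nil {
		fmt.Println("encode:", err)
		return
	}
	for i, send := range m.sentries {
		if err := send(data); err != nil {
			fmt.Printf("sentry %d: %v\n", i, err) // log per-sentry failure, keep going
		}
	}
}

func main() {
	m := &multiClient{sentries: []func([]byte) error{
		func(b []byte) error { fmt.Println("sentry 0 got", len(b), "bytes"); return nil },
		func(b []byte) error { fmt.Println("sentry 1 got", len(b), "bytes"); return nil },
	}}
	m.broadcast([]string{"deadbeef"})
}
```
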


@ -18,7 +18,7 @@ import (
// Methods of sentry called by Core
func (cs *ControlServerImpl) UpdateHead(ctx context.Context, height uint64, hash common.Hash, td *uint256.Int) {
func (cs *MultyClient) UpdateHead(ctx context.Context, height uint64, hash common.Hash, td *uint256.Int) {
cs.lock.Lock()
defer cs.lock.Unlock()
cs.headHeight = height
@ -36,7 +36,7 @@ func (cs *ControlServerImpl) UpdateHead(ctx context.Context, height uint64, hash
}
}
func (cs *ControlServerImpl) SendBodyRequest(ctx context.Context, req *bodydownload.BodyRequest) (peerID [64]byte, ok bool) {
func (cs *MultyClient) SendBodyRequest(ctx context.Context, req *bodydownload.BodyRequest) (peerID [64]byte, ok bool) {
// if a sentry found no peers to send this message to, try the next one; stop once sent.
for i, ok, next := cs.randSentryIndex(); ok; i, ok = next() {
if !cs.sentries[i].Ready() {
@ -78,7 +78,7 @@ func (cs *ControlServerImpl) SendBodyRequest(ctx context.Context, req *bodydownl
return [64]byte{}, false
}
func (cs *ControlServerImpl) SendHeaderRequest(ctx context.Context, req *headerdownload.HeaderRequest) (peerID [64]byte, ok bool) {
func (cs *MultyClient) SendHeaderRequest(ctx context.Context, req *headerdownload.HeaderRequest) (peerID [64]byte, ok bool) {
// if a sentry found no peers to send this message to, try the next one; stop once sent.
for i, ok, next := cs.randSentryIndex(); ok; i, ok = next() {
if !cs.sentries[i].Ready() {
@ -130,7 +130,7 @@ func (cs *ControlServerImpl) SendHeaderRequest(ctx context.Context, req *headerd
return [64]byte{}, false
}
func (cs *ControlServerImpl) randSentryIndex() (int, bool, func() (int, bool)) {
func (cs *MultyClient) randSentryIndex() (int, bool, func() (int, bool)) {
var i int
if len(cs.sentries) > 1 {
i = rand.Intn(len(cs.sentries) - 1)
@ -143,7 +143,7 @@ func (cs *ControlServerImpl) randSentryIndex() (int, bool, func() (int, bool)) {
}
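
SendBodyRequest and SendHeaderRequest above both walk the sentries via randSentryIndex: a random starting index plus a next() closure that wraps around the slice until every sentry has been tried once. A simplified, self-contained sketch of that iterator shape (the real method special-cases the sentry count slightly differently):

```go
// Simplified sketch of the random-start iterator returned by randSentryIndex:
// start at a random sentry, and next() walks the rest of the slice once.
package main

import (
	"fmt"
	"math/rand"
)

func randIndex(n int) (int, bool, func() (int, bool)) {
	if n == 0 {
		return 0, false, func() (int, bool) { return 0, false }
	}
	start := rand.Intn(n)
	i := start
	return start, true, func() (int, bool) {
		i = (i + 1) % n
		return i, i != start // exhausted once we wrap back to the start
	}
}

func main() {
	ready := []bool{false, true, true} // pretend sentry 0 is not Ready()
	for i, ok, next := randIndex(len(ready)); ok; i, ok = next() {
		if !ready[i] {
			continue // same skip as !cs.sentries[i].Ready() in the loops above
		}
		fmt.Println("request sent via sentry", i)
		break // stop at the first sentry that took the request
	}
}
```
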
// sending list of penalties to all sentries
func (cs *ControlServerImpl) Penalize(ctx context.Context, penalties []headerdownload.PenaltyItem) {
func (cs *MultyClient) Penalize(ctx context.Context, penalties []headerdownload.PenaltyItem) {
for i := range penalties {
outreq := proto_sentry.PenalizePeerRequest{
PeerId: gointerfaces.ConvertHashToH512(penalties[i].PeerID),


@ -478,7 +478,7 @@ func runPeer(
}
}
func grpcSentryServer(ctx context.Context, sentryAddr string, ss *SentryServerImpl, healthCheck bool) (*grpc.Server, error) {
func grpcSentryServer(ctx context.Context, sentryAddr string, ss *GrpcServer, healthCheck bool) (*grpc.Server, error) {
// STARTING GRPC SERVER
log.Info("Starting Sentry gRPC server", "on", sentryAddr)
listenConfig := net.ListenConfig{
@ -510,8 +510,8 @@ func grpcSentryServer(ctx context.Context, sentryAddr string, ss *SentryServerIm
return grpcServer, nil
}
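
For context, grpcSentryServer is ordinary grpc-go plumbing: listen via net.ListenConfig, build the server, optionally register the stock health service, then serve on a goroutine. A reduced sketch using only stock grpc-go; the real function also registers the Sentry service and sets keepalive and credential options, omitted here:

```go
// Reduced sketch of the gRPC startup that grpcSentryServer performs, using
// only stock grpc-go. Registration of the Sentry service itself is omitted.
package main

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func startGrpc(ctx context.Context, addr string, healthCheck bool) (*grpc.Server, error) {
	lc := net.ListenConfig{}
	lis, err := lc.Listen(ctx, "tcp", addr)
	if err != nil {
		return nil, err
	}
	srv := grpc.NewServer()
	if healthCheck {
		// the standard health service backs the healthCheck flag
		healthpb.RegisterHealthServer(srv, health.NewServer())
	}
	go func() {
		if err := srv.Serve(lis); err != nil {
			log.Println("gRPC server stopped:", err)
		}
	}()
	return srv, nil
}

func main() {
	srv, err := startGrpc(context.Background(), "127.0.0.1:9091", true)
	if err != nil {
		log.Fatal(err)
	}
	defer srv.GracefulStop()
}
```
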
func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNodeInfo func() *eth.NodeInfo, cfg *p2p.Config, protocol uint) *SentryServerImpl {
ss := &SentryServerImpl{
func NewGrpcServer(ctx context.Context, dialCandidates enode.Iterator, readNodeInfo func() *eth.NodeInfo, cfg *p2p.Config, protocol uint) *GrpcServer {
ss := &GrpcServer{
ctx: ctx,
p2p: cfg,
peersStreams: NewPeersStreams(),
@ -576,7 +576,7 @@ func NewSentryServer(ctx context.Context, dialCandidates enode.Iterator, readNod
// Sentry creates and runs a standalone sentry
func Sentry(ctx context.Context, datadir string, sentryAddr string, discoveryDNS []string, cfg *p2p.Config, protocolVersion uint, healthCheck bool) error {
dir.MustExist(datadir)
sentryServer := NewSentryServer(ctx, nil, func() *eth.NodeInfo { return nil }, cfg, protocolVersion)
sentryServer := NewGrpcServer(ctx, nil, func() *eth.NodeInfo { return nil }, cfg, protocolVersion)
sentryServer.discoveryDNS = discoveryDNS
grpcServer, err := grpcSentryServer(ctx, sentryAddr, sentryServer, healthCheck)
@ -590,7 +590,7 @@ func Sentry(ctx context.Context, datadir string, sentryAddr string, discoveryDNS
return nil
}
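
Once the standalone sentry from Sentry() is up, consumers attach over gRPC much as erigon's own GrpcClient helper does. A client-side sketch, assuming the generated bindings live at erigon-lib/gointerfaces/sentry (the import path is an assumption for the commit's era):

```go
// Client-side sketch: dial the standalone sentry and call a renamed
// GrpcServer method remotely. The proto_sentry import path is an assumption.
package main

import (
	"context"
	"log"

	proto_sentry "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:9091",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := proto_sentry.NewSentryClient(conn)
	// HandShake is served by GrpcServer.HandShake further down in this diff
	reply, err := client.HandShake(context.Background(), nil)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("sentry speaks protocol:", reply.Protocol)
}
```
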
type SentryServerImpl struct {
type GrpcServer struct {
proto_sentry.UnimplementedSentryServer
ctx context.Context
Protocol p2p.Protocol
@ -607,7 +607,7 @@ type SentryServerImpl struct {
p2p *p2p.Config
}
func (ss *SentryServerImpl) rangePeers(f func(peerInfo *PeerInfo) bool) {
func (ss *GrpcServer) rangePeers(f func(peerInfo *PeerInfo) bool) {
ss.GoodPeers.Range(func(key, value interface{}) bool {
peerInfo, _ := value.(*PeerInfo)
if peerInfo == nil {
@ -617,7 +617,7 @@ func (ss *SentryServerImpl) rangePeers(f func(peerInfo *PeerInfo) bool) {
})
}
func (ss *SentryServerImpl) getPeer(peerID [64]byte) (peerInfo *PeerInfo) {
func (ss *GrpcServer) getPeer(peerID [64]byte) (peerInfo *PeerInfo) {
if value, ok := ss.GoodPeers.Load(peerID); ok {
peerInfo := value.(*PeerInfo)
if peerInfo != nil {
@ -628,7 +628,7 @@ func (ss *SentryServerImpl) getPeer(peerID [64]byte) (peerInfo *PeerInfo) {
return nil
}
func (ss *SentryServerImpl) removePeer(peerID [64]byte) {
func (ss *GrpcServer) removePeer(peerID [64]byte) {
if value, ok := ss.GoodPeers.LoadAndDelete(peerID); ok {
peerInfo := value.(*PeerInfo)
if peerInfo != nil {
@ -637,7 +637,7 @@ func (ss *SentryServerImpl) removePeer(peerID [64]byte) {
}
}
func (ss *SentryServerImpl) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode uint64, data []byte, ttl time.Duration) {
func (ss *GrpcServer) writePeer(logPrefix string, peerInfo *PeerInfo, msgcode uint64, data []byte, ttl time.Duration) {
peerInfo.Async(func() {
err := peerInfo.rw.WriteMsg(p2p.Msg{Code: msgcode, Size: uint32(len(data)), Payload: bytes.NewReader(data)})
if err != nil {
@ -652,7 +652,7 @@ func (ss *SentryServerImpl) writePeer(logPrefix string, peerInfo *PeerInfo, msgc
})
}
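
writePeer never blocks the calling gRPC handler: peerInfo.Async hands the write to a per-peer queue, so one slow peer cannot stall a broadcast. A self-contained sketch of that single-writer-per-peer pattern; asyncPeer and its queue size are illustrative, not erigon API:

```go
// Single-writer-per-peer sketch behind peerInfo.Async: one goroutine drains a
// task queue per peer, so concurrent handlers enqueue writes without racing
// on the p2p connection.
package main

import (
	"fmt"
	"sync"
)

type asyncPeer struct {
	tasks chan func()
	wg    sync.WaitGroup
}

func newAsyncPeer() *asyncPeer {
	p := &asyncPeer{tasks: make(chan func(), 16)}
	p.wg.Add(1)
	go func() { // the peer's single writer goroutine
		defer p.wg.Done()
		for f := range p.tasks {
			f()
		}
	}()
	return p
}

// Async mirrors the call shape writePeer uses; do not call after Close.
func (p *asyncPeer) Async(f func()) { p.tasks <- f }

func (p *asyncPeer) Close() { close(p.tasks); p.wg.Wait() }

func main() {
	p := newAsyncPeer()
	for i := 0; i < 3; i++ {
		i := i
		p.Async(func() { fmt.Println("write message", i) })
	}
	p.Close()
}
```
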
func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash, peerID [64]byte) error {
func (ss *GrpcServer) startSync(ctx context.Context, bestHash common.Hash, peerID [64]byte) error {
switch ss.Protocol.Version {
case eth.ETH66:
b, err := rlp.EncodeToBytes(&eth.GetBlockHeadersPacket66{
@ -680,14 +680,14 @@ func (ss *SentryServerImpl) startSync(ctx context.Context, bestHash common.Hash,
return nil
}
func (ss *SentryServerImpl) PenalizePeer(_ context.Context, req *proto_sentry.PenalizePeerRequest) (*emptypb.Empty, error) {
func (ss *GrpcServer) PenalizePeer(_ context.Context, req *proto_sentry.PenalizePeerRequest) (*emptypb.Empty, error) {
//log.Warn("Received penalty", "kind", req.GetPenalty().Descriptor().FullName, "from", fmt.Sprintf("%s", req.GetPeerId()))
peerID := ConvertH512ToPeerID(req.PeerId)
ss.removePeer(peerID)
return &emptypb.Empty{}, nil
}
func (ss *SentryServerImpl) PeerMinBlock(_ context.Context, req *proto_sentry.PeerMinBlockRequest) (*emptypb.Empty, error) {
func (ss *GrpcServer) PeerMinBlock(_ context.Context, req *proto_sentry.PeerMinBlockRequest) (*emptypb.Empty, error) {
peerID := ConvertH512ToPeerID(req.PeerId)
if peerInfo := ss.getPeer(peerID); peerInfo != nil {
peerInfo.SetIncreasedHeight(req.MinBlock)
@ -695,7 +695,7 @@ func (ss *SentryServerImpl) PeerMinBlock(_ context.Context, req *proto_sentry.Pe
return &emptypb.Empty{}, nil
}
func (ss *SentryServerImpl) findPeer(minBlock uint64) (*PeerInfo, bool) {
func (ss *GrpcServer) findPeer(minBlock uint64) (*PeerInfo, bool) {
// Choose a peer that we can send this request to, with the maximum number of permits
var foundPeerInfo *PeerInfo
var maxPermits int
@ -717,7 +717,7 @@ func (ss *SentryServerImpl) findPeer(minBlock uint64) (*PeerInfo, bool) {
return foundPeerInfo, maxPermits > 0
}
func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *proto_sentry.SendMessageByMinBlockRequest) (*proto_sentry.SentPeers, error) {
func (ss *GrpcServer) SendMessageByMinBlock(_ context.Context, inreq *proto_sentry.SendMessageByMinBlockRequest) (*proto_sentry.SentPeers, error) {
reply := &proto_sentry.SentPeers{}
msgcode := eth.FromProto[ss.Protocol.Version][inreq.Data.Id]
if msgcode != eth.GetBlockHeadersMsg &&
@ -738,7 +738,7 @@ func (ss *SentryServerImpl) SendMessageByMinBlock(_ context.Context, inreq *prot
return reply, lastErr
}
func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) {
func (ss *GrpcServer) SendMessageById(_ context.Context, inreq *proto_sentry.SendMessageByIdRequest) (*proto_sentry.SentPeers, error) {
reply := &proto_sentry.SentPeers{}
msgcode := eth.FromProto[ss.Protocol.Version][inreq.Data.Id]
if msgcode != eth.GetBlockHeadersMsg &&
@ -765,7 +765,7 @@ func (ss *SentryServerImpl) SendMessageById(_ context.Context, inreq *proto_sent
return reply, nil
}
func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *proto_sentry.SendMessageToRandomPeersRequest) (*proto_sentry.SentPeers, error) {
func (ss *GrpcServer) SendMessageToRandomPeers(ctx context.Context, req *proto_sentry.SendMessageToRandomPeersRequest) (*proto_sentry.SentPeers, error) {
reply := &proto_sentry.SentPeers{}
msgcode := eth.FromProto[ss.Protocol.Version][req.Data.Id]
@ -798,7 +798,7 @@ func (ss *SentryServerImpl) SendMessageToRandomPeers(ctx context.Context, req *p
return reply, lastErr
}
func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sentry.OutboundMessageData) (*proto_sentry.SentPeers, error) {
func (ss *GrpcServer) SendMessageToAll(ctx context.Context, req *proto_sentry.OutboundMessageData) (*proto_sentry.SentPeers, error) {
reply := &proto_sentry.SentPeers{}
msgcode := eth.FromProto[ss.Protocol.Version][req.Id]
@ -817,7 +817,7 @@ func (ss *SentryServerImpl) SendMessageToAll(ctx context.Context, req *proto_sen
return reply, lastErr
}
func (ss *SentryServerImpl) HandShake(context.Context, *emptypb.Empty) (*proto_sentry.HandShakeReply, error) {
func (ss *GrpcServer) HandShake(context.Context, *emptypb.Empty) (*proto_sentry.HandShakeReply, error) {
reply := &proto_sentry.HandShakeReply{}
switch ss.Protocol.Version {
case eth.ETH66:
@ -826,7 +826,7 @@ func (ss *SentryServerImpl) HandShake(context.Context, *emptypb.Empty) (*proto_s
return reply, nil
}
func (ss *SentryServerImpl) SetStatus(ctx context.Context, statusData *proto_sentry.StatusData) (*proto_sentry.SetStatusReply, error) {
func (ss *GrpcServer) SetStatus(ctx context.Context, statusData *proto_sentry.StatusData) (*proto_sentry.SetStatusReply, error) {
genesisHash := gointerfaces.ConvertH256ToHash(statusData.ForkData.Genesis)
ss.lock.Lock()
@ -869,7 +869,7 @@ func (ss *SentryServerImpl) SetStatus(ctx context.Context, statusData *proto_sen
return reply, nil
}
func (ss *SentryServerImpl) Peers(_ context.Context, _ *emptypb.Empty) (*proto_sentry.PeersReply, error) {
func (ss *GrpcServer) Peers(_ context.Context, _ *emptypb.Empty) (*proto_sentry.PeersReply, error) {
if ss.P2pServer == nil {
return nil, errors.New("p2p server was not started")
}
@ -898,7 +898,7 @@ func (ss *SentryServerImpl) Peers(_ context.Context, _ *emptypb.Empty) (*proto_s
return &reply, nil
}
func (ss *SentryServerImpl) SimplePeerCount() (pc int) {
func (ss *GrpcServer) SimplePeerCount() (pc int) {
ss.rangePeers(func(peerInfo *PeerInfo) bool {
pc++
return true
@ -906,7 +906,7 @@ func (ss *SentryServerImpl) SimplePeerCount() (pc int) {
return pc
}
func (ss *SentryServerImpl) PeerCount(_ context.Context, req *proto_sentry.PeerCountRequest) (*proto_sentry.PeerCountReply, error) {
func (ss *GrpcServer) PeerCount(_ context.Context, req *proto_sentry.PeerCountRequest) (*proto_sentry.PeerCountReply, error) {
return &proto_sentry.PeerCountReply{Count: uint64(ss.SimplePeerCount())}, nil
}
@ -920,13 +920,13 @@ func setupDiscovery(urls []string) (enode.Iterator, error) {
return client.NewIterator(urls...)
}
func (ss *SentryServerImpl) GetStatus() *proto_sentry.StatusData {
func (ss *GrpcServer) GetStatus() *proto_sentry.StatusData {
ss.lock.RLock()
defer ss.lock.RUnlock()
return ss.statusData
}
func (ss *SentryServerImpl) send(msgID proto_sentry.MessageId, peerID [64]byte, b []byte) {
func (ss *GrpcServer) send(msgID proto_sentry.MessageId, peerID [64]byte, b []byte) {
ss.messageStreamsLock.RLock()
defer ss.messageStreamsLock.RUnlock()
req := &proto_sentry.InboundMessage{
@ -950,14 +950,14 @@ func (ss *SentryServerImpl) send(msgID proto_sentry.MessageId, peerID [64]byte,
}
}
func (ss *SentryServerImpl) hasSubscribers(msgID proto_sentry.MessageId) bool {
func (ss *GrpcServer) hasSubscribers(msgID proto_sentry.MessageId) bool {
ss.messageStreamsLock.RLock()
defer ss.messageStreamsLock.RUnlock()
return ss.messageStreams[msgID] != nil && len(ss.messageStreams[msgID]) > 0
// log.Error("Sending msg to core P2P failed", "msg", proto_sentry.MessageId_name[int32(streamMsg.msgId)], "err", err)
}
func (ss *SentryServerImpl) addMessagesStream(ids []proto_sentry.MessageId, ch chan *proto_sentry.InboundMessage) func() {
func (ss *GrpcServer) addMessagesStream(ids []proto_sentry.MessageId, ch chan *proto_sentry.InboundMessage) func() {
ss.messageStreamsLock.Lock()
defer ss.messageStreamsLock.Unlock()
if ss.messageStreams == nil {
@ -985,7 +985,7 @@ func (ss *SentryServerImpl) addMessagesStream(ids []proto_sentry.MessageId, ch c
}
const MessagesQueueSize = 1024 // one such queue per client of .Messages stream
func (ss *SentryServerImpl) Messages(req *proto_sentry.MessagesRequest, server proto_sentry.Sentry_MessagesServer) error {
func (ss *GrpcServer) Messages(req *proto_sentry.MessagesRequest, server proto_sentry.Sentry_MessagesServer) error {
log.Trace("[Messages] new subscriber", "to", req.Ids)
ch := make(chan *proto_sentry.InboundMessage, MessagesQueueSize)
defer close(ch)
@ -1008,25 +1008,25 @@ func (ss *SentryServerImpl) Messages(req *proto_sentry.MessagesRequest, server p
}
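
Messages, addMessagesStream, and send together form a small pub/sub broker: each .Messages subscriber registers one buffered channel (MessagesQueueSize deep) under every message id it wants, and send fans inbound p2p messages out under a read lock. A minimal sketch of that shape, with strings standing in for proto_sentry message ids and messages:

```go
// Minimal pub/sub sketch of addMessagesStream (subscribe) and send (publish)
// above, with strings standing in for proto_sentry ids and InboundMessage.
package main

import (
	"fmt"
	"sync"
)

type broker struct {
	mu      sync.RWMutex
	streams map[string][]chan string
}

// subscribe registers ch under every id and returns a cleanup func,
// mirroring the shape of GrpcServer.addMessagesStream.
func (b *broker) subscribe(ids []string, ch chan string) func() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.streams == nil {
		b.streams = map[string][]chan string{}
	}
	for _, id := range ids {
		b.streams[id] = append(b.streams[id], ch)
	}
	return func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		for _, id := range ids {
			keep := b.streams[id][:0] // standard in-place filter
			for _, c := range b.streams[id] {
				if c != ch {
					keep = append(keep, c)
				}
			}
			b.streams[id] = keep
		}
	}
}

// publish mirrors GrpcServer.send: fan out under the read lock.
func (b *broker) publish(id, msg string) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, c := range b.streams[id] {
		select {
		case c <- msg:
		default: // buffer full: drop, a simplification of the real handling
		}
	}
}

func main() {
	b := &broker{}
	ch := make(chan string, 4) // MessagesQueueSize-style buffer
	unsubscribe := b.subscribe([]string{"NewBlockHashes"}, ch)
	defer unsubscribe()
	b.publish("NewBlockHashes", "hash announce from peer")
	fmt.Println(<-ch)
}
```
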
// Close performs cleanup operations for the sentry
func (ss *SentryServerImpl) Close() {
func (ss *GrpcServer) Close() {
if ss.P2pServer != nil {
ss.P2pServer.Stop()
}
}
func (ss *SentryServerImpl) sendNewPeerToClients(peerID *proto_types.H512) {
func (ss *GrpcServer) sendNewPeerToClients(peerID *proto_types.H512) {
if err := ss.peersStreams.Broadcast(&proto_sentry.PeerEvent{PeerId: peerID, EventId: proto_sentry.PeerEvent_Connect}); err != nil {
log.Warn("Sending new peer notice to core P2P failed", "err", err)
}
}
func (ss *SentryServerImpl) sendGonePeerToClients(peerID *proto_types.H512) {
func (ss *GrpcServer) sendGonePeerToClients(peerID *proto_types.H512) {
if err := ss.peersStreams.Broadcast(&proto_sentry.PeerEvent{PeerId: peerID, EventId: proto_sentry.PeerEvent_Disconnect}); err != nil {
log.Warn("Sending gone peer notice to core P2P failed", "err", err)
}
}
func (ss *SentryServerImpl) PeerEvents(req *proto_sentry.PeerEventsRequest, server proto_sentry.Sentry_PeerEventsServer) error {
func (ss *GrpcServer) PeerEvents(req *proto_sentry.PeerEventsRequest, server proto_sentry.Sentry_PeerEventsServer) error {
clean := ss.peersStreams.Add(server)
defer clean()
select {
@ -1037,7 +1037,7 @@ func (ss *SentryServerImpl) PeerEvents(req *proto_sentry.PeerEventsRequest, serv
}
}
func (ss *SentryServerImpl) NodeInfo(_ context.Context, _ *emptypb.Empty) (*proto_types.NodeInfoReply, error) {
func (ss *GrpcServer) NodeInfo(_ context.Context, _ *emptypb.Empty) (*proto_types.NodeInfoReply, error) {
if ss.P2pServer == nil {
return nil, errors.New("p2p server was not started")
}


@ -21,8 +21,8 @@ import (
"github.com/stretchr/testify/require"
)
func testSentryServer(db kv.Getter, genesis *core.Genesis, genesisHash common.Hash) *SentryServerImpl {
s := &SentryServerImpl{
func testSentryServer(db kv.Getter, genesis *core.Genesis, genesisHash common.Hash) *GrpcServer {
s := &GrpcServer{
ctx: context.Background(),
}
@ -74,7 +74,7 @@ func testForkIDSplit(t *testing.T, protocol uint) {
genesisProFork = gspecProFork.MustCommit(dbProFork)
)
var s1, s2 *SentryServerImpl
var s1, s2 *GrpcServer
err := dbNoFork.Update(context.Background(), func(tx kv.RwTx) error {
s1 = testSentryServer(tx, gspecNoFork, genesisNoFork.Hash())
@ -160,7 +160,7 @@ func TestSentryServerImpl_SetStatusInitPanic(t *testing.T) {
dbNoFork := memdb.NewTestDB(t)
gspecNoFork := &core.Genesis{Config: configNoFork}
genesisNoFork := gspecNoFork.MustCommit(dbNoFork)
ss := &SentryServerImpl{p2p: &p2p.Config{}}
ss := &GrpcServer{p2p: &p2p.Config{}}
_, err := ss.SetStatus(context.Background(), &proto_sentry.StatusData{
ForkData: &proto_sentry.Forks{Genesis: gointerfaces.ConvertHashToH256(genesisNoFork.Hash())},


@ -41,7 +41,7 @@ import (
func RecvUploadMessageLoop(ctx context.Context,
sentry direct.SentryClient,
cs *ControlServerImpl,
cs *MultyClient,
wg *sync.WaitGroup,
) {
for {
@ -129,7 +129,7 @@ func RecvUploadMessage(ctx context.Context,
func RecvUploadHeadersMessageLoop(ctx context.Context,
sentry direct.SentryClient,
cs *ControlServerImpl,
cs *MultyClient,
wg *sync.WaitGroup,
) {
for {
@ -215,7 +215,7 @@ func RecvUploadHeadersMessage(ctx context.Context,
func RecvMessageLoop(ctx context.Context,
sentry direct.SentryClient,
cs *ControlServerImpl,
cs *MultyClient,
wg *sync.WaitGroup,
) {
for {
@ -320,12 +320,14 @@ func RecvMessage(
}
}
func SentrySetStatus(ctx context.Context, sentry direct.SentryClient, controlServer *ControlServerImpl) error {
func SentrySetStatus(ctx context.Context, sentry direct.SentryClient, controlServer *MultyClient) error {
_, err := sentry.SetStatus(ctx, makeStatusData(controlServer))
return err
}
type ControlServerImpl struct {
// MultyClient handles requests, responses, and subscriptions across multiple sentries;
// each sentry may support the same or a different p2p protocol
type MultyClient struct {
lock sync.RWMutex
Hd *headerdownload.HeaderDownload
Bd *bodydownload.BodyDownload
@ -343,9 +345,9 @@ type ControlServerImpl struct {
blockReader interfaces.HeaderAndCanonicalReader
}
func NewControlServer(db kv.RwDB, nodeName string, chainConfig *params.ChainConfig,
func NewMultyClient(db kv.RwDB, nodeName string, chainConfig *params.ChainConfig,
genesisHash common.Hash, engine consensus.Engine, networkID uint64, sentries []direct.SentryClient,
window int, blockReader interfaces.HeaderAndCanonicalReader) (*ControlServerImpl, error) {
window int, blockReader interfaces.HeaderAndCanonicalReader) (*MultyClient, error) {
hd := headerdownload.NewHeaderDownload(
512, /* anchorLimit */
1024*1024, /* linkLimit */
@ -358,7 +360,7 @@ func NewControlServer(db kv.RwDB, nodeName string, chainConfig *params.ChainConf
}
bd := bodydownload.NewBodyDownload(window /* outstandingLimit */, engine)
cs := &ControlServerImpl{
cs := &MultyClient{
nodeName: nodeName,
Hd: hd,
Bd: bd,
@ -379,7 +381,9 @@ func NewControlServer(db kv.RwDB, nodeName string, chainConfig *params.ChainConf
return cs, err
}
func (cs *ControlServerImpl) newBlockHashes66(ctx context.Context, req *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
func (cs *MultyClient) Sentries() []direct.SentryClient { return cs.sentries }
func (cs *MultyClient) newBlockHashes66(ctx context.Context, req *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
if !cs.Hd.RequestChaining() && !cs.Hd.FetchingNew() {
return nil
}
@ -424,7 +428,7 @@ func (cs *ControlServerImpl) newBlockHashes66(ctx context.Context, req *proto_se
return nil
}
func (cs *ControlServerImpl) blockHeaders66(ctx context.Context, in *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
func (cs *MultyClient) blockHeaders66(ctx context.Context, in *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
// Parse the entire packet from scratch
var pkt eth.BlockHeadersPacket66
if err := rlp.DecodeBytes(in.Data, &pkt); err != nil {
@ -444,7 +448,7 @@ func (cs *ControlServerImpl) blockHeaders66(ctx context.Context, in *proto_sentr
return cs.blockHeaders(ctx, pkt.BlockHeadersPacket, rlpStream, in.PeerId, sentry)
}
func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPacket, rlpStream *rlp.Stream, peerID *proto_types.H512, sentry direct.SentryClient) error {
func (cs *MultyClient) blockHeaders(ctx context.Context, pkt eth.BlockHeadersPacket, rlpStream *rlp.Stream, peerID *proto_types.H512, sentry direct.SentryClient) error {
// Stream is at the BlockHeadersPacket, which is a list of headers
if _, err := rlpStream.List(); err != nil {
return fmt.Errorf("decode 2 BlockHeadersPacket66: %w", err)
@ -532,7 +536,7 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHead
return nil
}
func (cs *ControlServerImpl) newBlock66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
func (cs *MultyClient) newBlock66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
// Extract header from the block
rlpStream := rlp.NewStream(bytes.NewReader(inreq.Data), uint64(len(inreq.Data)))
_, err := rlpStream.List() // Now stream is at the beginning of the block record
@ -588,7 +592,7 @@ func (cs *ControlServerImpl) newBlock66(ctx context.Context, inreq *proto_sentry
return nil
}
func (cs *ControlServerImpl) blockBodies66(inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
func (cs *MultyClient) blockBodies66(inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
var request eth.BlockRawBodiesPacket66
if err := rlp.DecodeBytes(inreq.Data, &request); err != nil {
return fmt.Errorf("decode BlockBodiesPacket66: %w", err)
@ -598,11 +602,11 @@ func (cs *ControlServerImpl) blockBodies66(inreq *proto_sentry.InboundMessage, s
return nil
}
func (cs *ControlServerImpl) receipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
func (cs *MultyClient) receipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
return nil
}
func (cs *ControlServerImpl) getBlockHeaders66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
func (cs *MultyClient) getBlockHeaders66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
var query eth.GetBlockHeadersPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding getBlockHeaders66: %w, data: %x", err, inreq.Data)
@ -643,7 +647,7 @@ func (cs *ControlServerImpl) getBlockHeaders66(ctx context.Context, inreq *proto
return nil
}
func (cs *ControlServerImpl) getBlockBodies66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
func (cs *MultyClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
var query eth.GetBlockBodiesPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding getBlockBodies66: %w, data: %x", err, inreq.Data)
@ -680,7 +684,7 @@ func (cs *ControlServerImpl) getBlockBodies66(ctx context.Context, inreq *proto_
return nil
}
func (cs *ControlServerImpl) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
func (cs *MultyClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
var query eth.GetReceiptsPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data)
@ -720,7 +724,7 @@ func (cs *ControlServerImpl) getReceipts66(ctx context.Context, inreq *proto_sen
return nil
}
func (cs *ControlServerImpl) HandleInboundMessage(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
func (cs *MultyClient) HandleInboundMessage(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
switch inreq.Id {
// ========= eth 66 ==========
@ -745,7 +749,7 @@ func (cs *ControlServerImpl) HandleInboundMessage(ctx context.Context, inreq *pr
}
}
func makeStatusData(s *ControlServerImpl) *proto_sentry.StatusData {
func makeStatusData(s *MultyClient) *proto_sentry.StatusData {
return &proto_sentry.StatusData{
NetworkId: s.networkId,
TotalDifficulty: gointerfaces.ConvertUint256IntToH256(s.headTd),


@ -116,11 +116,10 @@ type Ethereum struct {
minedBlocks chan *types.Block
// downloader fields
sentryCtx context.Context
sentryCancel context.CancelFunc
sentryControlServer *sentry.ControlServerImpl
sentryServers []*sentry.SentryServerImpl
sentries []direct.SentryClient
sentryCtx context.Context
sentryCancel context.CancelFunc
sentriesClient *sentry.MultyClient
sentryServers []*sentry.GrpcServer
stagedSync *stagedsync.Sync
@ -209,7 +208,6 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
genesisHash: genesis.Hash(),
waitForStageLoopStop: make(chan struct{}),
waitForMiningStop: make(chan struct{}),
sentries: []direct.SentryClient{},
notifications: &stagedsync.Notifications{
Events: privateapi.NewEvents(),
Accumulator: shards.NewAccumulator(chainConfig),
@ -222,13 +220,14 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
}
var sentries []direct.SentryClient
if len(stack.Config().P2P.SentryAddr) > 0 {
for _, addr := range stack.Config().P2P.SentryAddr {
sentryClient, err := sentry.GrpcClient(backend.sentryCtx, addr)
if err != nil {
return nil, err
}
backend.sentries = append(backend.sentries, sentryClient)
sentries = append(sentries, sentryClient)
}
} else {
var readNodeInfo = func() *eth.NodeInfo {
@ -248,9 +247,9 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
cfg66 := stack.Config().P2P
cfg66.NodeDatabase = filepath.Join(stack.Config().DataDir, "nodes", "eth66")
server66 := sentry.NewSentryServer(backend.sentryCtx, d66, readNodeInfo, &cfg66, eth.ETH66)
server66 := sentry.NewGrpcServer(backend.sentryCtx, d66, readNodeInfo, &cfg66, eth.ETH66)
backend.sentryServers = append(backend.sentryServers, server66)
backend.sentries = []direct.SentryClient{direct.NewSentryClientDirect(eth.ETH66, server66)}
sentries = []direct.SentryClient{direct.NewSentryClientDirect(eth.ETH66, server66)}
go func() {
logEvery := time.NewTicker(120 * time.Second)
@ -341,7 +340,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
return nil, err
}
backend.sentryControlServer, err = sentry.NewControlServer(chainKv, stack.Config().NodeName(), chainConfig, genesis.Hash(), backend.engine, backend.config.NetworkID, backend.sentries, config.BlockDownloaderWindow, blockReader)
backend.sentriesClient, err = sentry.NewMultyClient(chainKv, stack.Config().NodeName(), chainConfig, genesis.Hash(), backend.engine, backend.config.NetworkID, sentries, config.BlockDownloaderWindow, blockReader)
if err != nil {
return nil, err
}
@ -358,7 +357,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
backend.newTxs2 = make(chan types2.Hashes, 1024)
//defer close(newTxs)
backend.txPool2DB, backend.txPool2, backend.txPool2Fetch, backend.txPool2Send, backend.txPool2GrpcServer, err = txpooluitl.AllComponents(
ctx, txpoolCfg, kvcache.NewDummy(), backend.newTxs2, backend.chainDB, backend.sentries, stateDiffClient,
ctx, txpoolCfg, kvcache.NewDummy(), backend.newTxs2, backend.chainDB, backend.sentriesClient.Sentries(), stateDiffClient,
)
if err != nil {
return nil, err
@ -411,7 +410,7 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
// Initialize ethbackend
ethBackendRPC := privateapi.NewEthBackendServer(ctx, backend, backend.chainDB, backend.notifications.Events,
blockReader, chainConfig, backend.sentryControlServer.Hd.BeaconRequestList, backend.sentryControlServer.Hd.PayloadStatusCh,
blockReader, chainConfig, backend.sentriesClient.Hd.BeaconRequestList, backend.sentriesClient.Hd.PayloadStatusCh,
assembleBlockPOS, config.Miner.EnabledPOS)
miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi)
// If we enabled the proposer flag we initiates the block proposing thread
@ -463,15 +462,15 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
select {
case b := <-backend.minedBlocks:
//p2p
//backend.sentryControlServer.BroadcastNewBlock(context.Background(), b, b.Difficulty())
//backend.sentriesClient.BroadcastNewBlock(context.Background(), b, b.Difficulty())
//rpcdaemon
if err := miningRPC.(*privateapi.MiningServer).BroadcastMinedBlock(b); err != nil {
log.Error("txpool rpc mined block broadcast", "err", err)
}
if err := backend.sentryControlServer.Hd.AddMinedHeader(b.Header()); err != nil {
if err := backend.sentriesClient.Hd.AddMinedHeader(b.Header()); err != nil {
log.Error("add mined block to header downloader", "err", err)
}
if err := backend.sentryControlServer.Bd.AddMinedBlock(b); err != nil {
if err := backend.sentriesClient.Bd.AddMinedBlock(b); err != nil {
log.Error("add mined block to body downloader", "err", err)
}
@ -496,13 +495,13 @@ func New(stack *node.Node, config *ethconfig.Config, txpoolCfg txpool2.Config, l
backend.stagedSync, err = stages2.NewStagedSync(backend.sentryCtx, backend.log, backend.chainDB,
stack.Config().P2P, *config, chainConfig.TerminalTotalDifficulty,
backend.sentryControlServer, tmpdir, backend.notifications,
backend.sentriesClient, tmpdir, backend.notifications,
backend.downloaderClient, allSnapshots, config.SnapshotDir, headCh)
if err != nil {
return nil, err
}
backend.sentryControlServer.Hd.StartPoSDownloader(backend.sentryCtx, backend.sentryControlServer.SendHeaderRequest, backend.sentryControlServer.Penalize)
backend.sentriesClient.Hd.StartPoSDownloader(backend.sentryCtx, backend.sentriesClient.SendHeaderRequest, backend.sentriesClient.Penalize)
emptyBadHash := config.BadBlockHash == common.Hash{}
if !emptyBadHash {
@ -658,7 +657,7 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, mining *stagedsy
defer skipCycleEvery.Stop()
for range skipCycleEvery.C {
select {
case s.sentryControlServer.Hd.SkipCycleHack <- struct{}{}:
case s.sentriesClient.Hd.SkipCycleHack <- struct{}{}:
default:
}
}
@ -735,7 +734,7 @@ func (s *Ethereum) NetPeerCount() (uint64, error) {
var sentryPc uint64 = 0
log.Trace("sentry", "peer count", sentryPc)
for _, sc := range s.sentries {
for _, sc := range s.sentriesClient.Sentries() {
ctx := context.Background()
reply, err := sc.PeerCount(ctx, &proto_sentry.PeerCountRequest{})
if err != nil {
@ -749,13 +748,13 @@ func (s *Ethereum) NetPeerCount() (uint64, error) {
}
func (s *Ethereum) NodesInfo(limit int) (*remote.NodesInfoReply, error) {
if limit == 0 || limit > len(s.sentries) {
limit = len(s.sentries)
if limit == 0 || limit > len(s.sentriesClient.Sentries()) {
limit = len(s.sentriesClient.Sentries())
}
nodes := make([]*prototypes.NodeInfoReply, 0, limit)
for i := 0; i < limit; i++ {
sc := s.sentries[i]
sc := s.sentriesClient.Sentries()[i]
nodeInfo, err := sc.NodeInfo(context.Background(), nil)
if err != nil {
@ -773,10 +772,10 @@ func (s *Ethereum) NodesInfo(limit int) (*remote.NodesInfoReply, error) {
func (s *Ethereum) Peers(ctx context.Context) (*remote.PeersReply, error) {
var reply remote.PeersReply
for _, sentryClient := range s.sentries {
for _, sentryClient := range s.sentriesClient.Sentries() {
peers, err := sentryClient.Peers(ctx, &emptypb.Empty{})
if err != nil {
return nil, fmt.Errorf("Ethereum backend SentryClient.Peers error: %w", err)
return nil, fmt.Errorf("Ethereum backend MultyClient.Peers error: %w", err)
}
reply.Peers = append(reply.Peers, peers.Peers...)
}
@ -796,20 +795,21 @@ func (s *Ethereum) Protocols() []p2p.Protocol {
// Start implements node.Lifecycle, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start() error {
for i := range s.sentries {
sentries := s.sentriesClient.Sentries()
for i := range sentries {
go func(i int) {
sentry.RecvMessageLoop(s.sentryCtx, s.sentries[i], s.sentryControlServer, nil)
sentry.RecvMessageLoop(s.sentryCtx, sentries[i], s.sentriesClient, nil)
}(i)
go func(i int) {
sentry.RecvUploadMessageLoop(s.sentryCtx, s.sentries[i], s.sentryControlServer, nil)
sentry.RecvUploadMessageLoop(s.sentryCtx, sentries[i], s.sentriesClient, nil)
}(i)
go func(i int) {
sentry.RecvUploadHeadersMessageLoop(s.sentryCtx, s.sentries[i], s.sentryControlServer, nil)
sentry.RecvUploadHeadersMessageLoop(s.sentryCtx, sentries[i], s.sentriesClient, nil)
}(i)
}
time.Sleep(10 * time.Millisecond) // just to reduce logs order confusion
go stages2.StageLoop(s.sentryCtx, s.chainDB, s.stagedSync, s.sentryControlServer.Hd, s.notifications, s.sentryControlServer.UpdateHead, s.waitForStageLoopStop, s.config.SyncLoopThrottle)
go stages2.StageLoop(s.sentryCtx, s.chainDB, s.stagedSync, s.sentriesClient.Hd, s.notifications, s.sentriesClient.UpdateHead, s.waitForStageLoopStop, s.config.SyncLoopThrottle)
return nil
}
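
One detail in Start() worth noting: each receive loop gets the loop index as a goroutine parameter (go func(i int){...}(i)) rather than capturing i directly. On the Go versions current in 2022 the loop variable was shared across iterations, so direct capture could start every loop on the last sentry. A short illustration:

```go
// Why Start() shadows i per goroutine: before Go 1.22 the loop variable was
// shared, so direct capture races with the loop and usually sees final values.
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) { // parameter shadows the loop variable, as in Start()
			defer wg.Done()
			fmt.Println("receive loop for sentry", i)
		}(i)
	}
	wg.Wait()
}
```
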
@ -870,6 +870,6 @@ func (s *Ethereum) SentryCtx() context.Context {
return s.sentryCtx
}
func (s *Ethereum) SentryControlServer() *sentry.ControlServerImpl {
return s.sentryControlServer
func (s *Ethereum) SentryControlServer() *sentry.MultyClient {
return s.sentriesClient
}


@ -52,7 +52,7 @@ const (
// Service implements an Ethereum netstats reporting daemon that pushes local
// chain statistics up to a monitoring server.
type Service struct {
servers []*sentry.SentryServerImpl // Peer-to-peer server to retrieve networking infos
servers []*sentry.GrpcServer // Peer-to-peer server to retrieve networking infos
chaindb kv.RoDB
networkid uint64
engine consensus.Engine // Consensus engine to retrieve variadic block fields
@ -118,7 +118,7 @@ func (w *connWrapper) Close() error {
}
// New returns a monitoring service ready for stats reporting.
func New(node *node.Node, servers []*sentry.SentryServerImpl, chainDB kv.RoDB, engine consensus.Engine, url string, networkid uint64, quitCh <-chan struct{}, headCh chan *types.Block) error {
func New(node *node.Node, servers []*sentry.GrpcServer, chainDB kv.RoDB, engine consensus.Engine, url string, networkid uint64, quitCh <-chan struct{}, headCh chan *types.Block) error {
// Parse the netstats connection url
re := regexp.MustCompile("([^:@]*)(:([^@]*))?@(.+)")
parts := re.FindStringSubmatch(url)


@ -51,30 +51,30 @@ import (
type MockSentry struct {
proto_sentry.UnimplementedSentryServer
Ctx context.Context
Log log.Logger
t *testing.T
cancel context.CancelFunc
DB kv.RwDB
tmpdir string
snapshotDir string
Engine consensus.Engine
ChainConfig *params.ChainConfig
Sync *stagedsync.Sync
MiningSync *stagedsync.Sync
PendingBlocks chan *types.Block
MinedBlocks chan *types.Block
downloader *sentry.ControlServerImpl
Key *ecdsa.PrivateKey
Genesis *types.Block
SentryClient direct.SentryClient
PeerId *ptypes.H512
UpdateHead func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int)
streams map[proto_sentry.MessageId][]proto_sentry.Sentry_MessagesServer
sentMessages []*proto_sentry.OutboundMessageData
StreamWg sync.WaitGroup
ReceiveWg sync.WaitGroup
Address common.Address
Ctx context.Context
Log log.Logger
t *testing.T
cancel context.CancelFunc
DB kv.RwDB
tmpdir string
snapshotDir string
Engine consensus.Engine
ChainConfig *params.ChainConfig
Sync *stagedsync.Sync
MiningSync *stagedsync.Sync
PendingBlocks chan *types.Block
MinedBlocks chan *types.Block
sentriesClient *sentry.MultyClient
Key *ecdsa.PrivateKey
Genesis *types.Block
SentryClient direct.SentryClient
PeerId *ptypes.H512
UpdateHead func(Ctx context.Context, head uint64, hash common.Hash, td *uint256.Int)
streams map[proto_sentry.MessageId][]proto_sentry.Sentry_MessagesServer
sentMessages []*proto_sentry.OutboundMessageData
StreamWg sync.WaitGroup
ReceiveWg sync.WaitGroup
Address common.Address
Notifications *stagedsync.Notifications
@ -267,7 +267,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
blockDownloaderWindow := 65536
networkID := uint64(1)
mock.downloader, err = sentry.NewControlServer(mock.DB, "mock", mock.ChainConfig, mock.Genesis.Hash(), mock.Engine, networkID, sentries, blockDownloaderWindow, blockReader)
mock.sentriesClient, err = sentry.NewMultyClient(mock.DB, "mock", mock.ChainConfig, mock.Genesis.Hash(), mock.Engine, networkID, sentries, blockDownloaderWindow, blockReader)
if err != nil {
if t != nil {
t.Fatal(err)
@ -283,12 +283,12 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
mock.Sync = stagedsync.New(
stagedsync.DefaultStages(mock.Ctx, prune,
stagedsync.StageHeadersCfg(mock.DB, mock.downloader.Hd, mock.downloader.Bd, *mock.ChainConfig, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, allSnapshots, snapshotsDownloader, blockReader, mock.tmpdir, mock.Notifications.Events),
stagedsync.StageHeadersCfg(mock.DB, mock.sentriesClient.Hd, mock.sentriesClient.Bd, *mock.ChainConfig, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, allSnapshots, snapshotsDownloader, blockReader, mock.tmpdir, mock.Notifications.Events),
stagedsync.StageCumulativeIndexCfg(mock.DB),
stagedsync.StageBlockHashesCfg(mock.DB, mock.tmpdir, mock.ChainConfig),
stagedsync.StageBodiesCfg(
mock.DB,
mock.downloader.Bd,
mock.sentriesClient.Bd,
sendBodyRequest,
penalize,
blockPropagator,
@ -325,7 +325,7 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
stagedsync.DefaultPruneOrder,
)
mock.downloader.Hd.StartPoSDownloader(mock.Ctx, sendHeaderRequest, penalize)
mock.sentriesClient.Hd.StartPoSDownloader(mock.Ctx, sendHeaderRequest, penalize)
miningConfig := cfg.Miner
miningConfig.Enabled = true
@ -349,13 +349,13 @@ func MockWithEverything(t *testing.T, gspec *core.Genesis, key *ecdsa.PrivateKey
)
mock.StreamWg.Add(1)
go sentry.RecvMessageLoop(mock.Ctx, mock.SentryClient, mock.downloader, &mock.ReceiveWg)
go sentry.RecvMessageLoop(mock.Ctx, mock.SentryClient, mock.sentriesClient, &mock.ReceiveWg)
mock.StreamWg.Wait()
mock.StreamWg.Add(1)
go sentry.RecvUploadMessageLoop(mock.Ctx, mock.SentryClient, mock.downloader, &mock.ReceiveWg)
go sentry.RecvUploadMessageLoop(mock.Ctx, mock.SentryClient, mock.sentriesClient, &mock.ReceiveWg)
mock.StreamWg.Wait()
mock.StreamWg.Add(1)
go sentry.RecvUploadHeadersMessageLoop(mock.Ctx, mock.SentryClient, mock.downloader, &mock.ReceiveWg)
go sentry.RecvUploadHeadersMessageLoop(mock.Ctx, mock.SentryClient, mock.sentriesClient, &mock.ReceiveWg)
mock.StreamWg.Wait()
return mock
@ -488,20 +488,20 @@ func (ms *MockSentry) InsertChain(chain *core.ChainPack) error {
}); err != nil {
return err
}
if ms.downloader.Hd.IsBadHeader(chain.TopBlock.Hash()) {
if ms.sentriesClient.Hd.IsBadHeader(chain.TopBlock.Hash()) {
return fmt.Errorf("block %d %x was invalid", chain.TopBlock.NumberU64(), chain.TopBlock.Hash())
}
return nil
}
func (ms *MockSentry) SendPayloadRequest(message *engineapi.PayloadMessage) {
ms.downloader.Hd.BeaconRequestList.AddPayloadRequest(message)
ms.sentriesClient.Hd.BeaconRequestList.AddPayloadRequest(message)
}
func (ms *MockSentry) SendForkChoiceRequest(message *engineapi.ForkChoiceMessage) {
ms.downloader.Hd.BeaconRequestList.AddForkChoiceRequest(message)
ms.sentriesClient.Hd.BeaconRequestList.AddForkChoiceRequest(message)
}
func (ms *MockSentry) ReceivePayloadStatus() privateapi.PayloadStatus {
return <-ms.downloader.Hd.PayloadStatusCh
return <-ms.sentriesClient.Hd.PayloadStatusCh
}


@ -84,7 +84,7 @@ func StageLoop(
log.Error("Staged Sync", "err", err)
if recoveryErr := hd.RecoverFromDb(db); recoveryErr != nil {
log.Error("Failed to recover header downloader", "err", recoveryErr)
log.Error("Failed to recover header sentriesClient", "err", recoveryErr)
}
time.Sleep(500 * time.Millisecond) // just to avoid too much similar errors in logs
continue
@ -247,7 +247,7 @@ func NewStagedSync(
p2pCfg p2p.Config,
cfg ethconfig.Config,
terminalTotalDifficulty *big.Int,
controlServer *sentry.ControlServerImpl,
controlServer *sentry.MultyClient,
tmpdir string,
notifications *stagedsync.Notifications,
snapshotDownloader proto_downloader.DownloaderClient,