mirror of
https://gitlab.com/pulsechaincom/erigon-pulse.git
synced 2025-01-16 15:48:46 +00:00
81ea5bab78
* Initial commit * Add sentry gRPC interface * p2psentry directory * Update README.md * Update README.md * Update README.md * Add go package * Correct syntax * add external downloader interface (#2) * Add txpool (#3) * Add private API (#4) * Invert control.proto, add PeerMinBlock, Separare incoming Tx message into a separate stream (#5) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Separate upload messages into its own stream (#6) Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local> * Only send changed accounts to listeners (#7) * Txpool interface doc (#9) * More additions * More additions * Fix locking * Intermediate * Fix separation of phases * Intermediate * Fix test * More transformations * New simplified way of downloading headers * Fix hard-coded header sync * Fixed syncing near the tip of the chain * Add architecture diagram source and picture (#10) * More fixes * rename tip to link * Use preverified hashes instead of preverified headers * Fix preverified hashes generation * more parametrisation * Continue parametrisation * Fix grpc data limit, interruption of headers stage * Add ropsten preverified hashes * Typed hashes (#11) * Typed hashes * Fix PeerId * 64-bit tx nonce * Disable penalties * Add goerli settings, bootstrap nodes * Try to fix goerly sync * Remove interfaces * Add proper golang packages, max_block into p2p sentry Status * Prepare for proto overhaul * Squashed 'interfaces/' content from commit ce36053c2 git-subtree-dir: interfaces git-subtree-split: ce36053c24db2f56e48ac752808de60afa1dfb4b * Change EtherReply to address * Adaptations to new types * Switch to new types * Fixes * Fix formatting * Fix lint * Lint fixes, reverse order in types * Fix lint * Fix lint * Fix lint * Fix test * Not supporting eth/66 yet * Fix shutdown * Fix lint * Fix lint * Fix lint * return stopped check Co-authored-by: Artem Vorotnikov <artem@vorotnikov.me> Co-authored-by: b00ris <b00ris@mail.ru> Co-authored-by: Alexey Sharp 
<alexeysharp@Alexeys-iMac.local> Co-authored-by: lightclient <14004106+lightclient@users.noreply.github.com> Co-authored-by: canepat <16927169+canepat@users.noreply.github.com>
115 lines
4.8 KiB
Go
115 lines
4.8 KiB
Go
package download
|
|
|
|
import (
	"context"
	"io"

	"github.com/golang/protobuf/ptypes/empty"
	proto_sentry "github.com/ledgerwatch/turbo-geth/gointerfaces/sentry"
	"github.com/ledgerwatch/turbo-geth/log"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/emptypb"
)
|
|
|
|
// Contains implementations of SentryServer, SentryClient, ControlClient, and ControlServer, that may be linked to each other
|
|
// SentryClient is linked directly to the SentryServer, for example, so any function call on the instance of the SentryClient
|
|
// causes an invocation directly on the corresponding instance of the SentryServer. However, the link between SentryClient and
|
|
// SentryServer is established outside of the constructor. This means that the reference from the SentryClient to the corresponding
|
|
// SentryServer can be injected at any point in time.
|
|
|
|
// SentryClientDirect implements SentryClient interface by connecting the instance of the client directly with the corresponding
|
|
// instance of SentryServer
|
|
type SentryClientDirect struct {
|
|
server proto_sentry.SentryServer
|
|
}
|
|
|
|
// SetServer injects a reference to the SentryServer into the client
|
|
func (scd *SentryClientDirect) SetServer(sentryServer proto_sentry.SentryServer) {
|
|
scd.server = sentryServer
|
|
}
|
|
|
|
func (scd *SentryClientDirect) PenalizePeer(ctx context.Context, in *proto_sentry.PenalizePeerRequest, opts ...grpc.CallOption) (*empty.Empty, error) {
|
|
return scd.server.PenalizePeer(ctx, in)
|
|
}
|
|
|
|
func (scd *SentryClientDirect) PeerMinBlock(ctx context.Context, in *proto_sentry.PeerMinBlockRequest, opts ...grpc.CallOption) (*empty.Empty, error) {
|
|
return scd.server.PeerMinBlock(ctx, in)
|
|
}
|
|
|
|
func (scd *SentryClientDirect) SendMessageByMinBlock(ctx context.Context, in *proto_sentry.SendMessageByMinBlockRequest, opts ...grpc.CallOption) (*proto_sentry.SentPeers, error) {
|
|
return scd.server.SendMessageByMinBlock(ctx, in)
|
|
}
|
|
|
|
func (scd *SentryClientDirect) SendMessageById(ctx context.Context, in *proto_sentry.SendMessageByIdRequest, opts ...grpc.CallOption) (*proto_sentry.SentPeers, error) {
|
|
return scd.server.SendMessageById(ctx, in)
|
|
}
|
|
|
|
func (scd *SentryClientDirect) SendMessageToRandomPeers(ctx context.Context, in *proto_sentry.SendMessageToRandomPeersRequest, opts ...grpc.CallOption) (*proto_sentry.SentPeers, error) {
|
|
return scd.server.SendMessageToRandomPeers(ctx, in)
|
|
}
|
|
|
|
func (scd *SentryClientDirect) SendMessageToAll(ctx context.Context, in *proto_sentry.OutboundMessageData, opts ...grpc.CallOption) (*proto_sentry.SentPeers, error) {
|
|
return scd.server.SendMessageToAll(ctx, in)
|
|
}
|
|
|
|
func (scd *SentryClientDirect) SetStatus(ctx context.Context, in *proto_sentry.StatusData, opts ...grpc.CallOption) (*emptypb.Empty, error) {
|
|
return scd.server.SetStatus(ctx, in)
|
|
}
|
|
|
|
// implements proto_sentry.Sentry_ReceiveMessagesServer
|
|
type SentryReceiveServerDirect struct {
|
|
messageCh chan *proto_sentry.InboundMessage
|
|
grpc.ServerStream
|
|
}
|
|
|
|
func (s *SentryReceiveServerDirect) Send(m *proto_sentry.InboundMessage) error {
|
|
s.messageCh <- m
|
|
return nil
|
|
}
|
|
|
|
type SentryReceiveClientDirect struct {
|
|
messageCh chan *proto_sentry.InboundMessage
|
|
grpc.ClientStream
|
|
}
|
|
|
|
func (c *SentryReceiveClientDirect) Recv() (*proto_sentry.InboundMessage, error) {
|
|
m := <-c.messageCh
|
|
return m, nil
|
|
}
|
|
|
|
func (scd *SentryClientDirect) ReceiveMessages(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (proto_sentry.Sentry_ReceiveMessagesClient, error) {
|
|
messageCh := make(chan *proto_sentry.InboundMessage, 16384)
|
|
streamServer := &SentryReceiveServerDirect{messageCh: messageCh}
|
|
go func() {
|
|
if err := scd.server.ReceiveMessages(&empty.Empty{}, streamServer); err != nil {
|
|
log.Error("ReceiveMessages returned", "error", err)
|
|
}
|
|
close(messageCh)
|
|
}()
|
|
return &SentryReceiveClientDirect{messageCh: messageCh}, nil
|
|
}
|
|
|
|
func (scd *SentryClientDirect) ReceiveUploadMessages(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (proto_sentry.Sentry_ReceiveUploadMessagesClient, error) {
|
|
messageCh := make(chan *proto_sentry.InboundMessage, 16384)
|
|
streamServer := &SentryReceiveServerDirect{messageCh: messageCh}
|
|
go func() {
|
|
if err := scd.server.ReceiveUploadMessages(&empty.Empty{}, streamServer); err != nil {
|
|
log.Error("ReceiveUploadMessages returned", "error", err)
|
|
}
|
|
close(messageCh)
|
|
}()
|
|
return &SentryReceiveClientDirect{messageCh: messageCh}, nil
|
|
}
|
|
|
|
func (scd *SentryClientDirect) ReceiveTxMessages(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (proto_sentry.Sentry_ReceiveTxMessagesClient, error) {
|
|
messageCh := make(chan *proto_sentry.InboundMessage, 16384)
|
|
streamServer := &SentryReceiveServerDirect{messageCh: messageCh}
|
|
go func() {
|
|
if err := scd.server.ReceiveTxMessages(&empty.Empty{}, streamServer); err != nil {
|
|
log.Error("ReceiveTxMessages returned", "error", err)
|
|
}
|
|
close(messageCh)
|
|
}()
|
|
return &SentryReceiveClientDirect{messageCh: messageCh}, nil
|
|
}
|