More test

This commit is contained in:
Alexey Sharp 2021-07-14 17:14:36 +01:00
parent e058e2c0b8
commit d685fd2715
3 changed files with 20 additions and 8 deletions

View File

@ -19,6 +19,7 @@ package txpool
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"io" "io"
"log" "log"
"sync" "sync"
@ -168,6 +169,7 @@ func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {
} }
func (f *Fetch) handleInboundMessage(req *sentry.InboundMessage, sentryClient sentry.SentryClient) error { func (f *Fetch) handleInboundMessage(req *sentry.InboundMessage, sentryClient sentry.SentryClient) error {
fmt.Printf("got inbound message\n")
return nil return nil
} }

View File

@ -18,6 +18,7 @@ package txpool
import ( import (
"context" "context"
"sync"
"testing" "testing"
"github.com/ledgerwatch/erigon-lib/direct" "github.com/ledgerwatch/erigon-lib/direct"
@ -34,5 +35,16 @@ func TestFetch(t *testing.T) {
sentryClient := direct.NewSentryClientDirect(direct.ETH66, mock) sentryClient := direct.NewSentryClientDirect(direct.ETH66, mock)
fetch := NewFetch(ctx, []sentry.SentryClient{sentryClient}, genesisHash, networkId, forks) fetch := NewFetch(ctx, []sentry.SentryClient{sentryClient}, genesisHash, networkId, forks)
var wg sync.WaitGroup
fetch.SetWaitGroup(&wg)
fetch.Start() fetch.Start()
// Send one transaction id
wg.Add(1)
errs := mock.Send(&sentry.InboundMessage{Id: sentry.MessageId_NEW_POOLED_TRANSACTION_HASHES_66, Data: nil, PeerId: PeerId})
for i, err := range errs {
if err != nil {
t.Errorf("sending new pool tx hashes 66 (%d): %v", i, err)
}
}
wg.Wait()
} }

View File

@ -18,8 +18,8 @@ package txpool
import ( import (
"context" "context"
"sync"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
) )
@ -27,19 +27,19 @@ import (
type MockSentry struct { type MockSentry struct {
sentry.UnimplementedSentryServer sentry.UnimplementedSentryServer
streams map[sentry.MessageId][]sentry.Sentry_MessagesServer streams map[sentry.MessageId][]sentry.Sentry_MessagesServer
StreamWg sync.WaitGroup peersStreams []sentry.Sentry_PeersServer
peersStream sentry.Sentry_PeersServer
sentMessages []*sentry.OutboundMessageData sentMessages []*sentry.OutboundMessageData
ctx context.Context ctx context.Context
} }
func NewMockSentry(ctx context.Context) *MockSentry { func NewMockSentry(ctx context.Context) *MockSentry {
return &MockSentry{} return &MockSentry{ctx: ctx}
} }
var PeerId = gointerfaces.ConvertBytesToH512([]byte("12345"))
// Stream returns stream, waiting if necessary // Stream returns stream, waiting if necessary
func (ms *MockSentry) Send(req *sentry.InboundMessage) (errs []error) { func (ms *MockSentry) Send(req *sentry.InboundMessage) (errs []error) {
ms.StreamWg.Wait()
for _, stream := range ms.streams[req.Id] { for _, stream := range ms.streams[req.Id] {
if err := stream.Send(req); err != nil { if err := stream.Send(req); err != nil {
errs = append(errs, err) errs = append(errs, err)
@ -84,7 +84,6 @@ func (ms *MockSentry) Messages(req *sentry.MessagesRequest, stream sentry.Sentry
for _, id := range req.Ids { for _, id := range req.Ids {
ms.streams[id] = append(ms.streams[id], stream) ms.streams[id] = append(ms.streams[id], stream)
} }
ms.StreamWg.Done()
select { select {
case <-ms.ctx.Done(): case <-ms.ctx.Done():
return nil return nil
@ -97,8 +96,7 @@ func (ms *MockSentry) PeerCount(_ context.Context, req *sentry.PeerCountRequest)
} }
func (ms *MockSentry) Peers(req *sentry.PeersRequest, stream sentry.Sentry_PeersServer) error { func (ms *MockSentry) Peers(req *sentry.PeersRequest, stream sentry.Sentry_PeersServer) error {
ms.peersStream = stream ms.peersStreams = append(ms.peersStreams, stream)
ms.StreamWg.Done()
select { select {
case <-ms.ctx.Done(): case <-ms.ctx.Done():
return nil return nil