This commit is contained in:
alex.sharov 2021-08-09 11:00:36 +07:00
parent b9b5a7eea3
commit 585a2ad0a6
4 changed files with 228 additions and 16 deletions

View File

@ -0,0 +1,205 @@
// Code generated by moq; DO NOT EDIT.
// github.com/matryer/moq
package remote
import (
context "context"
types "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
grpc "google.golang.org/grpc"
emptypb "google.golang.org/protobuf/types/known/emptypb"
sync "sync"
)
// Ensure, that KVClientMock does implement KVClient.
// If this is not the case, regenerate this file with moq.
var _ KVClient = &KVClientMock{}
// KVClientMock is a mock implementation of KVClient.
//
// func TestSomethingThatUsesKVClient(t *testing.T) {
//
// // make and configure a mocked KVClient
// mockedKVClient := &KVClientMock{
// StateChangesFunc: func(ctx context.Context, in *StateChangeRequest, opts ...grpc.CallOption) (KV_StateChangesClient, error) {
// panic("mock out the StateChanges method")
// },
// TxFunc: func(ctx context.Context, opts ...grpc.CallOption) (KV_TxClient, error) {
// panic("mock out the Tx method")
// },
// VersionFunc: func(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*types.VersionReply, error) {
// panic("mock out the Version method")
// },
// }
//
// // use mockedKVClient in code that requires KVClient
// // and then make assertions.
//
// }
type KVClientMock struct {
// StateChangesFunc mocks the StateChanges method.
StateChangesFunc func(ctx context.Context, in *StateChangeRequest, opts ...grpc.CallOption) (KV_StateChangesClient, error)
// TxFunc mocks the Tx method.
TxFunc func(ctx context.Context, opts ...grpc.CallOption) (KV_TxClient, error)
// VersionFunc mocks the Version method.
VersionFunc func(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*types.VersionReply, error)
// calls tracks calls to the methods.
calls struct {
// StateChanges holds details about calls to the StateChanges method.
StateChanges []struct {
// Ctx is the ctx argument value.
Ctx context.Context
// In is the in argument value.
In *StateChangeRequest
// Opts is the opts argument value.
Opts []grpc.CallOption
}
// Tx holds details about calls to the Tx method.
Tx []struct {
// Ctx is the ctx argument value.
Ctx context.Context
// Opts is the opts argument value.
Opts []grpc.CallOption
}
// Version holds details about calls to the Version method.
Version []struct {
// Ctx is the ctx argument value.
Ctx context.Context
// In is the in argument value.
In *emptypb.Empty
// Opts is the opts argument value.
Opts []grpc.CallOption
}
}
lockStateChanges sync.RWMutex
lockTx sync.RWMutex
lockVersion sync.RWMutex
}
// StateChanges calls StateChangesFunc.
func (mock *KVClientMock) StateChanges(ctx context.Context, in *StateChangeRequest, opts ...grpc.CallOption) (KV_StateChangesClient, error) {
callInfo := struct {
Ctx context.Context
In *StateChangeRequest
Opts []grpc.CallOption
}{
Ctx: ctx,
In: in,
Opts: opts,
}
mock.lockStateChanges.Lock()
mock.calls.StateChanges = append(mock.calls.StateChanges, callInfo)
mock.lockStateChanges.Unlock()
if mock.StateChangesFunc == nil {
var (
kV_StateChangesClientOut KV_StateChangesClient
errOut error
)
return kV_StateChangesClientOut, errOut
}
return mock.StateChangesFunc(ctx, in, opts...)
}
// StateChangesCalls gets all the calls that were made to StateChanges.
// Check the length with:
// len(mockedKVClient.StateChangesCalls())
func (mock *KVClientMock) StateChangesCalls() []struct {
Ctx context.Context
In *StateChangeRequest
Opts []grpc.CallOption
} {
var calls []struct {
Ctx context.Context
In *StateChangeRequest
Opts []grpc.CallOption
}
mock.lockStateChanges.RLock()
calls = mock.calls.StateChanges
mock.lockStateChanges.RUnlock()
return calls
}
// Tx calls TxFunc.
func (mock *KVClientMock) Tx(ctx context.Context, opts ...grpc.CallOption) (KV_TxClient, error) {
callInfo := struct {
Ctx context.Context
Opts []grpc.CallOption
}{
Ctx: ctx,
Opts: opts,
}
mock.lockTx.Lock()
mock.calls.Tx = append(mock.calls.Tx, callInfo)
mock.lockTx.Unlock()
if mock.TxFunc == nil {
var (
kV_TxClientOut KV_TxClient
errOut error
)
return kV_TxClientOut, errOut
}
return mock.TxFunc(ctx, opts...)
}
// TxCalls gets all the calls that were made to Tx.
// Check the length with:
// len(mockedKVClient.TxCalls())
func (mock *KVClientMock) TxCalls() []struct {
Ctx context.Context
Opts []grpc.CallOption
} {
var calls []struct {
Ctx context.Context
Opts []grpc.CallOption
}
mock.lockTx.RLock()
calls = mock.calls.Tx
mock.lockTx.RUnlock()
return calls
}
// Version calls VersionFunc.
func (mock *KVClientMock) Version(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*types.VersionReply, error) {
callInfo := struct {
Ctx context.Context
In *emptypb.Empty
Opts []grpc.CallOption
}{
Ctx: ctx,
In: in,
Opts: opts,
}
mock.lockVersion.Lock()
mock.calls.Version = append(mock.calls.Version, callInfo)
mock.lockVersion.Unlock()
if mock.VersionFunc == nil {
var (
versionReplyOut *types.VersionReply
errOut error
)
return versionReplyOut, errOut
}
return mock.VersionFunc(ctx, in, opts...)
}
// VersionCalls gets all the calls that were made to Version.
// Check the length with:
// len(mockedKVClient.VersionCalls())
func (mock *KVClientMock) VersionCalls() []struct {
Ctx context.Context
In *emptypb.Empty
Opts []grpc.CallOption
} {
var calls []struct {
Ctx context.Context
In *emptypb.Empty
Opts []grpc.CallOption
}
mock.lockVersion.RLock()
calls = mock.calls.Version
mock.lockVersion.RUnlock()
return calls
}

View File

@ -1,3 +1,4 @@
package gointerfaces
//go:generate moq -stub -out ./sentry/mocks.go ./sentry SentryServer SentryClient
//go:generate moq -stub -out ./remote/mocks.go ./remote KVClient

View File

@ -39,12 +39,13 @@ import (
// genesis hash and list of forks, but with zero max block and total difficulty
// Sentry should have a logic not to overwrite statusData with messages from tx pool
type Fetch struct {
ctx context.Context // Context used for cancellation and closing of the fetcher
sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network
statusData *sentry.StatusData // Status data used for "handshaking" with sentries
pool Pool // Transaction pool implementation
wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests)
logger log.Logger
ctx context.Context // Context used for cancellation and closing of the fetcher
sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network
statusData *sentry.StatusData // Status data used for "handshaking" with sentries
pool Pool // Transaction pool implementation
wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests)
stateChangesClient remote.KVClient
logger log.Logger
}
type Timings struct {
@ -68,6 +69,7 @@ func NewFetch(ctx context.Context,
networkId uint64,
forks []uint64,
pool Pool,
stateChangesClient remote.KVClient,
logger log.Logger,
) *Fetch {
statusData := &sentry.StatusData{
@ -81,11 +83,12 @@ func NewFetch(ctx context.Context,
},
}
return &Fetch{
ctx: ctx,
sentryClients: sentryClients,
statusData: statusData,
pool: pool,
logger: logger,
ctx: ctx,
sentryClients: sentryClients,
statusData: statusData,
pool: pool,
logger: logger,
stateChangesClient: stateChangesClient,
}
}
@ -93,8 +96,8 @@ func (f *Fetch) SetWaitGroup(wg *sync.WaitGroup) {
f.wg = wg
}
// Start initialises connection to the sentry
func (f *Fetch) Start() {
// ConnectSentries initialises connection to the sentry
func (f *Fetch) ConnectSentries() {
for i := range f.sentryClients {
go func(i int) {
f.receiveMessageLoop(f.sentryClients[i])
@ -103,7 +106,9 @@ func (f *Fetch) Start() {
f.receivePeerLoop(f.sentryClients[i])
}(i)
}
//go func() { f.stateChangesLoop(f.ctx, nil) }()
}
func (f *Fetch) ConnectCore() {
go func() { f.stateChangesLoop(f.ctx, f.stateChangesClient) }()
}
func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {

View File

@ -23,6 +23,7 @@ import (
"testing"
"github.com/ledgerwatch/erigon-lib/direct"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/sentry"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/log/v3"
@ -43,11 +44,11 @@ func TestFetch(t *testing.T) {
sentryClient := direct.NewSentryClientDirect(direct.ETH66, m)
pool := &PoolMock{}
fetch := NewFetch(ctx, []sentry.SentryClient{sentryClient}, genesisHash, networkId, forks, pool, logger)
fetch := NewFetch(ctx, []sentry.SentryClient{sentryClient}, genesisHash, networkId, forks, pool, &remote.KVClientMock{}, logger)
var wg sync.WaitGroup
fetch.SetWaitGroup(&wg)
m.StreamWg.Add(2)
fetch.Start()
fetch.ConnectSentries()
m.StreamWg.Wait()
// Send one transaction id
wg.Add(1)