From 585a2ad0a68148af645c058c00512b36d904cb34 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 9 Aug 2021 11:00:36 +0700 Subject: [PATCH] mos --- gointerfaces/remote/mocks.go | 205 +++++++++++++++++++++++++++++++++++ gointerfaces/test_util.go | 1 + txpool/fetch.go | 33 +++--- txpool/fetch_test.go | 5 +- 4 files changed, 228 insertions(+), 16 deletions(-) create mode 100644 gointerfaces/remote/mocks.go diff --git a/gointerfaces/remote/mocks.go b/gointerfaces/remote/mocks.go new file mode 100644 index 000000000..17890ba3d --- /dev/null +++ b/gointerfaces/remote/mocks.go @@ -0,0 +1,205 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package remote + +import ( + context "context" + types "github.com/ledgerwatch/erigon-lib/gointerfaces/types" + grpc "google.golang.org/grpc" + emptypb "google.golang.org/protobuf/types/known/emptypb" + sync "sync" +) + +// Ensure, that KVClientMock does implement KVClient. +// If this is not the case, regenerate this file with moq. +var _ KVClient = &KVClientMock{} + +// KVClientMock is a mock implementation of KVClient. +// +// func TestSomethingThatUsesKVClient(t *testing.T) { +// +// // make and configure a mocked KVClient +// mockedKVClient := &KVClientMock{ +// StateChangesFunc: func(ctx context.Context, in *StateChangeRequest, opts ...grpc.CallOption) (KV_StateChangesClient, error) { +// panic("mock out the StateChanges method") +// }, +// TxFunc: func(ctx context.Context, opts ...grpc.CallOption) (KV_TxClient, error) { +// panic("mock out the Tx method") +// }, +// VersionFunc: func(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*types.VersionReply, error) { +// panic("mock out the Version method") +// }, +// } +// +// // use mockedKVClient in code that requires KVClient +// // and then make assertions. +// +// } +type KVClientMock struct { + // StateChangesFunc mocks the StateChanges method. + StateChangesFunc func(ctx context.Context, in *StateChangeRequest, opts ...grpc.CallOption) (KV_StateChangesClient, error) + + // TxFunc mocks the Tx method. + TxFunc func(ctx context.Context, opts ...grpc.CallOption) (KV_TxClient, error) + + // VersionFunc mocks the Version method. + VersionFunc func(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*types.VersionReply, error) + + // calls tracks calls to the methods. + calls struct { + // StateChanges holds details about calls to the StateChanges method. + StateChanges []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // In is the in argument value. + In *StateChangeRequest + // Opts is the opts argument value. + Opts []grpc.CallOption + } + // Tx holds details about calls to the Tx method. + Tx []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Opts is the opts argument value. + Opts []grpc.CallOption + } + // Version holds details about calls to the Version method. + Version []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // In is the in argument value. + In *emptypb.Empty + // Opts is the opts argument value. + Opts []grpc.CallOption + } + } + lockStateChanges sync.RWMutex + lockTx sync.RWMutex + lockVersion sync.RWMutex +} + +// StateChanges calls StateChangesFunc. +func (mock *KVClientMock) StateChanges(ctx context.Context, in *StateChangeRequest, opts ...grpc.CallOption) (KV_StateChangesClient, error) { + callInfo := struct { + Ctx context.Context + In *StateChangeRequest + Opts []grpc.CallOption + }{ + Ctx: ctx, + In: in, + Opts: opts, + } + mock.lockStateChanges.Lock() + mock.calls.StateChanges = append(mock.calls.StateChanges, callInfo) + mock.lockStateChanges.Unlock() + if mock.StateChangesFunc == nil { + var ( + kV_StateChangesClientOut KV_StateChangesClient + errOut error + ) + return kV_StateChangesClientOut, errOut + } + return mock.StateChangesFunc(ctx, in, opts...) +} + +// StateChangesCalls gets all the calls that were made to StateChanges. +// Check the length with: +// len(mockedKVClient.StateChangesCalls()) +func (mock *KVClientMock) StateChangesCalls() []struct { + Ctx context.Context + In *StateChangeRequest + Opts []grpc.CallOption +} { + var calls []struct { + Ctx context.Context + In *StateChangeRequest + Opts []grpc.CallOption + } + mock.lockStateChanges.RLock() + calls = mock.calls.StateChanges + mock.lockStateChanges.RUnlock() + return calls +} + +// Tx calls TxFunc. +func (mock *KVClientMock) Tx(ctx context.Context, opts ...grpc.CallOption) (KV_TxClient, error) { + callInfo := struct { + Ctx context.Context + Opts []grpc.CallOption + }{ + Ctx: ctx, + Opts: opts, + } + mock.lockTx.Lock() + mock.calls.Tx = append(mock.calls.Tx, callInfo) + mock.lockTx.Unlock() + if mock.TxFunc == nil { + var ( + kV_TxClientOut KV_TxClient + errOut error + ) + return kV_TxClientOut, errOut + } + return mock.TxFunc(ctx, opts...) +} + +// TxCalls gets all the calls that were made to Tx. +// Check the length with: +// len(mockedKVClient.TxCalls()) +func (mock *KVClientMock) TxCalls() []struct { + Ctx context.Context + Opts []grpc.CallOption +} { + var calls []struct { + Ctx context.Context + Opts []grpc.CallOption + } + mock.lockTx.RLock() + calls = mock.calls.Tx + mock.lockTx.RUnlock() + return calls +} + +// Version calls VersionFunc. +func (mock *KVClientMock) Version(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*types.VersionReply, error) { + callInfo := struct { + Ctx context.Context + In *emptypb.Empty + Opts []grpc.CallOption + }{ + Ctx: ctx, + In: in, + Opts: opts, + } + mock.lockVersion.Lock() + mock.calls.Version = append(mock.calls.Version, callInfo) + mock.lockVersion.Unlock() + if mock.VersionFunc == nil { + var ( + versionReplyOut *types.VersionReply + errOut error + ) + return versionReplyOut, errOut + } + return mock.VersionFunc(ctx, in, opts...) +} + +// VersionCalls gets all the calls that were made to Version. +// Check the length with: +// len(mockedKVClient.VersionCalls()) +func (mock *KVClientMock) VersionCalls() []struct { + Ctx context.Context + In *emptypb.Empty + Opts []grpc.CallOption +} { + var calls []struct { + Ctx context.Context + In *emptypb.Empty + Opts []grpc.CallOption + } + mock.lockVersion.RLock() + calls = mock.calls.Version + mock.lockVersion.RUnlock() + return calls +} diff --git a/gointerfaces/test_util.go b/gointerfaces/test_util.go index 500328691..44da34ad5 100644 --- a/gointerfaces/test_util.go +++ b/gointerfaces/test_util.go @@ -1,3 +1,4 @@ package gointerfaces //go:generate moq -stub -out ./sentry/mocks.go ./sentry SentryServer SentryClient +//go:generate moq -stub -out ./remote/mocks.go ./remote KVClient diff --git a/txpool/fetch.go b/txpool/fetch.go index 2bca9a6a1..0cb4cfe66 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -39,12 +39,13 @@ import ( // genesis hash and list of forks, but with zero max block and total difficulty // Sentry should have a logic not to overwrite statusData with messages from tx pool type Fetch struct { - ctx context.Context // Context used for cancellation and closing of the fetcher - sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network - statusData *sentry.StatusData // Status data used for "handshaking" with sentries - pool Pool // Transaction pool implementation - wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests) - logger log.Logger + ctx context.Context // Context used for cancellation and closing of the fetcher + sentryClients []sentry.SentryClient // sentry clients that will be used for accessing the network + statusData *sentry.StatusData // Status data used for "handshaking" with sentries + pool Pool // Transaction pool implementation + wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests) + stateChangesClient remote.KVClient + logger log.Logger } type Timings struct { @@ -68,6 +69,7 @@ func NewFetch(ctx context.Context, networkId uint64, forks []uint64, pool Pool, + stateChangesClient remote.KVClient, logger log.Logger, ) *Fetch { statusData := &sentry.StatusData{ @@ -81,11 +83,12 @@ func NewFetch(ctx context.Context, }, } return &Fetch{ - ctx: ctx, - sentryClients: sentryClients, - statusData: statusData, - pool: pool, - logger: logger, + ctx: ctx, + sentryClients: sentryClients, + statusData: statusData, + pool: pool, + logger: logger, + stateChangesClient: stateChangesClient, } } @@ -93,8 +96,8 @@ func (f *Fetch) SetWaitGroup(wg *sync.WaitGroup) { f.wg = wg } -// Start initialises connection to the sentry -func (f *Fetch) Start() { +// ConnectSentries initialises connection to the sentry +func (f *Fetch) ConnectSentries() { for i := range f.sentryClients { go func(i int) { f.receiveMessageLoop(f.sentryClients[i]) @@ -103,7 +106,9 @@ func (f *Fetch) Start() { f.receivePeerLoop(f.sentryClients[i]) }(i) } - //go func() { f.stateChangesLoop(f.ctx, nil) }() +} +func (f *Fetch) ConnectCore() { + go func() { f.stateChangesLoop(f.ctx, f.stateChangesClient) }() } func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) { diff --git a/txpool/fetch_test.go b/txpool/fetch_test.go index 502cb28c9..88ef1ae1e 100644 --- a/txpool/fetch_test.go +++ b/txpool/fetch_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/ledgerwatch/erigon-lib/direct" + "github.com/ledgerwatch/erigon-lib/gointerfaces/remote" "github.com/ledgerwatch/erigon-lib/gointerfaces/sentry" "github.com/ledgerwatch/erigon-lib/gointerfaces/types" "github.com/ledgerwatch/log/v3" @@ -43,11 +44,11 @@ func TestFetch(t *testing.T) { sentryClient := direct.NewSentryClientDirect(direct.ETH66, m) pool := &PoolMock{} - fetch := NewFetch(ctx, []sentry.SentryClient{sentryClient}, genesisHash, networkId, forks, pool, logger) + fetch := NewFetch(ctx, []sentry.SentryClient{sentryClient}, genesisHash, networkId, forks, pool, &remote.KVClientMock{}, logger) var wg sync.WaitGroup fetch.SetWaitGroup(&wg) m.StreamWg.Add(2) - fetch.Start() + fetch.ConnectSentries() m.StreamWg.Wait() // Send one transaction id wg.Add(1)