test success flow

This commit is contained in:
alex.sharov 2021-08-09 14:15:30 +07:00
parent fe9a2ef810
commit b15af6c774
2 changed files with 12 additions and 13 deletions

View File

@ -98,7 +98,16 @@ func (f *Fetch) ConnectSentries() {
}
}
func (f *Fetch) ConnectCore() {
go func() { f.stateChangesLoop(f.ctx, f.stateChangesClient) }()
go func() {
for {
select {
case <-f.ctx.Done():
return
default:
}
f.handleStateChanges(f.ctx, f.stateChangesClient)
}
}()
}
func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) {
@ -357,17 +366,7 @@ func (f *Fetch) handleNewPeer(req *sentry.PeersReply) error {
return nil
}
func (f *Fetch) stateChangesLoop(ctx context.Context, client remote.KVClient) {
for {
select {
case <-ctx.Done():
return
default:
}
f.stateChangesStream(ctx, client)
}
}
func (f *Fetch) stateChangesStream(ctx context.Context, client remote.KVClient) {
func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) {
streamCtx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := client.StateChanges(streamCtx, &remote.StateChangeRequest{WithStorage: false, WithTransactions: true}, grpc.WaitForReady(true))

View File

@ -158,7 +158,7 @@ func TestOnNewBlock(t *testing.T) {
}
pool := &PoolMock{}
fetch := NewFetch(ctx, nil, genesisHash, networkId, nil, pool, stateChanges)
fetch.stateChangesStream(ctx, stateChanges)
fetch.handleStateChanges(ctx, stateChanges)
assert.Equal(t, 1, len(pool.OnNewBlockCalls()))
assert.Equal(t, 3, len(pool.OnNewBlockCalls()[0].MinedTxs.txs))
}