diff --git a/txpool/fetch.go b/txpool/fetch.go index 33553d7e5..540deff9e 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -98,7 +98,16 @@ func (f *Fetch) ConnectSentries() { } } func (f *Fetch) ConnectCore() { - go func() { f.stateChangesLoop(f.ctx, f.stateChangesClient) }() + go func() { + for { + select { + case <-f.ctx.Done(): + return + default: + } + f.handleStateChanges(f.ctx, f.stateChangesClient) + } + }() } func (f *Fetch) receiveMessageLoop(sentryClient sentry.SentryClient) { @@ -357,17 +366,7 @@ func (f *Fetch) handleNewPeer(req *sentry.PeersReply) error { return nil } -func (f *Fetch) stateChangesLoop(ctx context.Context, client remote.KVClient) { - for { - select { - case <-ctx.Done(): - return - default: - } - f.stateChangesStream(ctx, client) - } -} -func (f *Fetch) stateChangesStream(ctx context.Context, client remote.KVClient) { +func (f *Fetch) handleStateChanges(ctx context.Context, client remote.KVClient) { streamCtx, cancel := context.WithCancel(ctx) defer cancel() stream, err := client.StateChanges(streamCtx, &remote.StateChangeRequest{WithStorage: false, WithTransactions: true}, grpc.WaitForReady(true)) diff --git a/txpool/fetch_test.go b/txpool/fetch_test.go index 81202675e..651c09f56 100644 --- a/txpool/fetch_test.go +++ b/txpool/fetch_test.go @@ -158,7 +158,7 @@ func TestOnNewBlock(t *testing.T) { } pool := &PoolMock{} fetch := NewFetch(ctx, nil, genesisHash, networkId, nil, pool, stateChanges) - fetch.stateChangesStream(ctx, stateChanges) + fetch.handleStateChanges(ctx, stateChanges) assert.Equal(t, 1, len(pool.OnNewBlockCalls())) assert.Equal(t, 3, len(pool.OnNewBlockCalls()[0].MinedTxs.txs)) }