prysm-pulse/beacon-chain/sync/initial-sync/blocks_queue_test.go
Victor Farazdagi c2615168d9
Applies assertion funcs to sync tests (#6603)
* applies assertion funcs to sync/initial-sync tests
* applies assertion funcs to sync/initial-sync tests
* gazelle
* Merge branch 'master' into sync-apply-testutils-assertions
* gazelle
* applies assertion funcs to sync/initial-sync tests
* applies assertion funcs to sync/initial-sync tests
* applies assertion funcs to sync/initial-sync tests
* applies assertion funcs to sync/initial-sync tests
* applies assertion funcs to sync/initial-sync tests
* Merge branch 'master' into sync-apply-testutils-assertions
2020-07-15 04:41:11 +00:00

285 lines
7.8 KiB
Go

package initialsync
import (
"context"
"fmt"
"testing"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
)
func TestBlocksQueueInitStartStop(t *testing.T) {
blockBatchLimit := uint64(flags.Get().BlockBatchLimit)
mc, p2p, _ := initializeTestServices(t, []uint64{}, []*peerData{})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
p2p: p2p,
})
t.Run("stop without start", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
queue := newBlocksQueue(ctx, &blocksQueueConfig{
headFetcher: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.ErrorContains(t, errQueueTakesTooLongToStop.Error(), queue.stop())
})
t.Run("use default fetcher", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
queue := newBlocksQueue(ctx, &blocksQueueConfig{
headFetcher: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.NoError(t, queue.start())
})
t.Run("stop timeout", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
queue := newBlocksQueue(ctx, &blocksQueueConfig{
headFetcher: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.NoError(t, queue.start())
assert.ErrorContains(t, errQueueTakesTooLongToStop.Error(), queue.stop())
})
t.Run("check for leaked goroutines", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.NoError(t, queue.start())
// Blocks up until all resources are reclaimed (or timeout is called)
assert.NoError(t, queue.stop())
select {
case <-queue.fetchedBlocks:
default:
t.Error("queue.fetchedBlocks channel is leaked")
}
select {
case <-fetcher.fetchResponses:
default:
t.Error("fetcher.fetchResponses channel is leaked")
}
})
t.Run("re-starting of stopped queue", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.NoError(t, queue.start())
assert.NoError(t, queue.stop())
assert.ErrorContains(t, errQueueCtxIsDone.Error(), queue.start())
})
t.Run("multiple stopping attempts", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.NoError(t, queue.start())
assert.NoError(t, queue.stop())
assert.NoError(t, queue.stop())
})
t.Run("cancellation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
highestExpectedSlot: blockBatchLimit,
})
assert.NoError(t, queue.start())
cancel()
assert.NoError(t, queue.stop())
})
}
func TestBlocksQueueLoop(t *testing.T) {
tests := []struct {
name string
highestExpectedSlot uint64
expectedBlockSlots []uint64
peers []*peerData
}{
{
name: "Single peer with all blocks",
highestExpectedSlot: 251, // will be auto-fixed to 256 (to 8th epoch), by queue
expectedBlockSlots: makeSequence(1, 256),
peers: []*peerData{
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
},
},
},
{
name: "Multiple peers with all blocks",
highestExpectedSlot: 256,
expectedBlockSlots: makeSequence(1, 256),
peers: []*peerData{
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
},
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
},
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
},
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
},
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
},
},
},
{
name: "Multiple peers with skipped slots",
highestExpectedSlot: 576,
expectedBlockSlots: append(makeSequence(1, 64), makeSequence(500, 576)...), // up to 18th epoch
peers: []*peerData{
{
blocks: append(makeSequence(1, 64), makeSequence(500, 640)...),
finalizedEpoch: 18,
headSlot: 640,
},
{
blocks: append(makeSequence(1, 64), makeSequence(500, 640)...),
finalizedEpoch: 18,
headSlot: 640,
},
{
blocks: append(makeSequence(1, 64), makeSequence(500, 640)...),
finalizedEpoch: 18,
headSlot: 640,
},
},
},
{
name: "Multiple peers with failures",
highestExpectedSlot: 128,
expectedBlockSlots: makeSequence(1, 256),
peers: []*peerData{
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
failureSlots: makeSequence(32*3+1, 32*3+32),
},
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
failureSlots: makeSequence(1, 32*3),
},
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
},
{
blocks: makeSequence(1, 320),
finalizedEpoch: 8,
headSlot: 320,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mc, p2p, beaconDB := initializeTestServices(t, tt.expectedBlockSlots, tt.peers)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
headFetcher: mc,
p2p: p2p,
})
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
headFetcher: mc,
highestExpectedSlot: tt.highestExpectedSlot,
})
assert.NoError(t, queue.start())
processBlock := func(block *eth.SignedBeaconBlock) error {
if !beaconDB.HasBlock(ctx, bytesutil.ToBytes32(block.Block.ParentRoot)) {
return fmt.Errorf("beacon node doesn't have a block in db with root %#x", block.Block.ParentRoot)
}
root, err := stateutil.BlockRoot(block.Block)
if err != nil {
return err
}
if err := mc.ReceiveBlock(ctx, block, root); err != nil {
return err
}
return nil
}
var blocks []*eth.SignedBeaconBlock
for fetchedBlocks := range queue.fetchedBlocks {
for _, block := range fetchedBlocks {
if err := processBlock(block); err != nil {
continue
}
blocks = append(blocks, block)
}
}
assert.NoError(t, queue.stop())
if queue.headFetcher.HeadSlot() < tt.highestExpectedSlot {
t.Errorf("Not enough slots synced, want: %v, got: %v",
len(tt.expectedBlockSlots), queue.headFetcher.HeadSlot())
}
assert.Equal(t, len(tt.expectedBlockSlots), len(blocks), "Processes wrong number of blocks")
var receivedBlockSlots []uint64
for _, blk := range blocks {
receivedBlockSlots = append(receivedBlockSlots, blk.Block.Slot)
}
missing := sliceutil.NotUint64(sliceutil.IntersectionUint64(tt.expectedBlockSlots, receivedBlockSlots), tt.expectedBlockSlots)
if len(missing) > 0 {
t.Errorf("Missing blocks at slots %v", missing)
}
})
}
}