prysm-pulse/beacon-chain/sync/broadcast_bls_changes_test.go
Delweng 71b08a50b7
beacon-chain/sync: fix some datarace in go test (#13039)
* beacon-chain/sync: adjust dataflow incase of datarace

Signed-off-by: jsvisa <delweng@gmail.com>

* beacon-chain/p2p: use atomic..Bool instead of bool

Signed-off-by: jsvisa <delweng@gmail.com>

* beacon-chain/sync: use channel to control concurrent

Signed-off-by: jsvisa <delweng@gmail.com>

* Revert "beacon-chain/sync: use channel to control concurrent"

This reverts commit 500d5b1ecdeef175e3a5970867d8176abd0763d7.

Signed-off-by: jsvisa <delweng@gmail.com>

* Revert "beacon-chain/sync: adjust dataflow incase of datarace"

This reverts commit 0c819e96cafb89ccba314ca98a8a0fdfcbb70b8c.

Signed-off-by: jsvisa <delweng@gmail.com>

* beacon-chain/sync: waitForChain is invoked inside registerHandlers

Signed-off-by: jsvisa <delweng@gmail.com>

* beacon-chain/sync: no need to registerHandler for waitForChainStart testcase

Signed-off-by: jsvisa <delweng@gmail.com>

* beacon-chain/sync: set p2p.Digest before goroutine

Signed-off-by: jsvisa <delweng@gmail.com>

---------

Signed-off-by: jsvisa <delweng@gmail.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2023-10-17 17:30:10 +00:00

159 lines
5.6 KiB
Go

package sync
import (
"context"
"testing"
"time"
mockChain "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing"
testingdb "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
doublylinkedtree "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/doubly-linked-tree"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/blstoexec"
mockp2p "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/prysmaticlabs/prysm/v4/testing/util"
"github.com/prysmaticlabs/prysm/v4/time/slots"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestBroadcastBLSChanges(t *testing.T) {
params.SetupTestConfigCleanup(t)
c := params.BeaconConfig()
c.CapellaForkEpoch = c.BellatrixForkEpoch.Add(2)
params.OverrideBeaconConfig(c)
chainService := &mockChain.ChainService{
Genesis: time.Now(),
ValidatorsRoot: [32]byte{'A'},
}
s := NewService(context.Background(),
WithP2P(mockp2p.NewTestP2P(t)),
WithInitialSync(&mockSync.Sync{IsSyncing: false}),
WithChainService(chainService),
WithOperationNotifier(chainService.OperationNotifier()),
WithBlsToExecPool(blstoexec.NewPool()),
)
var emptySig [96]byte
s.cfg.blsToExecPool.InsertBLSToExecChange(&ethpb.SignedBLSToExecutionChange{
Message: &ethpb.BLSToExecutionChange{
ValidatorIndex: 10,
FromBlsPubkey: make([]byte, 48),
ToExecutionAddress: make([]byte, 20),
},
Signature: emptySig[:],
})
capellaStart, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch)
require.NoError(t, err)
s.broadcastBLSChanges(capellaStart + 1)
}
func TestRateBLSChanges(t *testing.T) {
logHook := logTest.NewGlobal()
params.SetupTestConfigCleanup(t)
c := params.BeaconConfig()
c.CapellaForkEpoch = c.BellatrixForkEpoch.Add(2)
params.OverrideBeaconConfig(c)
chainService := &mockChain.ChainService{
Genesis: time.Now(),
ValidatorsRoot: [32]byte{'A'},
}
p1 := mockp2p.NewTestP2P(t)
s := NewService(context.Background(),
WithP2P(p1),
WithInitialSync(&mockSync.Sync{IsSyncing: false}),
WithChainService(chainService),
WithOperationNotifier(chainService.OperationNotifier()),
WithBlsToExecPool(blstoexec.NewPool()),
)
beaconDB := testingdb.SetupDB(t)
s.cfg.stateGen = stategen.New(beaconDB, doublylinkedtree.New())
s.cfg.beaconDB = beaconDB
s.initCaches()
st, keys := util.DeterministicGenesisStateCapella(t, 256)
s.cfg.chain = &mockChain.ChainService{
ValidatorsRoot: [32]byte{'A'},
Genesis: time.Now().Add(-time.Second * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Duration(10)),
State: st,
}
for i := 0; i < 200; i++ {
message := &ethpb.BLSToExecutionChange{
ValidatorIndex: primitives.ValidatorIndex(i),
FromBlsPubkey: keys[i+1].PublicKey().Marshal(),
ToExecutionAddress: bytesutil.PadTo([]byte("address"), 20),
}
epoch := params.BeaconConfig().CapellaForkEpoch + 1
domain, err := signing.Domain(st.Fork(), epoch, params.BeaconConfig().DomainBLSToExecutionChange, st.GenesisValidatorsRoot())
assert.NoError(t, err)
htr, err := signing.SigningData(message.HashTreeRoot, domain)
assert.NoError(t, err)
signed := &ethpb.SignedBLSToExecutionChange{
Message: message,
Signature: keys[i+1].Sign(htr[:]).Marshal(),
}
s.cfg.blsToExecPool.InsertBLSToExecChange(signed)
}
require.Equal(t, false, p1.BroadcastCalled.Load())
slot, err := slots.EpochStart(params.BeaconConfig().CapellaForkEpoch)
require.NoError(t, err)
s.broadcastBLSChanges(slot)
time.Sleep(100 * time.Millisecond) // Need a sleep for the go routine to be ready
require.Equal(t, true, p1.BroadcastCalled.Load())
require.LogsDoNotContain(t, logHook, "could not")
p1.BroadcastCalled.Store(false)
time.Sleep(500 * time.Millisecond) // Need a sleep for the second batch to be broadcast
require.Equal(t, true, p1.BroadcastCalled.Load())
require.LogsDoNotContain(t, logHook, "could not")
}
func TestBroadcastBLSBatch_changes_slice(t *testing.T) {
message := &ethpb.BLSToExecutionChange{
FromBlsPubkey: make([]byte, 48),
ToExecutionAddress: make([]byte, 20),
}
signed := &ethpb.SignedBLSToExecutionChange{
Message: message,
Signature: make([]byte, 96),
}
changes := make([]*ethpb.SignedBLSToExecutionChange, 200)
for i := 0; i < len(changes); i++ {
changes[i] = signed
}
p1 := mockp2p.NewTestP2P(t)
chainService := &mockChain.ChainService{
Genesis: time.Now(),
ValidatorsRoot: [32]byte{'A'},
}
s := NewService(context.Background(),
WithP2P(p1),
WithInitialSync(&mockSync.Sync{IsSyncing: false}),
WithChainService(chainService),
WithOperationNotifier(chainService.OperationNotifier()),
WithBlsToExecPool(blstoexec.NewPool()),
)
beaconDB := testingdb.SetupDB(t)
s.cfg.stateGen = stategen.New(beaconDB, doublylinkedtree.New())
s.cfg.beaconDB = beaconDB
s.initCaches()
st, _ := util.DeterministicGenesisStateCapella(t, 32)
s.cfg.chain = &mockChain.ChainService{
ValidatorsRoot: [32]byte{'A'},
Genesis: time.Now().Add(-time.Second * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Duration(10)),
State: st,
}
s.broadcastBLSBatch(s.ctx, &changes)
require.Equal(t, 200-128, len(changes))
}