beacon-node/rpc: fix go test datarace (#13018)

* beacon-chain/p2p: ust atomic.Bool instead of bool

Signed-off-by: jsvisa <delweng@gmail.com>

* beacon-chain/p2p,rpc: read mock.BroadcastMessages with lock

Signed-off-by: jsvisa <delweng@gmail.com>

* beacon-chain/p2p,rpc: read attestation with lock

Signed-off-by: jsvisa <delweng@gmail.com>

* beacon-chain/rpc: fix typo

Signed-off-by: jsvisa <delweng@gmail.com>

* beacon-chain/p2p: typo

Signed-off-by: jsvisa <delweng@gmail.com>

---------

Signed-off-by: jsvisa <delweng@gmail.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
This commit is contained in:
Delweng 2023-10-22 23:12:55 +08:00 committed by GitHub
parent f91efafe24
commit 29f8880638
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 53 additions and 31 deletions

View File

@ -2,6 +2,8 @@ package testing
import (
"context"
"sync"
"sync/atomic"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"google.golang.org/protobuf/proto"
@ -9,33 +11,53 @@ import (
// MockBroadcaster implements p2p.Broadcaster for testing.
type MockBroadcaster struct {
BroadcastCalled bool
BroadcastCalled atomic.Bool
BroadcastMessages []proto.Message
BroadcastAttestations []*ethpb.Attestation
msgLock sync.Mutex
attLock sync.Mutex
}
// Broadcast records a broadcast occurred.
func (m *MockBroadcaster) Broadcast(_ context.Context, msg proto.Message) error {
m.BroadcastCalled = true
m.BroadcastCalled.Store(true)
m.msgLock.Lock()
defer m.msgLock.Unlock()
m.BroadcastMessages = append(m.BroadcastMessages, msg)
return nil
}
// BroadcastAttestation records a broadcast occurred.
func (m *MockBroadcaster) BroadcastAttestation(_ context.Context, _ uint64, a *ethpb.Attestation) error {
m.BroadcastCalled = true
m.BroadcastCalled.Store(true)
m.attLock.Lock()
defer m.attLock.Unlock()
m.BroadcastAttestations = append(m.BroadcastAttestations, a)
return nil
}
// BroadcastSyncCommitteeMessage records a broadcast occurred.
func (m *MockBroadcaster) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *ethpb.SyncCommitteeMessage) error {
m.BroadcastCalled = true
m.BroadcastCalled.Store(true)
return nil
}
// BroadcastBlob broadcasts a blob for mock.
func (m *MockBroadcaster) BroadcastBlob(context.Context, uint64, *ethpb.SignedBlobSidecar) error {
m.BroadcastCalled = true
m.BroadcastCalled.Store(true)
return nil
}
// NumMessages returns the number of messages broadcasted.
func (m *MockBroadcaster) NumMessages() int {
m.msgLock.Lock()
defer m.msgLock.Unlock()
return len(m.BroadcastMessages)
}
// NumAttestations returns the number of attestations broadcasted.
func (m *MockBroadcaster) NumAttestations() int {
m.attLock.Lock()
defer m.attLock.Unlock()
return len(m.BroadcastAttestations)
}

View File

@ -236,8 +236,8 @@ func TestSubmitAttestations(t *testing.T) {
s.SubmitAttestations(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, true, broadcaster.BroadcastCalled)
assert.Equal(t, 1, len(broadcaster.BroadcastAttestations))
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, 1, broadcaster.NumAttestations())
assert.Equal(t, "0x03", hexutil.Encode(broadcaster.BroadcastAttestations[0].AggregationBits))
assert.Equal(t, "0x8146f4397bfd8fd057ebbcd6a67327bdc7ed5fb650533edcb6377b650dea0b6da64c14ecd60846d5c0a0cd43893d6972092500f82c9d8a955e2b58c5ed3cbe885d84008ace6bd86ba9e23652f58e2ec207cec494c916063257abf285b9b15b15", hexutil.Encode(broadcaster.BroadcastAttestations[0].Signature))
assert.Equal(t, primitives.Slot(0), broadcaster.BroadcastAttestations[0].Data.Slot)
@ -263,8 +263,8 @@ func TestSubmitAttestations(t *testing.T) {
s.SubmitAttestations(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, true, broadcaster.BroadcastCalled)
assert.Equal(t, 2, len(broadcaster.BroadcastAttestations))
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, 2, broadcaster.NumAttestations())
assert.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
})
t.Run("no body", func(t *testing.T) {
@ -390,7 +390,7 @@ func TestSubmitVoluntaryExit(t *testing.T) {
pendingExits, err := s.VoluntaryExitsPool.PendingExits()
require.NoError(t, err)
require.Equal(t, 1, len(pendingExits))
assert.Equal(t, true, broadcaster.BroadcastCalled)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
})
t.Run("across fork", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
@ -422,7 +422,7 @@ func TestSubmitVoluntaryExit(t *testing.T) {
pendingExits, err := s.VoluntaryExitsPool.PendingExits()
require.NoError(t, err)
require.Equal(t, 1, len(pendingExits))
assert.Equal(t, true, broadcaster.BroadcastCalled)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
})
t.Run("no body", func(t *testing.T) {
request := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
@ -534,7 +534,7 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
assert.Equal(t, "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2", hexutil.Encode(msgsInPool[0].BlockRoot))
assert.Equal(t, primitives.ValidatorIndex(1), msgsInPool[0].ValidatorIndex)
assert.Equal(t, "0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505", hexutil.Encode(msgsInPool[0].Signature))
assert.Equal(t, true, broadcaster.BroadcastCalled)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
})
t.Run("multiple", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
@ -565,7 +565,7 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
msgsInPool, err = s.CoreService.SyncCommitteePool.SyncCommitteeMessages(2)
require.NoError(t, err)
require.Equal(t, 1, len(msgsInPool))
assert.Equal(t, true, broadcaster.BroadcastCalled)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
})
t.Run("invalid", func(t *testing.T) {
broadcaster := &p2pMock.MockBroadcaster{}
@ -595,7 +595,7 @@ func TestSubmitSyncCommitteeSignatures(t *testing.T) {
msgsInPool, err := s.CoreService.SyncCommitteePool.SyncCommitteeMessages(1)
require.NoError(t, err)
assert.Equal(t, 0, len(msgsInPool))
assert.Equal(t, false, broadcaster.BroadcastCalled)
assert.Equal(t, false, broadcaster.BroadcastCalled.Load())
})
t.Run("empty", func(t *testing.T) {
s := &Server{}
@ -756,7 +756,7 @@ func TestSubmitSignedBLSToExecutionChanges_Ok(t *testing.T) {
s.SubmitBLSToExecutionChanges(writer, request)
assert.Equal(t, http.StatusOK, writer.Code)
time.Sleep(100 * time.Millisecond) // Delay to let the routine start
assert.Equal(t, true, broadcaster.BroadcastCalled)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, numValidators, len(broadcaster.BroadcastMessages))
poolChanges, err := s.BLSChangesPool.PendingBLSToExecChanges()
@ -874,7 +874,7 @@ func TestSubmitSignedBLSToExecutionChanges_Bellatrix(t *testing.T) {
assert.Equal(t, http.StatusOK, writer.Code)
// Check that we didn't broadcast the messages but did in fact fill in
// the pool
assert.Equal(t, false, broadcaster.BroadcastCalled)
assert.Equal(t, false, broadcaster.BroadcastCalled.Load())
poolChanges, err := s.BLSChangesPool.PendingBLSToExecChanges()
require.Equal(t, len(poolChanges), len(signedChanges))
@ -979,7 +979,7 @@ func TestSubmitSignedBLSToExecutionChanges_Failures(t *testing.T) {
assert.Equal(t, http.StatusBadRequest, writer.Code)
time.Sleep(10 * time.Millisecond) // Delay to allow the routine to start
require.StringContains(t, "One or more BLSToExecutionChange failed validation", writer.Body.String())
assert.Equal(t, true, broadcaster.BroadcastCalled)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, numValidators, len(broadcaster.BroadcastMessages)+1)
poolChanges, err := s.BLSChangesPool.PendingBLSToExecChanges()

View File

@ -245,8 +245,8 @@ func TestSubmitAttesterSlashing_Ok(t *testing.T) {
pendingSlashings := s.SlashingsPool.PendingAttesterSlashings(ctx, bs, true)
require.Equal(t, 1, len(pendingSlashings))
assert.DeepEqual(t, migration.V1AttSlashingToV1Alpha1(slashing), pendingSlashings[0])
assert.Equal(t, true, broadcaster.BroadcastCalled)
require.Equal(t, 1, len(broadcaster.BroadcastMessages))
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
require.Equal(t, 1, broadcaster.NumMessages())
_, ok := broadcaster.BroadcastMessages[0].(*ethpbv1alpha1.AttesterSlashing)
assert.Equal(t, true, ok)
}
@ -325,8 +325,8 @@ func TestSubmitAttesterSlashing_AcrossFork(t *testing.T) {
pendingSlashings := s.SlashingsPool.PendingAttesterSlashings(ctx, bs, true)
require.Equal(t, 1, len(pendingSlashings))
assert.DeepEqual(t, migration.V1AttSlashingToV1Alpha1(slashing), pendingSlashings[0])
assert.Equal(t, true, broadcaster.BroadcastCalled)
require.Equal(t, 1, len(broadcaster.BroadcastMessages))
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
require.Equal(t, 1, broadcaster.NumMessages())
_, ok := broadcaster.BroadcastMessages[0].(*ethpbv1alpha1.AttesterSlashing)
assert.Equal(t, true, ok)
}
@ -372,7 +372,7 @@ func TestSubmitAttesterSlashing_InvalidSlashing(t *testing.T) {
_, err = s.SubmitAttesterSlashing(ctx, slashing)
require.ErrorContains(t, "Invalid attester slashing", err)
assert.Equal(t, false, broadcaster.BroadcastCalled)
assert.Equal(t, false, broadcaster.BroadcastCalled.Load())
}
func TestSubmitProposerSlashing_Ok(t *testing.T) {
@ -442,8 +442,8 @@ func TestSubmitProposerSlashing_Ok(t *testing.T) {
pendingSlashings := s.SlashingsPool.PendingProposerSlashings(ctx, bs, true)
require.Equal(t, 1, len(pendingSlashings))
assert.DeepEqual(t, migration.V1ProposerSlashingToV1Alpha1(slashing), pendingSlashings[0])
assert.Equal(t, true, broadcaster.BroadcastCalled)
require.Equal(t, 1, len(broadcaster.BroadcastMessages))
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
require.Equal(t, 1, broadcaster.NumMessages())
_, ok := broadcaster.BroadcastMessages[0].(*ethpbv1alpha1.ProposerSlashing)
assert.Equal(t, true, ok)
}
@ -514,8 +514,8 @@ func TestSubmitProposerSlashing_AcrossFork(t *testing.T) {
pendingSlashings := s.SlashingsPool.PendingProposerSlashings(ctx, bs, true)
require.Equal(t, 1, len(pendingSlashings))
assert.DeepEqual(t, migration.V1ProposerSlashingToV1Alpha1(slashing), pendingSlashings[0])
assert.Equal(t, true, broadcaster.BroadcastCalled)
require.Equal(t, 1, len(broadcaster.BroadcastMessages))
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
require.Equal(t, 1, broadcaster.NumMessages())
_, ok := broadcaster.BroadcastMessages[0].(*ethpbv1alpha1.ProposerSlashing)
assert.Equal(t, true, ok)
}
@ -554,5 +554,5 @@ func TestSubmitProposerSlashing_InvalidSlashing(t *testing.T) {
_, err = s.SubmitProposerSlashing(ctx, slashing)
require.ErrorContains(t, "Invalid proposer slashing", err)
assert.Equal(t, false, broadcaster.BroadcastCalled)
assert.Equal(t, false, broadcaster.BroadcastCalled.Load())
}

View File

@ -43,7 +43,7 @@ func TestServer_SubmitProposerSlashing(t *testing.T) {
_, err = bs.SubmitProposerSlashing(ctx, slashing)
require.NoError(t, err)
assert.Equal(t, true, mb.BroadcastCalled, "Expected broadcast to be called")
assert.Equal(t, true, mb.BroadcastCalled.Load(), "Expected broadcast to be called")
}
func TestServer_SubmitAttesterSlashing(t *testing.T) {
@ -74,7 +74,7 @@ func TestServer_SubmitAttesterSlashing(t *testing.T) {
// slashed indices.
_, err = bs.SubmitAttesterSlashing(ctx, slashing)
require.NoError(t, err)
assert.Equal(t, true, mb.BroadcastCalled, "Expected broadcast to be called when flag is set")
assert.Equal(t, true, mb.BroadcastCalled.Load(), "Expected broadcast to be called when flag is set")
}
func TestServer_SubmitProposerSlashing_DontBroadcast(t *testing.T) {
@ -111,7 +111,7 @@ func TestServer_SubmitProposerSlashing_DontBroadcast(t *testing.T) {
t.Errorf("Wanted %v, received %v", wanted, res)
}
assert.Equal(t, false, mb.BroadcastCalled, "Expected broadcast not to be called by default")
assert.Equal(t, false, mb.BroadcastCalled.Load(), "Expected broadcast not to be called by default")
slashing, err = util.GenerateProposerSlashingForValidator(st, privs[5], primitives.ValidatorIndex(5))
require.NoError(t, err)
@ -158,7 +158,7 @@ func TestServer_SubmitAttesterSlashing_DontBroadcast(t *testing.T) {
if !proto.Equal(wanted, res) {
t.Errorf("Wanted %v, received %v", wanted, res)
}
assert.Equal(t, false, mb.BroadcastCalled, "Expected broadcast not to be called by default")
assert.Equal(t, false, mb.BroadcastCalled.Load(), "Expected broadcast not to be called by default")
slashing, err = util.GenerateAttesterSlashingForValidator(st, privs[5], primitives.ValidatorIndex(5))
require.NoError(t, err)