Check blob index duplication for blob notifier (#13123)

* Check blob index duplication for blob notifier

* Better locks and test

* Better locks and test

* Kasey's feedback

* Fix init
This commit is contained in:
terencechain 2023-10-26 20:26:34 -07:00 committed by GitHub
parent 6f941b8138
commit 203dc5f63b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 78 additions and 1 deletions

View File

@ -9,7 +9,7 @@ import (
// SendNewBlobEvent sends a message to the BlobNotifier channel that the blob
// for the blocroot `root` is ready in the database
func (s *Service) sendNewBlobEvent(root [32]byte, index uint64) {
s.blobNotifiers.forRoot(root) <- index
s.blobNotifiers.notifyIndex(root, index)
}
// ReceiveBlob saves the blob to database and sends the new event

View File

@ -94,6 +94,35 @@ var ErrMissingClockSetter = errors.New("blockchain Service initialized without a
type blobNotifierMap struct {
sync.RWMutex
notifiers map[[32]byte]chan uint64
seenIndex map[[32]byte][fieldparams.MaxBlobsPerBlock]bool
}
// notifyIndex notifies a blob by its index for a given root.
// It uses internal maps to keep track of seen indices and notifier channels.
func (bn *blobNotifierMap) notifyIndex(root [32]byte, idx uint64) {
if idx >= fieldparams.MaxBlobsPerBlock {
return
}
bn.Lock()
seen := bn.seenIndex[root]
if seen[idx] {
bn.Unlock()
return
}
seen[idx] = true
bn.seenIndex[root] = seen
// Retrieve or create the notifier channel for the given root.
c, ok := bn.notifiers[root]
if !ok {
c = make(chan uint64, fieldparams.MaxBlobsPerBlock)
bn.notifiers[root] = c
}
bn.Unlock()
c <- idx
}
func (bn *blobNotifierMap) forRoot(root [32]byte) chan uint64 {
@ -110,6 +139,7 @@ func (bn *blobNotifierMap) forRoot(root [32]byte) chan uint64 {
func (bn *blobNotifierMap) delete(root [32]byte) {
bn.Lock()
defer bn.Unlock()
delete(bn.seenIndex, root)
delete(bn.notifiers, root)
}
@ -126,6 +156,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
bn := &blobNotifierMap{
notifiers: make(map[[32]byte]chan uint64),
seenIndex: make(map[[32]byte][fieldparams.MaxBlobsPerBlock]bool),
}
srv := &Service{
ctx: ctx,

View File

@ -25,6 +25,7 @@ import (
state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v4/config/features"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
consensusblocks "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
@ -514,3 +515,48 @@ func (s *MockClockSetter) SetClock(g *startup.Clock) error {
s.G = g
return s.Err
}
func TestNotifyIndex(t *testing.T) {
// Initialize a blobNotifierMap
bn := &blobNotifierMap{
seenIndex: make(map[[32]byte][fieldparams.MaxBlobsPerBlock]bool),
notifiers: make(map[[32]byte]chan uint64),
}
// Sample root and index
var root [32]byte
copy(root[:], "exampleRoot")
// Test notifying a new index
bn.notifyIndex(root, 1)
if !bn.seenIndex[root][1] {
t.Errorf("Index was not marked as seen")
}
// Test that a new channel is created
if _, ok := bn.notifiers[root]; !ok {
t.Errorf("Notifier channel was not created")
}
// Test notifying an already seen index
bn.notifyIndex(root, 1)
if len(bn.notifiers[root]) > 1 {
t.Errorf("Notifier channel should not receive multiple messages for the same index")
}
// Test notifying a new index again
bn.notifyIndex(root, 2)
if !bn.seenIndex[root][2] {
t.Errorf("Index was not marked as seen")
}
// Test that the notifier channel receives the index
select {
case idx := <-bn.notifiers[root]:
if idx != 1 {
t.Errorf("Received index on channel is incorrect")
}
default:
t.Errorf("Notifier channel did not receive the index")
}
}