Only process blocks which haven't been processed (#13442)

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
This commit is contained in:
Justin Traglia 2024-01-09 16:14:03 -06:00 committed by GitHub
parent 703cfc5819
commit cf606e3766
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 9 additions and 12 deletions

View File

@ -103,7 +103,7 @@ func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROB
return scs, nil
}
// safeCommitemntArray is a fixed size array of commitment byte slices. This is helpful for avoiding
// safeCommitmentArray is a fixed size array of commitment byte slices. This is helpful for avoiding
// gratuitous bounds checks.
type safeCommitmentArray [fieldparams.MaxBlobsPerBlock][]byte

View File

@ -154,7 +154,7 @@ func (s *Service) processFetchedData(
}
}
// processFetchedData processes data received from queue.
// processFetchedDataRegSync processes data received from queue.
func (s *Service) processFetchedDataRegSync(
ctx context.Context, genesis time.Time, startSlot primitives.Slot, data *blocksQueueFetchedData) {
defer s.updatePeerScorerStats(data.pid, startSlot)
@ -173,16 +173,13 @@ func (s *Service) processFetchedDataRegSync(
"firstSlot": data.bwb[0].Block.Block().Slot(),
"firstUnprocessed": bwb[0].Block.Block().Slot(),
}
for _, b := range data.bwb {
for _, b := range bwb {
if err := avs.Persist(s.clock.CurrentSlot(), b.Blobs...); err != nil {
log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Batch failure due to BlobSidecar issues")
return
}
if err := s.processBlock(ctx, genesis, b, s.cfg.Chain.ReceiveBlock, avs); err != nil {
switch {
case errors.Is(err, errBlockAlreadyProcessed):
log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Skipping already processed block")
continue
case errors.Is(err, errParentDoesNotExist):
log.WithFields(batchFields).WithField("missingParent", fmt.Sprintf("%#x", b.Block.Block().ParentRoot())).
WithFields(syncFields(b.Block)).Debug("Could not process batch blocks due to missing parent")
@ -292,7 +289,7 @@ func validUnprocessed(ctx context.Context, bwb []blocks.BlockWithROBlobs, headSl
parent := bwb[i-1].Block
if parent.Root() != b.Block().ParentRoot() {
return nil, fmt.Errorf("expected linear block list with parent root of %#x (slot %d) but received %#x (slot %d)",
parent, parent.Block().Slot(), b.Block().ParentRoot(), b.Block().Slot())
parent.Root(), parent.Block().Slot(), b.Block().ParentRoot(), b.Block().Slot())
}
}
}
@ -302,7 +299,7 @@ func validUnprocessed(ctx context.Context, bwb []blocks.BlockWithROBlobs, headSl
if *processed+1 == len(bwb) {
maxIncoming := bwb[len(bwb)-1].Block
maxRoot := maxIncoming.Root()
return nil, fmt.Errorf("headSlot:%d, blockSlot:%d , root %#x:%w", headSlot, maxIncoming.Block().Slot(), maxRoot, errBlockAlreadyProcessed)
return nil, fmt.Errorf("%w: headSlot=%d, blockSlot=%d, root=%#x", errBlockAlreadyProcessed, headSlot, maxIncoming.Block().Slot(), maxRoot)
}
nonProcessedIdx := *processed + 1
return bwb[nonProcessedIdx:], nil

View File

@ -110,7 +110,7 @@ func validateRangeRequest(r *pb.BeaconBlocksByRangeRequest, current primitives.S
if rp.start > maxStart {
return rangeParams{}, p2ptypes.ErrInvalidRequest
}
rp.end, err = rp.start.SafeAdd((rp.size - 1))
rp.end, err = rp.start.SafeAdd(rp.size - 1)
if err != nil {
return rangeParams{}, p2ptypes.ErrInvalidRequest
}

View File

@ -81,7 +81,7 @@ type BlockWithROBlobs struct {
Blobs []ROBlob
}
// BlockWithROBlobsSlice gives convnenient access to getting a slice of just the ROBlocks,
// BlockWithROBlobsSlice gives convenient access to getting a slice of just the ROBlocks,
// and defines sorting helpers.
type BlockWithROBlobsSlice []BlockWithROBlobs

View File

@ -250,7 +250,7 @@ func (m *SparseMerkleTrie) Copy() *SparseMerkleTrie {
// NumOfItems returns the num of items stored in
// the sparse merkle trie. We handle a special case
// where if there is only one item stored and it is a
// where if there is only one item stored and it is an
// empty 32-byte root.
func (m *SparseMerkleTrie) NumOfItems() int {
var zeroBytes [32]byte

View File

@ -189,7 +189,7 @@ func ExtendBlocksPlusBlobs(t *testing.T, blks []blocks.ROBlock, size int) ([]blo
return blks, blobs
}
// HackDenebForkEpoch is helpful for tests that need to set up cases where the deneb fork has passed.
// HackDenebMaxuint is helpful for tests that need to set up cases where the deneb fork has passed.
// We have unit tests that assert our config matches the upstream config, where the next fork is always
// set to MaxUint64 until the fork epoch is formally set. This creates an issue for tests that want to
// work with slots that are defined to be after deneb because converting the max epoch to a slot leads