prysm-pulse/beacon-chain/sync/initial-sync/sync_blocks.go
terence tsao 1b5b8a57e0 Remove unused proto schemas (#3005)
* Update io_kubernetes_build commit hash to 1246899

* Update dependency build_bazel_rules_nodejs to v0.33.1

* Update dependency com_github_hashicorp_golang_lru to v0.5.1

* Update libp2p

* Update io_bazel_rules_k8s commit hash to e68d5d7

* Starting to remove old protos

* Bazel build proto passes

* Fixing pb version

* Cleaned up core package

* Fixing tests

* 6 tests failing

* Update proto bugs

* Fixed incorrect validator ordering proto

* Sync with master

* Update go-ssz commit

* Removed bad copies from v1alpha1 folder

* add json spec json to pb handler

* add nested proto example

* proto/testing test works

* fix refactoring build failures

* use merged ssz

* push latest changes

* used forked json encoding

* used forked json encoding

* fix warning

* fix build issues

* fix test and lint

* fix build

* lint
2019-07-22 10:03:57 -04:00

144 lines
4.5 KiB
Go

package initialsync
import (
"context"
"errors"
"fmt"
"sort"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/prysmaticlabs/go-ssz"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
// processBlock is the main method that validates each block which is received
// for initial sync. It checks if the blocks are valid and then will continue to
// process and save it into the db.
func (s *InitialSync) processBlock(ctx context.Context, block *ethpb.BeaconBlock, chainHead *pb.ChainHeadResponse) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.processBlock")
defer span.End()
recBlock.Inc()
if block.Slot == chainHead.CanonicalSlot {
if err := s.exitInitialSync(s.ctx, block, chainHead); err != nil {
log.Errorf("Could not exit initial sync: %v", err)
return err
}
return nil
}
if err := s.validateAndSaveNextBlock(ctx, block); err != nil {
return err
}
return nil
}
// processBatchedBlocks processes all the received blocks from
// the p2p message.
func (s *InitialSync) processBatchedBlocks(msg p2p.Message, chainHead *pb.ChainHeadResponse) error {
ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBatchedBlocks")
defer span.End()
batchedBlockReq.Inc()
response := msg.Data.(*pb.BatchedBeaconBlockResponse)
batchedBlocks := response.BatchedBlocks
if len(batchedBlocks) == 0 {
// Do not process empty responses.
s.p2p.Reputation(msg.Peer, p2p.RepPenalityInitialSyncFailure)
return nil
}
log.WithField("blocks", len(batchedBlocks)).Info("Processing batched block response")
// Sort batchBlocks in ascending order.
sort.Slice(batchedBlocks, func(i, j int) bool {
return batchedBlocks[i].Slot < batchedBlocks[j].Slot
})
for _, block := range batchedBlocks {
if err := s.processBlock(ctx, block, chainHead); err != nil {
return err
}
}
log.Debug("Finished processing batched blocks")
return nil
}
// requestBatchedBlocks sends out a request for multiple blocks that's between finalized roots
// and head roots.
func (s *InitialSync) requestBatchedBlocks(ctx context.Context, FinalizedRoot []byte, canonicalRoot []byte, peer peer.ID) {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestBatchedBlocks")
defer span.End()
sentBatchedBlockReq.Inc()
log.WithFields(logrus.Fields{
"finalizedBlkRoot": fmt.Sprintf("%#x", bytesutil.Trunc(FinalizedRoot[:])),
"headBlkRoot": fmt.Sprintf("%#x", bytesutil.Trunc(canonicalRoot[:]))},
).Debug("Requesting batched blocks")
if err := s.p2p.Send(ctx, &pb.BatchedBeaconBlockRequest{
FinalizedRoot: FinalizedRoot,
CanonicalRoot: canonicalRoot,
}, peer); err != nil {
log.Errorf("Could not send batch block request to peer %s: %v", peer.Pretty(), err)
}
}
// validateAndSaveNextBlock will validate whether blocks received from the blockfetcher
// routine can be added to the chain.
func (s *InitialSync) validateAndSaveNextBlock(ctx context.Context, block *ethpb.BeaconBlock) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.validateAndSaveNextBlock")
defer span.End()
if block == nil {
return errors.New("received nil block")
}
root, err := ssz.SigningRoot(block)
if err != nil {
return err
}
if s.db.HasBlock(root) {
log.WithField("block", fmt.Sprintf("%#x", root)).
Warn("Skipping block in db already")
return nil
}
if err := s.checkBlockValidity(ctx, block); err != nil {
return err
}
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", bytesutil.Trunc(root[:])),
"slot": block.Slot,
}).Info("Saving block")
s.mutex.Lock()
defer s.mutex.Unlock()
state, err := s.db.HeadState(ctx)
if err != nil {
return err
}
if err := s.chainService.VerifyBlockValidity(ctx, block, state); err != nil {
return err
}
if err := s.db.SaveBlock(block); err != nil {
return err
}
if err := s.db.SaveAttestationTarget(ctx, &pb.AttestationTarget{
Slot: block.Slot,
BeaconBlockRoot: root[:],
ParentRoot: block.ParentRoot,
}); err != nil {
return fmt.Errorf("could not to save attestation target: %v", err)
}
state, err = s.chainService.AdvanceState(ctx, state, block)
if err != nil {
return fmt.Errorf("could not apply block state transition: %v", err)
}
if err := s.chainService.CleanupBlockOperations(ctx, block); err != nil {
return err
}
return s.db.UpdateChainHead(ctx, block, state)
}