Batch Block Roots Requesting (#7027)

* checkpoint
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into batchRootReq
* test
* gaz
* fix test
* comment
* Merge refs/heads/master into multipleBranchProcessing
* Merge refs/heads/master into multipleBranchProcessing
* Merge refs/heads/master into multipleBranchProcessing
* Merge refs/heads/master into multipleBranchProcessing
* Merge refs/heads/master into multipleBranchProcessing
* Merge branch 'master' of https://github.com/prysmaticlabs/geth-sharding into batchRootReq
* terence's review
* Merge branch 'multipleBranchProcessing' of https://github.com/prysmaticlabs/geth-sharding into batchRootReq
Nishant Das 2020-08-20 12:50:14 +08:00 committed by GitHub
parent ba5da21026
commit 7744c3ae47
7 changed files with 151 additions and 54 deletions


@@ -57,7 +57,7 @@ const maxBadResponses = 5
const cacheNumCounters, cacheMaxCost, cacheBufferItems = 1000, 1000, 64
// maxDialTimeout is the timeout for a single peer dial.
const maxDialTimeout = 30 * time.Second
var maxDialTimeout = params.BeaconNetworkConfig().RespTimeout
// Service for managing peer to peer (p2p) networking.
type Service struct {
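For context, a minimal sketch (not part of this commit) of how a config-driven dial timeout such as the new maxDialTimeout is typically applied when dialing a peer over libp2p; connectWithTimeout is a hypothetical helper name.

import (
	"context"

	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"
)

// Illustrative only: bound a single peer dial by maxDialTimeout.
func connectWithTimeout(ctx context.Context, h host.Host, pi peer.AddrInfo) error {
	ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
	defer cancel()
	return h.Connect(ctx, pi)
}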


@@ -153,6 +153,7 @@ go_test(
"//shared/bytesutil:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/params:go_default_library",
"//shared/rand:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/testutil:go_default_library",
"//shared/testutil/assert:go_default_library",


@@ -3,11 +3,9 @@ package sync
import (
"context"
"encoding/hex"
"io"
"sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/bls"
@@ -17,7 +15,6 @@ import (
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
@@ -47,8 +44,6 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "processPendingAtts")
defer span.End()
pids := s.p2p.Peers().Connected()
// Before a node processes pending attestations queue, it verifies
// the attestations in the queue are still valid. Attestations will
// be deleted from the queue if invalid (i.e. going stale from falling too many slots behind).
@@ -61,6 +56,7 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
}
s.pendingAttsLock.RUnlock()
pendingRoots := [][32]byte{}
randGen := rand.NewGenerator()
for _, bRoot := range roots {
s.pendingAttsLock.RLock()
@@ -123,36 +119,10 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
"attCount": len(attestations),
"blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
}).Debug("Requesting block for pending attestation")
// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
// have a head slot newer or equal to the pending attestation's target boundary slot.
// If there are no peer id's available, then we should exit from this function. The function will
// be run again periodically, and there may be peers available in future runs.
if len(pids) == 0 {
log.Debug("No peer IDs available to request missing block from for pending attestation")
return nil
}
pid := pids[randGen.Int()%len(pids)]
targetSlot := helpers.SlotToEpoch(attestations[0].Message.Aggregate.Data.Target.Epoch)
for _, p := range pids {
cs, err := s.p2p.Peers().ChainState(p)
if err != nil {
return errors.Wrap(err, "could not get chain state for peer")
}
if cs != nil && cs.HeadSlot >= targetSlot {
pid = p
break
}
}
req := [][32]byte{bRoot}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil && err == io.EOF {
traceutil.AnnotateError(span, err)
log.Debugf("Could not send recent block request: %v", err)
}
pendingRoots = append(pendingRoots, bRoot)
}
}
return nil
return s.sendBatchRootRequest(ctx, pendingRoots, randGen)
}
// This defines how pending attestations are saved in the map. The key is the


@@ -44,7 +44,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
r := &Service{
p2p: p1,
db: db,
chain: &mock.ChainService{Genesis: roughtime.Now()},
chain: &mock.ChainService{Genesis: roughtime.Now(), FinalizedCheckPoint: &ethpb.Checkpoint{}},
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
stateSummaryCache: cache.NewStateSummaryCache(),
}


@@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/slotutil"
@@ -22,6 +23,9 @@ import (
var processPendingBlocksPeriod = slotutil.DivideSlotBy(3 /* times per slot */)
const maxPeerRequest = 50
const numOfTries = 5
// processes pending blocks queue on every processPendingBlocksPeriod
func (s *Service) processPendingBlocksQueue() {
ctx := context.Background()
@@ -46,6 +50,7 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
return errors.Wrap(err, "could not validate pending slots")
}
slots := s.sortedPendingSlots()
parentRoots := [][32]byte{}
span.AddAttributes(
trace.Int64Attribute("numSlots", int64(len(slots))),
@@ -110,26 +115,8 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
"currentSlot": b.Block.Slot,
"parentRoot": hex.EncodeToString(bytesutil.Trunc(b.Block.ParentRoot)),
}).Info("Requesting parent block")
req := [][32]byte{bytesutil.ToBytes32(b.Block.ParentRoot)}
parentRoots = append(parentRoots, bytesutil.ToBytes32(b.Block.ParentRoot))
// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
// have a head slot newer than the block slot we are requesting.
pid := pids[randGen.Int()%len(pids)]
for _, p := range pids {
cs, err := s.p2p.Peers().ChainState(p)
if err != nil {
return errors.Wrap(err, "failed to read chain state for peer")
}
if cs != nil && cs.HeadSlot >= slot {
pid = p
break
}
}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Debugf("Could not send recent block request: %v", err)
}
span.End()
continue
}
@@ -162,6 +149,50 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
}
}
return s.sendBatchRootRequest(ctx, parentRoots, randGen)
}
func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, randGen *rand.Rand) error {
ctx, span := trace.StartSpan(ctx, "sendBatchRootRequest")
defer span.End()
if len(roots) == 0 {
return nil
}
_, bestPeers := s.p2p.Peers().BestFinalized(maxPeerRequest, s.chain.FinalizedCheckpt().Epoch)
if len(bestPeers) == 0 {
return nil
}
roots = s.dedupRoots(roots)
// Randomly choose a peer to query from our best peers. If that peer cannot return
// all the requested blocks, we randomly select another peer.
pid := bestPeers[randGen.Int()%len(bestPeers)]
for i := 0; i < numOfTries; i++ {
req := roots
if len(roots) > int(params.BeaconNetworkConfig().MaxRequestBlocks) {
req = roots[:params.BeaconNetworkConfig().MaxRequestBlocks]
}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Debugf("Could not send recent block request: %v", err)
}
newRoots := make([][32]byte, 0, len(roots))
s.pendingQueueLock.RLock()
for _, rt := range roots {
if !s.seenPendingBlocks[rt] {
newRoots = append(newRoots, rt)
}
}
s.pendingQueueLock.RUnlock()
if len(newRoots) == 0 {
break
}
// Choosing a new peer with the leftover set of
// roots to request.
roots = newRoots
pid = bestPeers[randGen.Int()%len(bestPeers)]
}
return nil
}
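For readers skimming the diff, a condensed, illustrative sketch of the cap-and-retry strategy that sendBatchRootRequest above implements, with the networking and bookkeeping abstracted behind callbacks. requestInBatches, fetch, and seen are hypothetical names, not code from this commit; unlike the real function, this sketch returns the first error instead of logging it and retrying.

// Illustrative only: request up to maxReq roots at a time, then retry whatever
// is still unseen with a fresh attempt, up to a bounded number of tries.
func requestInBatches(roots [][32]byte, maxReq uint64, tries int,
	fetch func([][32]byte) error, seen func([32]byte) bool) error {
	for i := 0; i < tries; i++ {
		req := roots
		if uint64(len(roots)) > maxReq {
			req = roots[:maxReq]
		}
		if err := fetch(req); err != nil {
			return err
		}
		// Keep only the roots that are still unseen; in the real code this is
		// where a new random peer is chosen for the next attempt.
		leftover := make([][32]byte, 0, len(roots))
		for _, rt := range roots {
			if !seen(rt) {
				leftover = append(leftover, rt)
			}
		}
		if len(leftover) == 0 {
			return nil
		}
		roots = leftover
	}
	return nil
}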


@@ -5,6 +5,7 @@ import (
"math"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p-core/network"
@@ -17,6 +18,8 @@ import (
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
@@ -278,3 +281,82 @@ func TestService_sortedPendingSlots(t *testing.T) {
want := []uint64{lastSlot - 5, lastSlot - 3, lastSlot - 2, lastSlot}
assert.DeepEqual(t, want, r.sortedPendingSlots(), "Unexpected pending slots list")
}
func TestService_BatchRootRequest(t *testing.T) {
db, _ := dbtest.SetupDB(t)
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
r := &Service{
p2p: p1,
db: db,
chain: &mock.ChainService{
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 1,
},
},
slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
}
err := r.initCaches()
require.NoError(t, err)
p1.Peers().Add(new(enr.Record), p2.PeerID(), nil, network.DirOutbound)
p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected)
p1.Peers().SetChainState(p2.PeerID(), &pb.Status{FinalizedEpoch: 2})
b0 := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
require.NoError(t, r.db.SaveBlock(context.Background(), b0))
b0Root, err := stateutil.BlockRoot(b0.Block)
require.NoError(t, err)
b1 := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 1, ParentRoot: b0Root[:]}}
require.NoError(t, r.db.SaveBlock(context.Background(), b1))
b1Root, err := stateutil.BlockRoot(b1.Block)
require.NoError(t, err)
b2 := &ethpb.BeaconBlock{Slot: 2, ParentRoot: b1Root[:]}
b2Root, err := ssz.HashTreeRoot(b2)
require.NoError(t, err)
b5 := &ethpb.BeaconBlock{Slot: 5, ParentRoot: b2Root[:]}
b5Root, err := ssz.HashTreeRoot(b5)
require.NoError(t, err)
b3 := &ethpb.BeaconBlock{Slot: 3, ParentRoot: b0Root[:]}
b3Root, err := ssz.HashTreeRoot(b3)
require.NoError(t, err)
b4 := &ethpb.BeaconBlock{Slot: 4, ParentRoot: b3Root[:]}
b4Root, err := ssz.HashTreeRoot(b4)
require.NoError(t, err)
// Send in duplicated roots to also test deduplication.
sentRoots := [][32]byte{b2Root, b2Root, b3Root, b3Root, b4Root, b5Root}
expectedRoots := [][32]byte{b2Root, b3Root, b4Root, b5Root}
pcl := protocol.ID("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz_snappy")
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
out := [][32]byte{}
assert.NoError(t, p2.Encoding().DecodeWithMaxLength(stream, &out))
assert.DeepEqual(t, expectedRoots, out, "Did not receive expected message")
response := []*ethpb.SignedBeaconBlock{{Block: b2},
{Block: b3}, {Block: b4}, {Block: b5}}
for _, blk := range response {
_, err := stream.Write([]byte{responseCodeSuccess})
assert.NoError(t, err, "Failed to write to stream")
_, err = p2.Encoding().EncodeWithMaxLength(stream, blk)
assert.NoError(t, err, "Could not send response back")
}
assert.NoError(t, stream.Close())
})
require.NoError(t, r.sendBatchRootRequest(context.Background(), sentRoots, rand.NewGenerator()))
if testutil.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}
assert.Equal(t, 4, len(r.slotToPendingBlocks), "Incorrect size for slot to pending blocks cache")
assert.Equal(t, 4, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
}


@@ -49,6 +49,19 @@ func (s *Service) dedupBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][
return newBlks, newRoots, nil
}
func (s *Service) dedupRoots(roots [][32]byte) [][32]byte {
newRoots := make([][32]byte, 0, len(roots))
rootMap := make(map[[32]byte]bool, len(roots))
for i, r := range roots {
if rootMap[r] {
continue
}
rootMap[r] = true
newRoots = append(newRoots, roots[i])
}
return newRoots
}
// sort the provided blocks and roots in ascending order. This method assumes that the size of
// block slice and root slice is equal.
func (s *Service) sortBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][32]byte) ([]*ethpb.SignedBeaconBlock, [][32]byte) {
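A minimal sketch of a unit test for the new dedupRoots helper above, in the style of the existing tests in this diff; the test name and input values are illustrative and not part of this commit.

func TestDedupRoots_PreservesFirstOccurrenceOrder(t *testing.T) {
	s := &Service{}
	a, b, c := [32]byte{'a'}, [32]byte{'b'}, [32]byte{'c'}
	// Duplicates collapse to their first occurrence and relative order is kept.
	got := s.dedupRoots([][32]byte{a, a, b, a, c})
	assert.DeepEqual(t, [][32]byte{a, b, c}, got)
}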