fix-roundRobinSync (#3862)

This commit is contained in:
Alex 2019-10-28 22:29:33 +03:00 committed by Preston Van Loon
parent c4d47faae5
commit 094f1974be
2 changed files with 26 additions and 18 deletions

View File

@ -35,6 +35,8 @@ go_test(
name = "go_default_test", name = "go_default_test",
srcs = ["round_robin_test.go"], srcs = ["round_robin_test.go"],
embed = [":go_default_library"], embed = [":go_default_library"],
race = "on",
tags = ["race_on"],
deps = [ deps = [
"//beacon-chain/blockchain/testing:go_default_library", "//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/p2p/testing:go_default_library", "//beacon-chain/p2p/testing:go_default_library",

View File

@ -5,7 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"sort" "sort"
"sync" "sync/atomic"
"time" "time"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
@ -42,13 +42,10 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
counter := ratecounter.NewRateCounter(counterSeconds * time.Second) counter := ratecounter.NewRateCounter(counterSeconds * time.Second)
var lastEmptyRequests int var lastEmptyRequests int
errChan := make(chan error)
// Step 1 - Sync to end of finalized epoch. // Step 1 - Sync to end of finalized epoch.
for s.chain.HeadSlot() < helpers.StartSlot(highestFinalizedEpoch()+1) { for s.chain.HeadSlot() < helpers.StartSlot(highestFinalizedEpoch()+1) {
root, finalizedEpoch, peers := bestFinalized() root, finalizedEpoch, peers := bestFinalized()
var blocks []*eth.BeaconBlock
// request a range of blocks to be requested from multiple peers. // request a range of blocks to be requested from multiple peers.
// Example: // Example:
// - number of peers = 4 // - number of peers = 4
@ -61,11 +58,14 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
if len(peers) == 0 { if len(peers) == 0 {
return nil, errors.WithStack(errors.New("no peers left to request blocks")) return nil, errors.WithStack(errors.New("no peers left to request blocks"))
} }
var wg sync.WaitGroup var p2pRequestCount int32
errChan := make(chan error)
blocksChan := make(chan []*eth.BeaconBlock)
// Handle block large block ranges of skipped slots. // Handle block large block ranges of skipped slots.
start += count * uint64(lastEmptyRequests*len(peers)) start += count * uint64(lastEmptyRequests*len(peers))
atomic.AddInt32(&p2pRequestCount, int32(len(peers)))
for i, pid := range peers { for i, pid := range peers {
if ctx.Err() != nil { if ctx.Err() != nil {
return nil, ctx.Err() return nil, ctx.Err()
@ -90,10 +90,14 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
Step: step, Step: step,
} }
// Fulfill requests asynchronously, in parallel, and wait for results from all.
wg.Add(1)
go func(i int, pid peer.ID) { go func(i int, pid peer.ID) {
defer wg.Done() defer func() {
zeroIfIAmTheLast := atomic.AddInt32(&p2pRequestCount, -1)
if zeroIfIAmTheLast == 0 {
close(blocksChan)
}
}()
resp, err := s.requestBlocks(ctx, req, pid) resp, err := s.requestBlocks(ctx, req, pid)
log.WithField("peer", pid.Pretty()).Debugf("Received %d blocks", len(resp)) log.WithField("peer", pid.Pretty()).Debugf("Received %d blocks", len(resp))
if err != nil { if err != nil {
@ -110,28 +114,30 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
errChan <- errors.WithStack(errors.New("no peers left to request blocks")) errChan <- errors.WithStack(errors.New("no peers left to request blocks"))
return return
} }
_, err = request(start, step, count/uint64(len(ps)) /*count*/, ps, int(count)%len(ps) /*remainder*/) resp, err = request(start, step, count/uint64(len(ps)) /*count*/, ps, int(count)%len(ps) /*remainder*/)
if err != nil { if err != nil {
errChan <- err errChan <- err
return return
} }
} }
blocks = append(blocks, resp...) blocksChan <- resp
}(i, pid) }(i, pid)
} }
// Wait for done signal or any error. var unionRespBlocks []*eth.BeaconBlock
done := make(chan interface{})
go func() {
wg.Wait()
done <- true
}()
for { for {
select { select {
case err := <-errChan: case err := <-errChan:
return nil, err return nil, err
case <-done: case resp, ok := <-blocksChan:
return blocks, nil if ok {
// if this synchronization becomes a bottleneck:
// think about immediately allocating space for all peers in unionRespBlocks,
// and write without synchronization
unionRespBlocks = append(unionRespBlocks, resp...)
} else {
return unionRespBlocks, nil
}
} }
} }
} }